Added by Peter Amstutz about 9 years ago. Updated almost 8 years ago.

Implement log throttling in crunch-run. Behavior should be equivalent to services/api/lib/crunch_dispatch.rb#rate_limit

Use same configuration parameters as currently in API server config:

  # These two settings control how frequently log events are flushed to the
  # database.  Log lines are buffered until either crunch_log_bytes_per_event
  # has been reached or crunch_log_seconds_between_events has elapsed since
  # the last flush.
  crunch_log_bytes_per_event: 4096
  crunch_log_seconds_between_events: 1

  # The sample period for throttling logs, in seconds.
  crunch_log_throttle_period: 60

  # Maximum number of bytes that job can log over crunch_log_throttle_period
  # before being silenced until the end of the period.
  crunch_log_throttle_bytes: 65536

  # Maximum number of lines that job can log over crunch_log_throttle_period
  # before being silenced until the end of the period.
  crunch_log_throttle_lines: 1024

  # Maximum bytes that may be logged by a single job.  Log bytes that are
  # silenced by throttling are not counted against this total.
  crunch_limit_log_bytes_per_job: 67108864

Above parameters should be published in the discovery document for use by crunch-run.

Updated by Tom Morris almost 8 years ago

Updated by Radhika Chippada almost 8 years ago

Updated by Peter Amstutz almost 8 years ago

Updated by Radhika Chippada almost 8 years ago

Updated by Radhika Chippada almost 8 years ago

8019-crunchrun-log-throttle branch at 9dabca0eedbc9f842d542fea3463a441140d590c

Updated by Peter Amstutz almost 8 years ago

Reviewing 8019-crunchrun-log-throttle @ 9dabca0eedbc9f842d542fea3463a441140d590c

  • crunch-run needs to work with API servers that don't publish the crunchLog* settings. Provide defaults. It would also be better to copy the settings out of the discovery document just once into fields on ThrottledLogger/ArvLogWriter instead of looking them up from the discovery document every time.
  • The value used to initialize time.NewTicker should be crunchLogSecondsBetweenEvents
    func (tl *ThrottledLogger) flusher() {
        ticker := time.NewTicker(tl.crunchLogSecondsBetweenEvents)
  • Rename the ThrottledLogger.stopping channel to ThrottledLogger.flush.
            case _, open := <-tl.flush:
                // if closed, flush tl.buf, then exit the loop
                stopping = !open
  • ThrottledLogger.Write should check crunchLogBytesPerEvent and flush when exceeded.
      if len(tl.buf) >= tl.crunchLogBytesPerEvent {
        tl.flush <- struct{}{}
      }
  • This should reuse the variable 'now' that you've already stored previously.
            arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(int(crunchLogThrottlePeriod.(float64))))
  • There's no point in having ArvLogWriter.rateLimit return the value of logThrottleIsOpen (the first bool) if the caller just discards it (line 242).
  • Similarly, ArvLogWriter.rateLimit only returns an error when checking arvlog.ArvClient.Discovery(), but it shouldn't be doing that (see the first comment), so it doesn't need to return err either.
  • Please rename stderrBufToFlush and stderrFlushedAt to bufToFlush and flushedAt because they are not stderr-specific.
  • This regular expression is wrong for crunch-run logs.
    var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)

    Should be
    var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
  • Consider passing a single value of now into ArvLogWriter.rateLimit() instead of calling time.Now() many times.
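To see why the crunch-job pattern fails on crunch-run output, the sketch below runs both expressions against a made-up crunch-run line. It assumes a crunch-run log line is a timestamp, one space, then the message. The sample line and the `message` helper are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

// Pattern written for crunch-job log lines: four fields plus the literal
// "stderr" before the message.
var crunchJobLineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)

// Pattern suggested for crunch-run log lines, which carry only a timestamp
// before the message.
var crunchRunLineRegexp = regexp.MustCompile(`^\S+ (.*)`)

// message returns the captured text after the prefix, or "" and false when
// the line does not match re.
func message(re *regexp.Regexp, line string) (string, bool) {
	m := re.FindStringSubmatch(line)
	if m == nil {
		return "", false
	}
	return m[1], true
}

func main() {
	// Hypothetical crunch-run line: timestamp, one space, message.
	line := "2016-06-01T12:00:00.000000000Z hello from the container"
	_, ok := message(crunchJobLineRegexp, line)
	fmt.Println(ok) // false
	msg, ok := message(crunchRunLineRegexp, line)
	fmt.Println(ok, msg) // true hello from the container
}
```

With the crunch-job pattern, every crunch-run line silently fails to match, so any logic keyed on the captured message never runs.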
Updated by Peter Amstutz almost 8 years ago

To amend my previous comment, from crunch_dispatch.rb:

          # rate_limit returns true or false as to whether to actually log
          # the line or not.  It also modifies "line" in place to replace
          # it with an error if a logging limit is tripped.

logging.go is ignoring the return value of ArvLogWriter.rateLimit(), but it shouldn't.

Updated by Radhika Chippada almost 8 years ago

Peter: Addressed all your comments at c5c09df, except "ThrottledLogger.Write should check crunchLogBytesPerEvent and flush when exceeded"

When I try to add this to ThrottledLogger.Write, the tests are hanging (I ran all the tests in crunch-run and do not know which specific test(s) are hanging). Please advise.

Updated by Radhika Chippada almost 8 years ago

Updated by Peter Amstutz almost 8 years ago

8019-crunchrun-log-throttle @ c5c09df38966595b4f27c402d1e9ae5500d6d201

  • Is there a particular reason to skip empty lines?
            } else if len(line) == 0 {
  • Can you update this comment to say "once per crunchLogSecondsBetweenEvents"
    // (b) batches log messages and only calls the underlying Writer at most once
    // per second.
  • It already sets arvlog.logThrottleIsOpen inside rateLimit. The return value is supposed to indicate whether to log that particular line. So this should be changed from:
    logOpen, msg := arvlog.rateLimit(line, now)
    arvlog.bufToFlush.WriteString(string(msg) + "\n")
    arvlog.logThrottleIsOpen = logOpen

    to:

    logOpen, msg := arvlog.rateLimit(line, now)
    if logOpen {
        arvlog.bufToFlush.WriteString(string(msg) + "\n")
    }
  • The logic for "Rate-limiting partial segments of long lines" doesn't make sense. I think you want to remove the "partialLine" and "skipCounts" flags and adjust the if statement (this works in conjunction with the previous point, where returning false suppresses just the current line instead of closing the throttle entirely):
    if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
        arvlog.logThrottlePartialLineLastAt = now
    } else {
        return false, line
    }
  • You mentioned tests are deadlocking when you write to the flush channel from ThrottledLogger.Write. I think I know why. If that method gets called multiple times, it sends the flush signal multiple times. Since the channel has a buffer size of 1, a second send blocks until the flusher has received the first. So you need an additional flag:
    type ThrottledLogger struct {
      pendingFlush bool

    func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
      if len(tl.buf) >= crunchLogBytesPerEvent && !tl.pendingFlush {
        tl.pendingFlush = true
        tl.flush <- struct{}{}
      }

    func (tl *ThrottledLogger) flusher() {
            ready, tl.buf = tl.buf, nil
                    tl.pendingFlush = false
Updated by Radhika Chippada almost 8 years ago

Is there a particular reason to skip empty lines?

This is also how the original code (crunch_dispatch.rb) does it. Removing this check results in an extra empty line at the end of the log file, which would require updating the tests. I am not sure whether empty lines are desired, so I left this as is.

Can you update this comment to say "once per crunchLogSecondsBetweenEvents"


It already sets arvlog.logThrottleIsOpen inside rateLimit. The return value is supposed to be whether to log that particular line or not. So this should be changed from ...


The logic for "Rate-limiting partial segments of long lines" doesn't make sense. I think you want to remove the "partialLine" and "skipCounts" flags ...

Removed skipCounts, but the partialLine flag is still needed to ensure we log the "Rate-limiting partial segments of long lines to one every n seconds" message only once. Also fixed an issue in the block at line 301 in this regard.

You mentioned tests are deadlocking when you write to the flush channel from ThrottledLogger.Write. I think I know why. If that method gets called multiple times, it would send the flush signal multiple times. Since the channel size is 1, it blocks after the 1st send. So you need an additional flag "pendingFlush"

Even after adding this, the tests take too long. On further debugging, I narrowed it down to TestWriteLogsLarge, which floods the log and triggers many flush events from tl.Write. I have commented out this logic for now; it does not seem like something we want in a log-flooding situation. Should we instead set the ticker in the flusher func back to 1 second or a factor of crunchLogSecondsBetweenEvents, whichever is greater?

Updated by Radhika Chippada almost 8 years ago

At e54bce82

Two tests, TestCrunchStat and TestNodeInfoLog, were failing. Rate limiting alone was causing loss of log data: writing to the server was delayed until the bufToFlush or bufFlushedAt thresholds were exceeded, so whatever was still buffered when the writer closed was never sent. The following update was needed:

-       if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
-               (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) {
+       if (int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
+               (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
+               arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {

However, this required commenting out line 638 in crunchrun_test.go. With that line in place, TestWriteLogsLarge fails (without this test, there are no issues and all other tests pass). The test suite panics because api.Content has no entry at index api.Calls-1:

c.Check(api.Content[api.Calls-1]["container"].(arvadosclient.Dict)["log"], NotNil)

Please take a look and advise on how to proceed. Thanks.

Updated by Radhika Chippada almost 8 years ago

Updated by Radhika Chippada almost 8 years ago

Applied in changeset arvados|commit:3b4325c210516d1f61838fb26e06e0a11c31ce6d.


