
Operations

Chris Riccomini edited this page Oct 7, 2016 · 1 revision

When running the Kafka Connect BigQuery connector (KCBQ) at scale, you might see log messages like so:

[2016-10-04 20:14:20,611] ERROR Commit of WorkerSinkTask{id=some-id-7} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Kafka Connect is telling you that it is unable to call poll() fast enough. You might also see errors like this:

[2016-10-05 23:46:03,181] WARN Commit of WorkerSinkTask{id=some-id-7} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)

Kafka Connect is telling you that committing offsets is taking longer than expected.

To understand these issues, let's have a look at the iteration method in WorkerSinkTask in Kafka Connect.

protected void iteration() {
    long now = time.milliseconds();

    // Maybe commit
    if (!committing && now >= nextCommit) {
        commitOffsets(now, false);
        nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
    }

    // Check for timed out commits
    long commitTimeout = commitStarted + workerConfig.getLong(
            WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
    if (committing && now >= commitTimeout) {
        log.warn("Commit of {} offsets timed out", this);
        commitFailures++;
        committing = false;
    }

    // And process messages
    long timeoutMs = Math.max(nextCommit - now, 0);
    poll(timeoutMs);
}

This method does three things:

  1. Periodically tries to commit offsets.
  2. Checks if the commit took longer than the OFFSET_COMMIT_TIMEOUT_MS_CONFIG value (offset.flush.timeout.ms). If it did, it logs a warning and counts a commit failure.
  3. Calls poll() to get more messages to process.

This loop is important because KCBQ does two things during it. During the commitOffsets call, KCBQ's flush() method is invoked; this is when KCBQ sends all of its buffered messages to BigQuery. During the poll() call, KCBQ's put() method is invoked for each record received, and KCBQ buffers these messages until flush() is called.
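The put()/flush() interaction described above can be sketched with a simplified buffer. This is a hypothetical illustration, not KCBQ's actual implementation: the real connector implements the Kafka Connect SinkTask API and writes the buffered rows to BigQuery inside flush().

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of a buffering sink task: put() accumulates records
// as poll() delivers them; flush() drains the buffer in one batch.
// flush() must finish within offset.flush.timeout.ms, or Kafka Connect
// logs the "offsets timed out" WARN shown earlier.
public class BufferingSinkSketch {
    private final List<String> buffer = new ArrayList<>();

    // Called once per record received from poll()
    public void put(String record) {
        buffer.add(record);
    }

    // Called during commitOffsets(); returns how many records were drained
    public int flush() {
        int written = buffer.size();
        // (the real connector would write all buffered rows to BigQuery here)
        buffer.clear();
        return written;
    }

    public static void main(String[] args) {
        BufferingSinkSketch task = new BufferingSinkSketch();
        task.put("row-1");
        task.put("row-2");
        System.out.println(task.flush()); // prints 2
    }
}
```

The key point is that all the work accumulated between commits has to complete inside a single flush() call, which is why large buffers collide with the default timeouts.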

The first error log ("Commit cannot be completed...") is logged when the entire loop takes longer than consumer.session.timeout.ms. The default is 30 seconds. The second error log ("Commit of WorkerSinkTask{id=some-id-7} offsets timed out...") is logged whenever the commitOffsets call takes longer than offset.flush.timeout.ms. The default is 5 seconds.

In the context of what KCBQ is doing, these settings can be too low if you have a large number of records to write. The defaults mean that KCBQ must write all messages to BigQuery within 30 seconds, or a rebalance is triggered (session.timeout.ms is exceeded). Additionally, if it takes longer than 5 seconds to write all messages to BigQuery, you'll see the WARN.

The recommended solution if you're seeing these errors is to increase consumer.session.timeout.ms and offset.flush.timeout.ms. The cost of doing so is that real failures will take longer to detect, since a worker's session must time out before a rebalance is triggered.

Note: notice the consumer prefix on session.timeout.ms above. session.timeout.ms is a Kafka consumer configuration parameter; to set consumer configs in Kafka Connect, you must prepend the consumer. prefix.
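For example, you could add something like the following to the Connect worker properties file. The values here are illustrative, not tuned recommendations; pick numbers based on how long your BigQuery writes actually take.

```properties
# Consumer config (note the consumer. prefix): raise the session timeout
# so long BigQuery writes don't trigger a rebalance. The broker's
# group.max.session.timeout.ms may also need raising to allow this value.
consumer.session.timeout.ms=120000

# Worker config: give offset commits (and thus flush()) more time
# before the timed-out WARN is logged.
offset.flush.timeout.ms=60000
```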
