
Wrong offset in a message on gzipped topic using simple Consumer/ConsumerStream #1225

Closed · wips opened this issue Apr 6, 2019 · 20 comments

wips commented Apr 6, 2019

Bug Report

When I use the simple Consumer on a gzipped topic, I get wrong offsets in the messages; the offsets are about twice as big as they should be. After the last message I get the offsetOutOfRange error. The same setup works fine on a non-compressed topic.

Is there any way to work around or quickly patch the issue? We need the simple Consumer so that each application node consumes its own partition (there's only one partition in the code provided here, but we use 3 partitions in production).

Update: I tried the same setup with ConsumerStream and hit the same issue, only without the offsetOutOfRange error.

Environment

  • Node version: v10.10.0
  • Kafka-node version: 4.0.4
  • Kafka version: 2.12-2.1.1

For specific cases also provide

  • Number of Brokers: 1
  • Number partitions for topic: 1

Include Sample Code to reproduce behavior

Client code

// kafka-node-test.js file

const kafkaLogging = require('kafka-node/logging')
kafkaLogging.setLoggerProvider((name) => {
  return {
    debug: (message) => console.debug(`[${name}] ${message}`),
    info: (message) => console.log(`[${name}] ${message}`),
    warn: (message) => console.warn(`[${name}] ${message}`),
    error: (message) => console.error(`[${name}] ${message}`),
  }
})

const { KafkaClient, Consumer } = require('kafka-node')
const clientId = 'a-client-id'
const groupId = 'a-group-id'

const client = new KafkaClient({ clientId })
const payload = [{
  topic: 'a-raw-topic',
  partition: 0
}, {
  topic: 'a-gzip-topic',
  partition: 0
}]
const consumer = new Consumer(client, payload, {
  autoCommit: false,
  fromOffset: false,
  groupId,
})

consumer.on('message', (message) => {
  console.log(`[INFO] Message: ${JSON.stringify(message)}`)
})

consumer.on('error', (error) => {
  console.error(`[ERR] ${JSON.stringify(error)}`)
})

consumer.on('offsetOutOfRange', (error) => {
  console.error(`[ERR] Offset out of range: ${JSON.stringify(error)}`)
})

Set up and run code

# creating a non-compressed topic
aMac:kafka_2.12-2.1.1 viktor_molokostov$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a-raw-topic --partitions 1 --replication-factor 1 --config retention.ms=86400000
Created topic "a-raw-topic".

# creating a gzipped topic
aMac:kafka_2.12-2.1.1 viktor_molokostov$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a-gzip-topic --partitions 1 --replication-factor 1 --config retention.ms=86400000 --config compression.type=gzip
Created topic "a-gzip-topic".

# adding 10 messages to the non-compressed topic
aMac:kafka_2.12-2.1.1 viktor_molokostov$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic a-raw-topic
>1
>2
>3
>4
>5
>6
>7
>8
>9
>10
>^C

# adding 10 messages to the gzipped topic
aMac:kafka_2.12-2.1.1 viktor_molokostov$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic a-gzip-topic
>11
>12
>13
>14
>15
>16
>17
>18
>19
>20
>^C

# running the code
aMac:local-poc viktor_molokostov$ export DEBUG=kafka-node:* && node kafka-node-test.js

Include output with Debug turned on

[kafka-node:KafkaClient] Connect attempt 1
[kafka-node:KafkaClient] Trying to connect to host: localhost port: 9092
[kafka-node:KafkaClient] a-client-id createBroker localhost:9092
[kafka-node:KafkaClient] a-client-id sending versions request to localhost:9092
[kafka-node:KafkaClient] broker socket connected %j
[kafka-node:KafkaClient] connected to socket, trying to load initial metadata
[kafka-node:KafkaClient] missing apiSupport waiting until broker is ready...
[kafka-node:KafkaClient] waitUntilReady [BrokerWrapper localhost:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)]
[kafka-node:KafkaClient] Received versions response from localhost:9092
[kafka-node:KafkaClient] setting api support to %j
[kafka-node:KafkaClient] broker is now ready
[kafka-node:KafkaClient] a-client-id updated internal metadata
[kafka-node:Consumer] consumer ready
[kafka-node:KafkaClient] a-client-id updated internal metadata
[kafka-node:KafkaClient] a-client-id createBroker 192.168.0.101:9092
[kafka-node:KafkaClient] waitUntilReady [BrokerWrapper 192.168.0.101:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)]
[kafka-node:KafkaClient] a-client-id sending versions request to 192.168.0.101:9092
[kafka-node:KafkaClient] Received versions response from 192.168.0.101:9092
[kafka-node:KafkaClient] setting api support to %j
[kafka-node:KafkaClient] broker is now ready
[kafka-node:KafkaClient] a-client-id createBroker 192.168.0.101:9092
[kafka-node:KafkaClient] waitUntilReady [BrokerWrapper 192.168.0.101:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)]
[kafka-node:KafkaClient] a-client-id sending versions request to 192.168.0.101:9092
[kafka-node:KafkaClient] Received versions response from 192.168.0.101:9092
[kafka-node:KafkaClient] setting api support to %j
[kafka-node:KafkaClient] broker is now ready
[INFO] Message: {"topic":"a-raw-topic","value":"1","offset":0,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-raw-topic","value":"2","offset":1,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-raw-topic","value":"3","offset":2,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-raw-topic","value":"4","offset":3,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-raw-topic","value":"5","offset":4,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-raw-topic","value":"6","offset":5,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-raw-topic","value":"7","offset":6,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-raw-topic","value":"8","offset":7,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-raw-topic","value":"9","offset":8,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-raw-topic","value":"10","offset":9,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"11","offset":1,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"12","offset":2,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"13","offset":5,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"14","offset":6,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"15","offset":8,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"16","offset":10,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"17","offset":12,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"18","offset":14,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"19","offset":16,"partition":0,"highWaterOffset":10,"key":null}
[INFO] Message: {"topic":"a-gzip-topic","value":"20","offset":18,"partition":0,"highWaterOffset":10,"key":null}
[ERR] Offset out of range: {"topic":"a-gzip-topic","partition":0}
[ERR] Offset out of range: {"topic":"a-gzip-topic","partition":0}
[ERR] Offset out of range: {"topic":"a-gzip-topic","partition":0}
[ERR] Offset out of range: {"topic":"a-gzip-topic","partition":0}
# ... and it keeps going this way like forever

Thanks for the lib anyway!

@wips wips changed the title Wrong offset in a message on gzipped topic using simple Consumer Wrong offset in a message on gzipped topic using simple Consumer/ConsumerStream Apr 6, 2019
@hyperlink hyperlink self-assigned this Apr 6, 2019
@hyperlink hyperlink added the bug label Apr 6, 2019
wips (Author) commented Apr 8, 2019

Wow, that was fast! Thank you!

aikar (Contributor) commented Apr 8, 2019

@hyperlink I'm not sure your fix is correct at all. I implemented it that way because the offset is relative within the compressed message, according to the protocol.

See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
and
zendesk/ruby-kafka@42821e9

I'm pretty sure this 'fix' just caused even more problems. We should instead figure out why the reporter's offsets were not already coming back relative.

hyperlink (Collaborator) commented

I will do some more testing. Maybe this has to do with the message format version setting.

aikar (Contributor) commented Apr 8, 2019

@hyperlink I think I see the issue here; you should revert that commit ASAP!
91e361d#diff-1b68ab5292618edd9d1e0e4ff2d8cf50R370
See my original commit for a reference back to the previous code.

The difference here is that the reporter is using a compressed topic, whereas I was testing with compressed messages within an uncompressed topic.

Because the entire topic is compressed, the flag appears to be getting set as compressed; yet since EVERYTHING is compressed, the message itself is not using relative offsets, because it was committed as a single message.

I think the fix would be: if the first message in the recursive decode is NOT 0, set base to 0.

aikar (Contributor) commented Apr 8, 2019

Or, if you can correctly identify whether it's a wrapper message or not. I think there are 3 message formats:
  • Uncompressed
  • Compressed single standard message
  • Compressed set of messages, where the child messages use relative offsets
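
A rough sketch of that classification, with hypothetical helper names (this is illustrative code, not kafka-node's decoder); in the v0/v1 message format the compression codec lives in the low bits of the attributes byte:

// Hypothetical classifier for the three shapes, not kafka-node code.
// In message format v0/v1 the low bits of the attributes byte carry
// the compression codec (0 = none, 1 = gzip, 2 = snappy).
function classifyMessage (attributes, innerMessages) {
  const codec = attributes & 0x07
  if (codec === 0) return 'uncompressed'
  return innerMessages.length > 1
    ? 'compressed set (inner offsets may be relative, per KIP-31)'
    : 'compressed single standard message'
}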

hyperlink (Collaborator) commented

I have failed to reproduce the issue, @aikar. Here's the branch I created; in it I've added a test that sends compressed messages on an uncompressed topic. I updated the test to use a larger payload and to verify the offsets (a symptom the reporter had was that offsets were skipped). I have tested against Kafka 2.1.

aikar (Contributor) commented Apr 8, 2019

@hyperlink, your test only covers gzip-compressed topics. The issue is that you just reverted a key fix from #1072 for handling compressed MESSAGES.

You need to test the following scenarios (see the sketch below):
  • One normal topic: send some uncompressed messages, then 5 messages in a single compressed message set, then more uncompressed messages.
  • One gzipped topic: same as above; send a few single messages, then 5 in the same compressed message set (I'm actually curious what happens when the client compresses before sending to the broker, not knowing the topic is set to compress?), then a few singles again.
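
Something like this could drive both scenarios, assuming kafka-node's Producer and placeholder topic names (attributes: 1 asks the client to gzip that payload before sending):

// Sketch: singles, then one gzip-compressed batch, then a single again.
const { KafkaClient, Producer } = require('kafka-node')
const producer = new Producer(new KafkaClient())

producer.on('ready', () => {
  producer.send([
    { topic: 'a-raw-topic', messages: '1' }, // uncompressed single
    { topic: 'a-raw-topic', messages: '2' }, // uncompressed single
    // five messages batched into one gzipped message set
    { topic: 'a-raw-topic', messages: ['3', '4', '5', '6', '7'], attributes: 1 },
    { topic: 'a-raw-topic', messages: '8' } // uncompressed single
  ], (err, result) => {
    if (err) console.error(err)
    else console.log(result)
  })
})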

hyperlink (Collaborator) commented

My initial test sent a batch of messages to be compressed in one request, and it passes. I can add a more comprehensive test to verify.

aikar (Contributor) commented Apr 10, 2019

@hyperlink b9395ed#diff-3b0ab5649e17ef136651e7bbcedddb6aR150

It only uses a gzipped topic. You need to test with a non-gzipped topic.

The Kafka protocol docs clearly document that offsets will be relative. It just seems there is some different behavior for already-gzipped topics, though.

I think the proper fix is to revert your change, then here:
91e361d#diff-1b68ab5292618edd9d1e0e4ff2d8cf50R367

do a message count, and if we are on the FIRST message in this message set and its offset is NOT 0, then change base to 0 and assume we are using exact offsets; but if the first message in the message set IS zero, we are encountering relative offsets, and the original base should be added to the offset.
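
In decoder terms, that proposal might look roughly like this (hypothetical names; kafka-node's real decode is a recursive buffer parser, so this shows only the offset logic):

// Decide once per decompressed set whether its offsets are relative.
// If the FIRST inner offset is 0, assume relative offsets (KIP-31) and
// add the wrapper's base offset; otherwise treat offsets as absolute.
function resolveOffsets (base, innerOffsets) {
  const relative = innerOffsets[0] === 0
  return innerOffsets.map(o => (relative ? base + o : o))
}

console.log(resolveOffsets(100, [0, 1, 2]))       // relative -> [ 100, 101, 102 ]
console.log(resolveOffsets(100, [100, 101, 102])) // absolute -> [ 100, 101, 102 ]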

hyperlink (Collaborator) commented

> It only uses a gzipped topic. You need to test with a non-gzipped topic.

The similar test right below that one creates a non-gzipped topic, and it passes.

How did compressed offsets work before relative offsets were added?

aikar (Contributor) commented Apr 10, 2019

@hyperlink, the server had to decompress and manipulate the message, then recompress it; see the KIP link above, but to quote:

> Motivation: Today the broker needs to decompress compressed messages, assign offsets to each message and recompress the messages again. This causes additional CPU cost. This KIP is trying to avoid server side recompression.

hyperlink (Collaborator) commented

@aikar I read more of the documentation you linked, and it mentions that relative offsets for compressed messages only apply when the magicByte is 1.

So far I have only observed a magicByte value of 0. Perhaps this explains why the tests are passing. Maybe there's some backwards compatibility kicking in on the broker?

I don't know how to trigger a higher magicByte. Maybe increase the version of our fetch requests?

I have tested the log.message.format.version setting on the broker, to no effect.
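
For what it's worth, one way to see the magic value the broker actually stored is Kafka's log dump tool (the segment path below is an example; adjust it to your log.dirs):

# dump a segment of the gzipped topic; each message line should include
# its magic value
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /tmp/kafka-logs/a-gzip-topic-0/00000000000000000000.log \
  --print-data-log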

aikar (Contributor) commented Apr 10, 2019

@hyperlink b9395ed#diff-66813b7e60ac69d8b3b1abdcd27c72fdR15

You are only sending 1 message per .send() request.

It's expected that 1 message per .send() will yield 1 offset each.

It's when you send multiple messages in one request, like {topic, messages: [message1, message2, message3]}, that the issue will be encountered.

aikar (Contributor) commented Apr 10, 2019

Actually I guess the better fix is simply:
const offset = vars.offset < base ? base + vars.offset : vars.offset;

That one-line change (after reverting your commit) should resolve the issue.

aikar (Contributor) commented Apr 10, 2019

Actually no, because base might be 4 while the compressed payload has 20 messages in it, blarg.

Using base according to the magic byte is the best option, I guess?
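
As a sketch, gating on the magic byte would be a one-line condition in the same spot (variable names as in the decoder snippet above, so treat them as hypothetical):

// Per KIP-31, inner offsets are relative only for message format v1+
// (magic >= 1); magic 0 messages carry absolute offsets.
const offset = magicByte >= 1 ? base + vars.offset : vars.offset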

hyperlink (Collaborator) commented

> It's when you send multiple messages in one request, like {topic, messages: [message1, message2, message3]}, that the issue will be encountered.

I initially had my test send all the messages in one batch, and it passed after several runs.

I am curious to know how the broker decides what magicByte value to use in these responses.

aikar (Contributor) commented Apr 10, 2019

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
> MagicByte: This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 1.

Somehow your tests are generating legacy messages.

hyperlink (Collaborator) commented

> Somehow your tests are generating legacy messages.

I have tried with both the included console producer and kafkacat, and I still see magicByte 0.

aikar (Contributor) commented Apr 11, 2019

I'm really not sure what was different in my environment, then. I very clearly saw relative offsets; they caused absolute devastation for us by resetting a partition that was at offset 100k back to 0, replaying the entire partition :/

aikar (Contributor) commented Apr 11, 2019

Note that this risk is now back in the library in its current state. We may not be able to identify the triggering factor for how relative offsets get enabled, but we know it can happen.

I strongly suggest reverting that commit and applying some of the fix ideas I've described above.

Checking whether the first message's offset is 0, and switching back to absolute offsets for the entire set if it's not, seems like a reasonable approach.
