
Producer OOM precaution #10

Closed
wants to merge 3 commits into from

Conversation

ZhuYouzhi
Contributor

Hi, Edenhill.

In current librdkafka, if the producer produces data faster than the broker can handle, data accumulates in rk->rk_op, which may cause an OOM (this did happen in my case).

To prevent this, this patch adds a new parameter, max_payload_size: if the sum of queued payload sizes exceeds max_payload_size, incoming messages will be dropped.
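A rough sketch of the intended check (illustrative only: the byte counter and max_payload_size here are names for this patch's idea, not actual librdkafka fields):

```c
#include <stddef.h>

/* Illustrative sketch, not the actual patch: keep a running sum of queued
 * payload bytes and drop new messages once max_payload_size would be
 * exceeded. */
static int may_enqueue(size_t *queued_payload_bytes,
                       size_t max_payload_size, size_t len) {
        if (*queued_payload_bytes + len > max_payload_size)
                return 0;                    /* drop the incoming message */
        *queued_payload_bytes += len;        /* account for the queued payload */
        return 1;                            /* caller enqueues the message */
}
```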

@edenhill
Contributor

edenhill commented Jun 3, 2013

Hi Zhu!

Thanks for your contribution.

I added a similar setting to the 0.7 branch a couple of weeks ago.
There are two differences:

  • instead of limiting the total payload size, it limits the total number of messages.
  • it lets the application know (via a -1 return from rd_kafka_produce() and errno set to ENOBUFS) that the message was dropped, allowing the application to take whatever action is appropriate (spool, backpressure, log, ...).

See this commit:
06fd28e

Configuration: producer.max_outq_msg_cnt

I've merged the 0.7 branch down to master, including this commit.
Let me know if the existing solution (limit on messages, not size) is good enough for you.
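
For illustration, one way an application can react to that signal is to poll and retry. This thread concerns the 0.7 branch; the sketch below uses the later 0.8-style rd_kafka_produce() signature, so read it as a sketch of the back-pressure pattern rather than the exact 0.7 API:

```c
#include <errno.h>
#include <librdkafka/rdkafka.h>

/* Back-pressure sketch: when the local queue is full, rd_kafka_produce()
 * returns -1 with errno == ENOBUFS; poll to serve delivery reports and
 * retry. Assumes rk and rkt are an already-configured handle and topic. */
static void produce_with_backpressure(rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                                      void *payload, size_t len) {
        while (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
                                RD_KAFKA_MSG_F_COPY,
                                payload, len, NULL, 0, NULL) == -1) {
                if (errno != ENOBUFS)
                        break;               /* some other error: give up here */
                rd_kafka_poll(rk, 100);      /* serve the queue, then retry */
        }
}
```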

@ZhuYouzhi
Contributor Author

Hi, Edenhill.

“it lets the application know (via a -1 return from rd_kafka_produce() and errno set to ENOBUFS) that the message was dropped, allowing the application to take whatever action is appropriate” is cool. Thanks.

There are two small questions I'd like to discuss:
(1) Limiting only the queue length may not be enough, because message sizes vary. Limiting both the queue size (bytes) and the queue length (message count) may be a better idea.
(2) As for dropping a message (because max_outq_msg_cnt is exceeded), the current patch can leak the memory allocated for the payload, if I understand it correctly.

@edenhill
Contributor

edenhill commented Jun 3, 2013

  1. This is true, but it would add the cost of an additional atomic variable barrier/sync.
    Don't you think the application can assume an average, or safe, message size and use that to calculate max_outq_msg_cnt accordingly?

  2. I don't think your patch leaks memory, as op_destroy() will clean up allocated resources.
    But the max message size check is done too late; it should happen before any memory has been allocated, like the max_outq_msg_cnt check does (see the sketch below).
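
A minimal sketch of what checking before allocation means here (the function and counter names are hypothetical, not the actual librdkafka code):

```c
#include <errno.h>
#include <stddef.h>

/* Hypothetical early-limit sketch: reject before copying the payload, so a
 * dropped message never owns memory that would have to be freed.
 * outq_cnt and max_outq_msg_cnt are illustrative names. */
static int produce_checked(size_t outq_cnt, size_t max_outq_msg_cnt,
                           const void *payload, size_t len) {
        if (outq_cnt >= max_outq_msg_cnt) {
                errno = ENOBUFS;   /* signal the drop to the caller */
                return -1;         /* nothing allocated, nothing to clean up */
        }
        /* ...only now copy the payload and build the op... */
        (void)payload; (void)len;
        return 0;
}
```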

@ZhuYouzhi
Contributor Author

Hi, Edenhill. Thanks for your reply.

@ZhuYouzhi ZhuYouzhi closed this Jun 4, 2013
@winbatch winbatch mentioned this pull request Feb 11, 2014
@vdeters vdeters mentioned this pull request Apr 12, 2016
@DavidLiuXh DavidLiuXh mentioned this pull request Apr 26, 2017
@vk-coder vk-coder mentioned this pull request May 16, 2018
kwdubuc pushed a commit to SolaceDev/librdkafka that referenced this pull request Apr 2, 2024
azat pushed a commit to azat-archive/librdkafka that referenced this pull request Jul 9, 2024