Skip to content

Commit

Permalink
Update docs (#443)
Browse files Browse the repository at this point in the history
* Resolves #337 add more info about how pause works

* Resolve #349, resolve #284
  • Loading branch information
hyperlink authored Aug 11, 2016
1 parent 197cb01 commit 383651c
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka 0.8.1
- [For a new consumer how do I start consuming from the latest message in a partition?](#for-a-new-consumer-how-do-i-start-consuming-from-the-latest-message-in-a-partition)
- [FailedToRebalanceConsumerError: Exception: NODE_EXISTS[-110]](#failedtorebalanceconsumererror-exception-node_exists-110)
- [HighLevelConsumer does not consume on all partitions](#highlevelconsumer-does-not-consume-on-all-partitions)
- [How to throttle messages / control the concurrency of processing messages](#how-to-throttle-messages--control-the-concurrency-of-processing-messages)
- [Running Tests](#running-tests)
- [LICENSE - "MIT"](#license---mit)

Expand Down Expand Up @@ -361,10 +362,10 @@ consumer.setOffset('topic', 0, 0);
```

### pause()
Pause the consumer
Pause the consumer. ***Calling `pause` does not automatically stop messages from being emitted.*** This is because pause just stops the kafka consumer fetch loop. Each iteration of the fetch loop can obtain a batch of messages (limited by `fetchMaxBytes`).

### resume()
Resume the consumer
Resume the consumer. Resumes the fetch loop.

### pauseTopics(topics)
Pause specify topics
Expand Down Expand Up @@ -522,10 +523,10 @@ consumer.setOffset('topic', 0, 0);
```

### pause()
Pause the consumer
Pause the consumer. ***Calling `pause` does not automatically stop messages from being emitted.*** This is because pause just stops the kafka consumer fetch loop. Each iteration of the fetch loop can obtain a batch of messages (limited by `fetchMaxBytes`).

### resume()
Resume the consumer
Resume the consumer. Resumes the fetch loop.

### close(force, cb)
* `force`: **Boolean**, if set to true, it forces the consumer to commit the current offset before closing, default `false`
Expand Down Expand Up @@ -697,6 +698,12 @@ Your partition will be stuck if the `fetchMaxBytes` is smaller than the message

Reference to issue [#339](https://github.com/SOHU-Co/kafka-node/issues/339)

## How to throttle messages / control the concurrency of processing messages

1. Create a `async.queue` with message processor and concurrency of one (the message processor itself is wrapped with `setImmediate` so it will not freeze up the event loop)
2. Set the `queue.drain` to resume the consumer
3. The handler for consumer's `message` event pauses the consumer and pushes the message to the queue.

# Running Tests

### Install Docker
Expand Down

0 comments on commit 383651c

Please # to comment.