CAMEL-19665: camel-couchbase should be batch poll consumer
davsclaus committed Dec 27, 2024
1 parent 51f2706 commit d72465a
Showing 2 changed files with 63 additions and 43 deletions.
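With this change CouchbaseConsumer extends ScheduledBatchPollingConsumer, so every view row returned by one poll is delivered as an exchange carrying the standard Camel batch properties (CamelBatchIndex, CamelBatchSize, CamelBatchComplete). A minimal route sketch that logs those properties; the endpoint URI, bucket, design document and view names are illustrative placeholders and not taken from this commit:

import org.apache.camel.builder.RouteBuilder;

public class CouchbaseBatchLogRoute extends RouteBuilder {

    @Override
    public void configure() {
        // Illustrative URI: adjust protocol, host, bucket and view options to your setup.
        from("couchbase:http://localhost:8091?bucket=beer-sample"
                + "&designDocumentName=beer&viewName=brewery_beers")
            // Each polled view row is one exchange; processBatch() stamps the batch properties.
            .log("Doc ${exchangeProperty.CamelBatchIndex} of ${exchangeProperty.CamelBatchSize}"
                + " (batch complete: ${exchangeProperty.CamelBatchComplete})")
            .to("mock:result");
    }
}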
org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.couchbase;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@@ -28,10 +30,11 @@
import com.couchbase.client.java.view.ViewResult;
import com.couchbase.client.java.view.ViewRow;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.DefaultScheduledPollConsumer;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.support.resume.ResumeStrategyHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,14 +45,14 @@
import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_KEY;
import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_VIEWNAME;

public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements ResumeAware<ResumeStrategy> {
public class CouchbaseConsumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {

private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class);

private final Lock lock = new ReentrantLock();
private final CouchbaseEndpoint endpoint;
private final Bucket bucket;
private final Collection collection;
private Bucket bucket;
private Collection collection;
private ViewOptions viewOptions;

private ResumeStrategy resumeStrategy;
@@ -58,22 +61,23 @@ public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, Processor pr
super(endpoint, processor);
this.bucket = client;
this.endpoint = endpoint;
}

@Override
protected void doInit() {
Scope scope;
if (endpoint.getScope() != null) {
scope = client.scope(endpoint.getScope());
scope = bucket.scope(endpoint.getScope());
} else {
scope = client.defaultScope();
scope = bucket.defaultScope();
}

if (endpoint.getCollection() != null) {
this.collection = scope.collection(endpoint.getCollection());
} else {
this.collection = client.defaultCollection();
this.collection = bucket.defaultCollection();
}
}

@Override
protected void doInit() {
this.viewOptions = ViewOptions.viewOptions();
int limit = endpoint.getLimit();
if (limit > 0) {
@@ -122,10 +126,12 @@ protected int poll() throws Exception {
forceConsumerAsReady();

if (LOG.isTraceEnabled()) {
LOG.trace("ViewResponse = {}", result);
LOG.trace("ViewResponse: {}", result);
}

String consumerProcessedStrategy = endpoint.getConsumerProcessedStrategy();

Queue<Object> exchanges = new ArrayDeque<>();
for (ViewRow row : result.rows()) {
Object doc;
String id = row.id().get();
@@ -139,45 +145,60 @@ protected int poll() throws Exception {
String designDocumentName = endpoint.getDesignDocumentName();
String viewName = endpoint.getViewName();

Exchange exchange = createExchange(false);
try {
exchange.getIn().setBody(doc);
exchange.getIn().setHeader(HEADER_ID, id);
exchange.getIn().setHeader(HEADER_KEY, key);
exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, designDocumentName);
exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);

if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Deleting doc with ID {}", id);
}
CouchbaseCollectionOperation.removeDocument(collection, id, endpoint.getWriteQueryTimeout(),
endpoint.getProducerRetryPause());
} else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Filtering out ID {}", id);
}
// add filter for already processed docs
} else {
LOG.trace("No strategy set for already processed docs, beware of duplicates!");
}
Exchange exchange = createExchange(true);
exchange.getIn().setBody(doc);
exchange.getIn().setHeader(HEADER_ID, id);
exchange.getIn().setHeader(HEADER_KEY, key);
exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, designDocumentName);
exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);

logDetails(id, doc, key, designDocumentName, viewName, exchange);

getProcessor().process(exchange);
} catch (Exception e) {
this.getExceptionHandler().handleException("Error processing exchange.", exchange, e);
} finally {
releaseExchange(exchange, false);
if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Deleting doc with ID {}", id);
}
CouchbaseCollectionOperation.removeDocument(collection, id, endpoint.getWriteQueryTimeout(),
endpoint.getProducerRetryPause());
} else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Filtering out ID {}", id);
}
// add filter for already processed docs
} else {
LOG.trace("No strategy set for already processed docs, beware of duplicates!");
}

logDetails(id, doc, key, designDocumentName, viewName, exchange);
exchanges.add(exchange);
}

return result.rows().size();
return processBatch(exchanges);
} finally {
lock.unlock();
}
}

@Override
public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
int answer = total;
if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) {
LOG.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.",
this.maxMessagesPerPoll, total);
total = this.maxMessagesPerPoll;
}

for (int index = 0; index < total && this.isBatchAllowed(); ++index) {
Exchange exchange = (Exchange) exchanges.poll();
exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1);
this.pendingExchanges = total - index - 1;
getProcessor().process(exchange);
}

return answer;
}

private void logDetails(String id, Object doc, String key, String designDocumentName, String viewName, Exchange exchange) {
if (LOG.isTraceEnabled()) {
LOG.trace("Created exchange = {}", exchange);
@@ -188,7 +209,6 @@ private void logDetails(String id, Object doc, String key, String designDocument
LOG.trace("Design Document Name = {}", designDocumentName);
LOG.trace("View Name = {}", viewName);
}

}

@Override
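Because processBatch() now caps each poll at maxMessagesPerPoll and flags the last exchange of the batch, routes can group one poll's documents with the standard batch-consumer completion. A hedged sketch, again with an illustrative endpoint URI; the aggregation strategy and completionFromBatchConsumer() are existing camel-core features, not something added by this commit:

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;

public class CouchbaseBatchAggregateRoute extends RouteBuilder {

    @Override
    public void configure() {
        // Illustrative URI: adjust to your cluster, bucket and view.
        from("couchbase:http://localhost:8091?bucket=beer-sample"
                + "&designDocumentName=beer&viewName=brewery_beers")
            // Collect all bodies from one poll into a single List, completing the group
            // from the batch properties (CamelBatchSize/CamelBatchComplete) the consumer now sets.
            .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                .completionFromBatchConsumer()
            .to("log:couchbase-batch?showBody=false");
    }
}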
(second changed file: Couchbase test setup)
@@ -51,7 +51,7 @@ static void setUpCouchbase() {
cluster.buckets().createBucket(
BucketSettings.create(bucketName).bucketType(BucketType.COUCHBASE).flushEnabled(true));

Bucket bucket = cluster.bucket(bucketName);
cluster.bucket(bucketName);
DesignDocument designDoc = new DesignDocument(
bucketName,
Collections.singletonMap(bucketName, new View("function (doc, meta) { emit(meta.id, doc);}")));
