diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java index ed171b5fffcde..fde0271f3ad55 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.couchbase; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -28,10 +30,11 @@ import com.couchbase.client.java.view.ViewResult; import com.couchbase.client.java.view.ViewRow; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Processor; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; -import org.apache.camel.support.DefaultScheduledPollConsumer; +import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.support.resume.ResumeStrategyHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,14 +45,14 @@ import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_KEY; import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_VIEWNAME; -public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements ResumeAware { +public class CouchbaseConsumer extends ScheduledBatchPollingConsumer implements ResumeAware { private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class); private final Lock lock = new ReentrantLock(); private final CouchbaseEndpoint endpoint; - private final Bucket bucket; - private final Collection collection; + private Bucket bucket; + private Collection collection; private ViewOptions viewOptions; private ResumeStrategy resumeStrategy; @@ -58,22 +61,23 @@ public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, Processor pr super(endpoint, processor); this.bucket = client; this.endpoint = endpoint; + } + + @Override + protected void doInit() { Scope scope; if (endpoint.getScope() != null) { - scope = client.scope(endpoint.getScope()); + scope = bucket.scope(endpoint.getScope()); } else { - scope = client.defaultScope(); + scope = bucket.defaultScope(); } if (endpoint.getCollection() != null) { this.collection = scope.collection(endpoint.getCollection()); } else { - this.collection = client.defaultCollection(); + this.collection = bucket.defaultCollection(); } - } - @Override - protected void doInit() { this.viewOptions = ViewOptions.viewOptions(); int limit = endpoint.getLimit(); if (limit > 0) { @@ -122,10 +126,12 @@ protected int poll() throws Exception { forceConsumerAsReady(); if (LOG.isTraceEnabled()) { - LOG.trace("ViewResponse = {}", result); + LOG.trace("ViewResponse: {}", result); } String consumerProcessedStrategy = endpoint.getConsumerProcessedStrategy(); + + Queue exchanges = new ArrayDeque<>(); for (ViewRow row : result.rows()) { Object doc; String id = row.id().get(); @@ -139,45 +145,60 @@ protected int poll() throws Exception { String designDocumentName = endpoint.getDesignDocumentName(); String viewName = endpoint.getViewName(); - Exchange exchange = createExchange(false); - try { - exchange.getIn().setBody(doc); - exchange.getIn().setHeader(HEADER_ID, id); - exchange.getIn().setHeader(HEADER_KEY, key); - exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, designDocumentName); - exchange.getIn().setHeader(HEADER_VIEWNAME, viewName); - - if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Deleting doc with ID {}", id); - } - CouchbaseCollectionOperation.removeDocument(collection, id, endpoint.getWriteQueryTimeout(), - endpoint.getProducerRetryPause()); - } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Filtering out ID {}", id); - } - // add filter for already processed docs - } else { - LOG.trace("No strategy set for already processed docs, beware of duplicates!"); - } + Exchange exchange = createExchange(true); + exchange.getIn().setBody(doc); + exchange.getIn().setHeader(HEADER_ID, id); + exchange.getIn().setHeader(HEADER_KEY, key); + exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, designDocumentName); + exchange.getIn().setHeader(HEADER_VIEWNAME, viewName); - logDetails(id, doc, key, designDocumentName, viewName, exchange); - - getProcessor().process(exchange); - } catch (Exception e) { - this.getExceptionHandler().handleException("Error processing exchange.", exchange, e); - } finally { - releaseExchange(exchange, false); + if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Deleting doc with ID {}", id); + } + CouchbaseCollectionOperation.removeDocument(collection, id, endpoint.getWriteQueryTimeout(), + endpoint.getProducerRetryPause()); + } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Filtering out ID {}", id); + } + // add filter for already processed docs + } else { + LOG.trace("No strategy set for already processed docs, beware of duplicates!"); } + + logDetails(id, doc, key, designDocumentName, viewName, exchange); + exchanges.add(exchange); } - return result.rows().size(); + return processBatch(exchanges); } finally { lock.unlock(); } } + @Override + public int processBatch(Queue exchanges) throws Exception { + int total = exchanges.size(); + int answer = total; + if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) { + LOG.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", + this.maxMessagesPerPoll, total); + total = this.maxMessagesPerPoll; + } + + for (int index = 0; index < total && this.isBatchAllowed(); ++index) { + Exchange exchange = (Exchange) exchanges.poll(); + exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index); + exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total); + exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1); + this.pendingExchanges = total - index - 1; + getProcessor().process(exchange); + } + + return answer; + } + private void logDetails(String id, Object doc, String key, String designDocumentName, String viewName, Exchange exchange) { if (LOG.isTraceEnabled()) { LOG.trace("Created exchange = {}", exchange); @@ -188,7 +209,6 @@ private void logDetails(String id, Object doc, String key, String designDocument LOG.trace("Design Document Name = {}", designDocumentName); LOG.trace("View Name = {}", viewName); } - } @Override diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/CouchbaseIntegrationTestBase.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/CouchbaseIntegrationTestBase.java index 4d9e3d80282c9..851cc87ade829 100644 --- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/CouchbaseIntegrationTestBase.java +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/CouchbaseIntegrationTestBase.java @@ -51,7 +51,7 @@ static void setUpCouchbase() { cluster.buckets().createBucket( BucketSettings.create(bucketName).bucketType(BucketType.COUCHBASE).flushEnabled(true)); - Bucket bucket = cluster.bucket(bucketName); + cluster.bucket(bucketName); DesignDocument designDoc = new DesignDocument( bucketName, Collections.singletonMap(bucketName, new View("function (doc, meta) { emit(meta.id, doc);}")));