Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 35866
b: refs/heads/autosynth-dataproc
c: 67668c1
h: refs/heads/master
  • Loading branch information
sduskis authored May 1, 2019
1 parent 9515460 commit c545f72
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ refs/tags/v0.78.0: 62d4bd30605ab3578f9a08d84487fb0b33ac2ff5
refs/tags/v0.79.0: 82287b570708748c411d05c40f3932cff9606feb
refs/tags/v0.80.0: f745e744d38e4fe636f34d0e04795ba3d014287d
refs/tags/v0.81.0: ed3a0c85339ea6b73560b9a570abfbb76b93a263
refs/heads/autosynth-dataproc: 652794a69a167d6223e9abdd512066c21835072d
refs/heads/autosynth-dataproc: 67668c1411169338374b050eae50ed650e318c54
refs/heads/autosynth-securitycenter: b24087060036e623e57d2454ba5dabeaf1e530c5
refs/heads/autosynth-talent: 4ca901879f86aab61091cea52e8a9b653639df24
refs/tags/v0.82.0: 7b9807d5d0a400c757b8905fee768be4c85eba25
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ public String getTopicNameString() {
* @return the message ID wrapped in a future.
*/
public ApiFuture<String> publish(PubsubMessage message) {
if (shutdown.get()) {
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
}
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");

final OutstandingPublish outstandingPublish =
new OutstandingPublish(messageTransform.apply(message));
Expand Down Expand Up @@ -288,23 +286,15 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
public void onSuccess(PublishResponse result) {
try {
if (result.getMessageIdsCount() != outstandingBatch.size()) {
Throwable t =
outstandingBatch.onFailure(
new IllegalStateException(
String.format(
"The publish result count %s does not match "
+ "the expected %s results. Please contact Cloud Pub/Sub support "
+ "if this frequently occurs",
result.getMessageIdsCount(), outstandingBatch.size()));
for (OutstandingPublish oustandingMessage : outstandingBatch.outstandingPublishes) {
oustandingMessage.publishResult.setException(t);
}
return;
}

Iterator<OutstandingPublish> messagesResultsIt =
outstandingBatch.outstandingPublishes.iterator();
for (String messageId : result.getMessageIdsList()) {
messagesResultsIt.next().publishResult.set(messageId);
result.getMessageIdsCount(), outstandingBatch.size())));
} else {
outstandingBatch.onSuccess(result.getMessageIdsList());
}
} finally {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
Expand All @@ -314,9 +304,7 @@ public void onSuccess(PublishResponse result) {
@Override
public void onFailure(Throwable t) {
try {
for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) {
outstandingPublish.publishResult.setException(t);
}
outstandingBatch.onFailure(t);
} finally {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
}
Expand Down Expand Up @@ -350,6 +338,19 @@ private List<PubsubMessage> getMessages() {
}
return results;
}

private void onFailure(Throwable t) {
for (OutstandingPublish outstandingPublish : outstandingPublishes) {
outstandingPublish.publishResult.setException(t);
}
}

private void onSuccess(Iterable<String> results) {
Iterator<OutstandingPublish> messagesResultsIt = outstandingPublishes.iterator();
for (String messageId : results) {
messagesResultsIt.next().publishResult.set(messageId);
}
}
}

private static final class OutstandingPublish {
Expand All @@ -376,10 +377,9 @@ public BatchingSettings getBatchingSettings() {
* should be invoked prior to deleting the {@link Publisher} object in order to ensure that no
* pending messages are lost.
*/
public void shutdown() throws Exception {
if (shutdown.getAndSet(true)) {
throw new IllegalStateException("Cannot shut down a publisher already shut-down.");
}
public void shutdown() {
Preconditions.checkState(
!shutdown.getAndSet(true), "Cannot shut down a publisher already shut-down.");
if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
Expand Down

0 comments on commit c545f72

Please # to comment.