From 0386e597afb2302c33e51914113026bd1cb8ddf6 Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Tue, 14 Jun 2022 00:00:54 +0300 Subject: [PATCH 01/13] added Parameter object for searchStream and sampleStream methods in TweetsApi --- .../clientlib/HelloWorldStreaming.java | 12 +- .../com/twitter/clientlib/api/TweetsApi.java | 172 +++++++----------- .../query/StreamQueryParameters.java | 138 ++++++++++++++ .../clientlib/query/model/Expansion.java | 30 +++ .../clientlib/query/model/MediaField.java | 32 ++++ .../clientlib/query/model/PlaceField.java | 29 +++ .../clientlib/query/model/PollField.java | 26 +++ .../clientlib/query/model/TweetField.java | 39 ++++ .../clientlib/query/model/UserField.java | 35 ++++ 9 files changed, 397 insertions(+), 116 deletions(-) create mode 100644 src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java create mode 100644 src/main/java/com/twitter/clientlib/query/model/Expansion.java create mode 100644 src/main/java/com/twitter/clientlib/query/model/MediaField.java create mode 100644 src/main/java/com/twitter/clientlib/query/model/PlaceField.java create mode 100644 src/main/java/com/twitter/clientlib/query/model/PollField.java create mode 100644 src/main/java/com/twitter/clientlib/query/model/TweetField.java create mode 100644 src/main/java/com/twitter/clientlib/query/model/UserField.java diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index 038e415..a91ea01 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -32,6 +32,8 @@ import com.twitter.clientlib.TweetsStreamListenersExecutor; import com.twitter.clientlib.api.TwitterApi; import com.twitter.clientlib.model.*; +import com.twitter.clientlib.query.StreamQueryParameters; +import com.twitter.clientlib.query.model.TweetField; public class HelloWorldStreaming { @@ -47,13 +49,11 @@ public static void main(String[] args) { TwitterApi apiInstance = new TwitterApi(); apiInstance.setTwitterCredentials(credentials); - Set tweetFields = new HashSet<>(); - tweetFields.add("author_id"); - tweetFields.add("id"); - tweetFields.add("created_at"); - try { - InputStream streamResult = apiInstance.tweets().sampleStream(null, tweetFields, null , null, null, null, 0); + InputStream streamResult = apiInstance.tweets().sampleStream( + new StreamQueryParameters.Builder() + .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) + .build()); // sampleStream with TweetsStreamListenersExecutor Responder responder = new Responder(); TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor(streamResult); diff --git a/src/main/java/com/twitter/clientlib/api/TweetsApi.java b/src/main/java/com/twitter/clientlib/api/TweetsApi.java index bf6a641..78e9f3a 100644 --- a/src/main/java/com/twitter/clientlib/api/TweetsApi.java +++ b/src/main/java/com/twitter/clientlib/api/TweetsApi.java @@ -54,7 +54,6 @@ import java.time.OffsetDateTime; import com.twitter.clientlib.model.Problem; import com.twitter.clientlib.model.QuoteTweetLookupResponse; -import java.util.Set; import com.twitter.clientlib.model.SingleTweetLookupResponse; import com.twitter.clientlib.model.StreamingTweet; import com.twitter.clientlib.model.TweetCountsResponse; @@ -68,13 +67,14 @@ import com.twitter.clientlib.model.UsersRetweetsCreateRequest; import com.twitter.clientlib.model.UsersRetweetsCreateResponse; import com.twitter.clientlib.model.UsersRetweetsDeleteResponse; +import com.twitter.clientlib.query.StreamQueryParameters; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Arrays; +import java.util.Set; import java.io.InputStream; import javax.ws.rs.core.GenericType; @@ -1599,13 +1599,7 @@ public okhttp3.Call listsIdTweetsAsync(String id, Integer maxResults, String pag } /** * Build call for sampleStream - * @param expansions A comma separated list of fields to expand. (optional) - * @param tweetFields A comma separated list of Tweet fields to display. (optional) - * @param userFields A comma separated list of User fields to display. (optional) - * @param mediaFields A comma separated list of Media fields to display. (optional) - * @param placeFields A comma separated list of Place fields to display. (optional) - * @param pollFields A comma separated list of Poll fields to display. (optional) - * @param backfillMinutes The number of minutes of backfill requested (optional) + * @param streamParameters {@link StreamQueryParameters} contains the parameters for this request * @param _callback Callback for upload/download progress * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -1616,7 +1610,7 @@ public okhttp3.Call listsIdTweetsAsync(String id, Integer maxResults, String pag 0 The request has failed. - */ - public okhttp3.Call sampleStreamCall(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes, final ApiCallback _callback) throws ApiException { + public okhttp3.Call sampleStreamCall(StreamQueryParameters streamParameters, final ApiCallback _callback) throws ApiException { Object localVarPostBody = null; // create path and map variables @@ -1628,32 +1622,32 @@ public okhttp3.Call sampleStreamCall(Set expansions, Set tweetFi Map localVarCookieParams = new HashMap(); Map localVarFormParams = new HashMap(); - if (expansions != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "expansions", expansions)); + if (streamParameters.getExpansions() != null && !streamParameters.getExpansions().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "expansions", streamParameters.getExpansions())); } - if (tweetFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "tweet.fields", tweetFields)); + if (streamParameters.getTweetFields() != null && !streamParameters.getTweetFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "tweet.fields", streamParameters.getTweetFields())); } - if (userFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "user.fields", userFields)); + if (streamParameters.getUserFields() != null && !streamParameters.getUserFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "user.fields", streamParameters.getUserFields())); } - if (mediaFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "media.fields", mediaFields)); + if (streamParameters.getMediaFields() != null && !streamParameters.getMediaFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "media.fields", streamParameters.getMediaFields())); } - if (placeFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "place.fields", placeFields)); + if (streamParameters.getPlaceFields() != null && !streamParameters.getPlaceFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "place.fields", streamParameters.getPlaceFields())); } - if (pollFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "poll.fields", pollFields)); + if (streamParameters.getPollFields() != null && !streamParameters.getPollFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "poll.fields", streamParameters.getPollFields())); } - if (backfillMinutes != null) { - localVarQueryParams.addAll(localVarApiClient.parameterToPair("backfill_minutes", backfillMinutes)); + if (streamParameters.getBackFillMinutes() != null) { + localVarQueryParams.addAll(localVarApiClient.parameterToPair("backfill_minutes", streamParameters.getBackFillMinutes())); } final String[] localVarAccepts = { @@ -1677,10 +1671,10 @@ public okhttp3.Call sampleStreamCall(Set expansions, Set tweetFi } @SuppressWarnings("rawtypes") - private okhttp3.Call sampleStreamValidateBeforeCall(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes, final ApiCallback _callback) throws ApiException { + private okhttp3.Call sampleStreamValidateBeforeCall(StreamQueryParameters streamParameters, final ApiCallback _callback) throws ApiException { - okhttp3.Call localVarCall = sampleStreamCall(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes, _callback); + okhttp3.Call localVarCall = sampleStreamCall(streamParameters, _callback); return localVarCall; } @@ -1688,13 +1682,7 @@ private okhttp3.Call sampleStreamValidateBeforeCall(Set expansions, Set< /** * Sample stream * Streams a deterministic 1% of public Tweets. - * @param expansions A comma separated list of fields to expand. (optional) - * @param tweetFields A comma separated list of Tweet fields to display. (optional) - * @param userFields A comma separated list of User fields to display. (optional) - * @param mediaFields A comma separated list of Media fields to display. (optional) - * @param placeFields A comma separated list of Place fields to display. (optional) - * @param pollFields A comma separated list of Poll fields to display. (optional) - * @param backfillMinutes The number of minutes of backfill requested (optional) + * @param streamParameters {@link StreamQueryParameters} contains the parameters for this request * @return StreamingTweet * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body * @http.response.details @@ -1704,8 +1692,8 @@ private okhttp3.Call sampleStreamValidateBeforeCall(Set expansions, Set< 0 The request has failed. - */ - public InputStream sampleStream(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes) throws ApiException { - InputStream localVarResp = sampleStreamWithHttpInfo(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + public InputStream sampleStream(StreamQueryParameters streamParameters) throws ApiException { + InputStream localVarResp = sampleStreamWithHttpInfo(streamParameters); return localVarResp; } @@ -1713,14 +1701,14 @@ public InputStream sampleStream(Set expansions, Set tweetFields, * Calls the API using a retry mechanism to handle rate limits errors. * */ - public InputStream sampleStream(Integer retries, Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes) throws ApiException { + public InputStream sampleStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException { InputStream localVarResp; try{ - localVarResp = sampleStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + localVarResp = sampleStream(streamParameters); } catch (ApiException e) { if(handleRateLimit(e, retries)) { - return sampleStream(retries - 1, expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + return sampleStream(retries - 1, streamParameters); } else { throw e; } @@ -1732,13 +1720,7 @@ public InputStream sampleStream(Integer retries, Set expansions, Set expansions, Set 0 The request has failed. - */ - public InputStream sampleStreamWithHttpInfo(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes) throws ApiException { - okhttp3.Call localVarCall = sampleStreamValidateBeforeCall(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes, null); + public InputStream sampleStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException { + okhttp3.Call localVarCall = sampleStreamValidateBeforeCall(streamParameters, null); try { Type localVarReturnType = new TypeToken(){}.getType(); return localVarApiClient.executeStream(localVarCall, localVarReturnType); @@ -1762,13 +1744,7 @@ public InputStream sampleStreamWithHttpInfo(Set expansions, Set /** * Sample stream (asynchronously) * Streams a deterministic 1% of public Tweets. - * @param expansions A comma separated list of fields to expand. (optional) - * @param tweetFields A comma separated list of Tweet fields to display. (optional) - * @param userFields A comma separated list of User fields to display. (optional) - * @param mediaFields A comma separated list of Media fields to display. (optional) - * @param placeFields A comma separated list of Place fields to display. (optional) - * @param pollFields A comma separated list of Poll fields to display. (optional) - * @param backfillMinutes The number of minutes of backfill requested (optional) + * @param streamParameters {@link StreamQueryParameters} contains the parameters for this request * @param _callback The callback to be executed when the API call finishes * @return The request call * @throws ApiException If fail to process the API call, e.g. serializing the request body object @@ -1779,22 +1755,16 @@ public InputStream sampleStreamWithHttpInfo(Set expansions, Set 0 The request has failed. - */ - public okhttp3.Call sampleStreamAsync(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes, final ApiCallback _callback) throws ApiException { + public okhttp3.Call sampleStreamAsync(StreamQueryParameters streamParameters, final ApiCallback _callback) throws ApiException { - okhttp3.Call localVarCall = sampleStreamValidateBeforeCall(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes, _callback); + okhttp3.Call localVarCall = sampleStreamValidateBeforeCall(streamParameters, _callback); Type localVarReturnType = new TypeToken(){}.getType(); localVarApiClient.executeAsync(localVarCall, localVarReturnType, _callback); return localVarCall; } /** * Build call for searchStream - * @param expansions A comma separated list of fields to expand. (optional) - * @param tweetFields A comma separated list of Tweet fields to display. (optional) - * @param userFields A comma separated list of User fields to display. (optional) - * @param mediaFields A comma separated list of Media fields to display. (optional) - * @param placeFields A comma separated list of Place fields to display. (optional) - * @param pollFields A comma separated list of Poll fields to display. (optional) - * @param backfillMinutes The number of minutes of backfill requested (optional) + * @param streamParameters {@link StreamQueryParameters} contains the parameters for this request * @param _callback Callback for upload/download progress * @return Call to execute * @throws ApiException If fail to serialize the request body object @@ -1805,7 +1775,7 @@ public okhttp3.Call sampleStreamAsync(Set expansions, Set tweetF 0 The request has failed. - */ - public okhttp3.Call searchStreamCall(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes, final ApiCallback _callback) throws ApiException { + public okhttp3.Call searchStreamCall(StreamQueryParameters streamParameters, final ApiCallback _callback) throws ApiException { Object localVarPostBody = null; // create path and map variables @@ -1817,32 +1787,32 @@ public okhttp3.Call searchStreamCall(Set expansions, Set tweetFi Map localVarCookieParams = new HashMap(); Map localVarFormParams = new HashMap(); - if (expansions != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "expansions", expansions)); + if (streamParameters.getExpansions() != null && !streamParameters.getExpansions().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "expansions", streamParameters.getExpansions())); } - if (tweetFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "tweet.fields", tweetFields)); + if (streamParameters.getTweetFields() != null && !streamParameters.getTweetFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "tweet.fields", streamParameters.getTweetFields())); } - if (userFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "user.fields", userFields)); + if (streamParameters.getUserFields() != null && !streamParameters.getUserFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "user.fields", streamParameters.getUserFields())); } - if (mediaFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "media.fields", mediaFields)); + if (streamParameters.getMediaFields() != null && !streamParameters.getMediaFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "media.fields", streamParameters.getMediaFields())); } - if (placeFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "place.fields", placeFields)); + if (streamParameters.getPlaceFields() != null && !streamParameters.getPlaceFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "place.fields", streamParameters.getPlaceFields())); } - if (pollFields != null) { - localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "poll.fields", pollFields)); + if (streamParameters.getPollFields() != null && !streamParameters.getPollFields().isEmpty()) { + localVarCollectionQueryParams.addAll(localVarApiClient.parameterToPairs("csv", "poll.fields", streamParameters.getPollFields())); } - if (backfillMinutes != null) { - localVarQueryParams.addAll(localVarApiClient.parameterToPair("backfill_minutes", backfillMinutes)); + if (streamParameters.getBackFillMinutes() != null) { + localVarQueryParams.addAll(localVarApiClient.parameterToPair("backfill_minutes", streamParameters.getBackFillMinutes())); } final String[] localVarAccepts = { @@ -1866,10 +1836,10 @@ public okhttp3.Call searchStreamCall(Set expansions, Set tweetFi } @SuppressWarnings("rawtypes") - private okhttp3.Call searchStreamValidateBeforeCall(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes, final ApiCallback _callback) throws ApiException { + private okhttp3.Call searchStreamValidateBeforeCall(StreamQueryParameters streamParameters, final ApiCallback _callback) throws ApiException { - okhttp3.Call localVarCall = searchStreamCall(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes, _callback); + okhttp3.Call localVarCall = searchStreamCall(streamParameters, _callback); return localVarCall; } @@ -1877,13 +1847,7 @@ private okhttp3.Call searchStreamValidateBeforeCall(Set expansions, Set< /** * Filtered stream * Streams Tweets matching the stream's active rule set. - * @param expansions A comma separated list of fields to expand. (optional) - * @param tweetFields A comma separated list of Tweet fields to display. (optional) - * @param userFields A comma separated list of User fields to display. (optional) - * @param mediaFields A comma separated list of Media fields to display. (optional) - * @param placeFields A comma separated list of Place fields to display. (optional) - * @param pollFields A comma separated list of Poll fields to display. (optional) - * @param backfillMinutes The number of minutes of backfill requested (optional) + * @param streamParameters {@link StreamQueryParameters} contains the parameters for this request * @return FilteredStreamingTweet * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body * @http.response.details @@ -1893,8 +1857,8 @@ private okhttp3.Call searchStreamValidateBeforeCall(Set expansions, Set< 0 The request has failed. - */ - public InputStream searchStream(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes) throws ApiException { - InputStream localVarResp = searchStreamWithHttpInfo(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + public InputStream searchStream(StreamQueryParameters streamParameters) throws ApiException { + InputStream localVarResp = searchStreamWithHttpInfo(streamParameters); return localVarResp; } @@ -1902,14 +1866,14 @@ public InputStream searchStream(Set expansions, Set tweetFields, * Calls the API using a retry mechanism to handle rate limits errors. * */ - public InputStream searchStream(Integer retries, Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes) throws ApiException { + public InputStream searchStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException { InputStream localVarResp; try{ - localVarResp = searchStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + localVarResp = searchStream(streamParameters); } catch (ApiException e) { if(handleRateLimit(e, retries)) { - return searchStream(retries - 1, expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + return searchStream(retries - 1, streamParameters); } else { throw e; } @@ -1921,13 +1885,7 @@ public InputStream searchStream(Integer retries, Set expansions, Set expansions, Set 0 The request has failed. - */ - public InputStream searchStreamWithHttpInfo(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes) throws ApiException { - okhttp3.Call localVarCall = searchStreamValidateBeforeCall(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes, null); + public InputStream searchStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException { + okhttp3.Call localVarCall = searchStreamValidateBeforeCall(streamParameters, null); try { Type localVarReturnType = new TypeToken(){}.getType(); return localVarApiClient.executeStream(localVarCall, localVarReturnType); @@ -1951,13 +1909,7 @@ public InputStream searchStreamWithHttpInfo(Set expansions, Set /** * Filtered stream (asynchronously) * Streams Tweets matching the stream's active rule set. - * @param expansions A comma separated list of fields to expand. (optional) - * @param tweetFields A comma separated list of Tweet fields to display. (optional) - * @param userFields A comma separated list of User fields to display. (optional) - * @param mediaFields A comma separated list of Media fields to display. (optional) - * @param placeFields A comma separated list of Place fields to display. (optional) - * @param pollFields A comma separated list of Poll fields to display. (optional) - * @param backfillMinutes The number of minutes of backfill requested (optional) + * @param streamParameters {@link StreamQueryParameters} contains the parameters for this request * @param _callback The callback to be executed when the API call finishes * @return The request call * @throws ApiException If fail to process the API call, e.g. serializing the request body object @@ -1968,9 +1920,9 @@ public InputStream searchStreamWithHttpInfo(Set expansions, Set 0 The request has failed. - */ - public okhttp3.Call searchStreamAsync(Set expansions, Set tweetFields, Set userFields, Set mediaFields, Set placeFields, Set pollFields, Integer backfillMinutes, final ApiCallback _callback) throws ApiException { + public okhttp3.Call searchStreamAsync(StreamQueryParameters streamParameters, final ApiCallback _callback) throws ApiException { - okhttp3.Call localVarCall = searchStreamValidateBeforeCall(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes, _callback); + okhttp3.Call localVarCall = searchStreamValidateBeforeCall(streamParameters, _callback); Type localVarReturnType = new TypeToken(){}.getType(); localVarApiClient.executeAsync(localVarCall, localVarReturnType, _callback); return localVarCall; diff --git a/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java b/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java new file mode 100644 index 0000000..028aa3a --- /dev/null +++ b/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java @@ -0,0 +1,138 @@ +package com.twitter.clientlib.query; + +import com.twitter.clientlib.query.model.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class StreamQueryParameters { + private List tweetFields; + private List expansions; + private List mediaFields; + private List pollFields; + private List userFields; + private List placeFields; + private Integer backFillMinutes; + + private StreamQueryParameters(Builder builder) { + this.tweetFields = builder.tweetFields; + this.expansions = builder.expansions; + this.mediaFields = builder.mediaFields; + this.pollFields = builder.pollFields; + this.userFields = builder.userFields; + this.placeFields = builder.placeFields; + this.backFillMinutes = builder.backFillMinutes; + } + + public List getTweetFields() { + return tweetFields.stream().map(TweetField::getName).collect(Collectors.toList()); + } + + public void setTweetFields(List tweetFields) { + this.tweetFields = tweetFields; + } + + public List getExpansions() { + return expansions.stream().map(Expansion::getName).collect(Collectors.toList()); + } + + public void setExpansions(List expansions) { + this.expansions = expansions; + } + + public List getMediaFields() { + return mediaFields.stream().map(MediaField::getName).collect(Collectors.toList()); + } + + public void setMediaFields(List mediaFields) { + this.mediaFields = mediaFields; + } + + public List getPollFields() { + return pollFields.stream().map(PollField::getName).collect(Collectors.toList()); + } + + public void setPollFields(List pollFields) { + this.pollFields = pollFields; + } + + public List getUserFields() { + return userFields.stream().map(UserField::getName).collect(Collectors.toList()); + } + + public void setUserFields(List userFields) { + this.userFields = userFields; + } + + public List getPlaceFields() { + return placeFields.stream().map(PlaceField::getName).collect(Collectors.toList()); + } + + public void setPlaceFields(List placeFields) { + this.placeFields = placeFields; + } + + public Integer getBackFillMinutes() { + return backFillMinutes; + } + + public void setBackFillMinutes(Integer backFillMinutes) { + this.backFillMinutes = backFillMinutes; + } + + public static class Builder { + private final List tweetFields; + private final List expansions; + private final List mediaFields; + private final List pollFields; + private final List userFields; + private final List placeFields; + private Integer backFillMinutes = 0; + + public Builder() { + this.tweetFields = new ArrayList<>(); + this.expansions = new ArrayList<>(); + this.mediaFields = new ArrayList<>(); + this.pollFields = new ArrayList<>(); + this.userFields = new ArrayList<>(); + this.placeFields = new ArrayList<>(); + } + + public Builder withTweetFields(TweetField... tweetFields) { + this.tweetFields.addAll(Arrays.asList(tweetFields)); + return this; + } + + public Builder withMediaFields(MediaField... mediaFields) { + this.mediaFields.addAll(Arrays.asList(mediaFields)); + return this; + } + + public Builder withPollFields(PollField... pollFields) { + this.pollFields.addAll(Arrays.asList(pollFields)); + return this; + } + + public Builder withUserFields(UserField... userFields) { + this.userFields.addAll(Arrays.asList(userFields)); + return this; + } + + public Builder withPlaceFields(PlaceField... placeFields) { + this.placeFields.addAll(Arrays.asList(placeFields)); + return this; + } + + public Builder withBackfillMinutes(Integer backFillMinutes) { + this.backFillMinutes = backFillMinutes; + return this; + } + + public StreamQueryParameters build() { + return new StreamQueryParameters(this); + } + } +} diff --git a/src/main/java/com/twitter/clientlib/query/model/Expansion.java b/src/main/java/com/twitter/clientlib/query/model/Expansion.java new file mode 100644 index 0000000..be746f4 --- /dev/null +++ b/src/main/java/com/twitter/clientlib/query/model/Expansion.java @@ -0,0 +1,30 @@ +package com.twitter.clientlib.query.model; + +import java.util.Arrays; +import java.util.List; + +public enum Expansion { + ATTACHMENTS_MEDIA_KEYS("attachments.media_keys"), + ATTACHMENTS_POLL_IDS("attachments.poll_ids"), + AUTHOR_ID("author_id"), + ENTITIES_MENTIONS_USERNAME("entities.mentions.username"), + GEO_PLACE_ID("geo.place_id"), + IN_REPLY_TO_USER_ID("in_reply_to_user_id"), + REFERENCED_TWEETS_ID("referenced_tweets.id"), + REFERENCED_TWEETS_ID_AUTHOR_ID("referenced_tweets.id.author_id"); + + private final String name; + + Expansion(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public static List all() { + return Arrays.asList(Expansion.values()); + } + +} diff --git a/src/main/java/com/twitter/clientlib/query/model/MediaField.java b/src/main/java/com/twitter/clientlib/query/model/MediaField.java new file mode 100644 index 0000000..d92863e --- /dev/null +++ b/src/main/java/com/twitter/clientlib/query/model/MediaField.java @@ -0,0 +1,32 @@ +package com.twitter.clientlib.query.model; + +import java.util.Arrays; +import java.util.List; + +public enum MediaField { + ALT_TEXT("alt_text"), + DURATION_MS("duration_ms"), + HEIGHT("height"), + MEDIA_KEY("media_key"), + PREVIEW_IMAGE_URL("preview_image_url"), + PUBLIC_METRICS("public_metrics"), + TYPE("type"), + URL("url"), + VARIANTS("variants"), + WIDTH("width"),; + + private final String name; + + MediaField(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public static List all() { + return Arrays.asList(MediaField.values()); + } + +} diff --git a/src/main/java/com/twitter/clientlib/query/model/PlaceField.java b/src/main/java/com/twitter/clientlib/query/model/PlaceField.java new file mode 100644 index 0000000..1d439ea --- /dev/null +++ b/src/main/java/com/twitter/clientlib/query/model/PlaceField.java @@ -0,0 +1,29 @@ +package com.twitter.clientlib.query.model; + +import java.util.Arrays; +import java.util.List; + +public enum PlaceField { + CONTAINED_WITHIN("contained_within"), + COUNTRY("country"), + COUNTRY_CODE("country_code"), + FULL_NAME("full_name"), + GEO("geo"), + ID("id"), + NAME("name"), + PLACE_TYPE("place_type"); + + private final String name; + + PlaceField(String name) { + this.name = name; + } + + public static List all() { + return Arrays.asList(PlaceField.values()); + } + + public String getName() { + return name; + } +} diff --git a/src/main/java/com/twitter/clientlib/query/model/PollField.java b/src/main/java/com/twitter/clientlib/query/model/PollField.java new file mode 100644 index 0000000..d6a8dfa --- /dev/null +++ b/src/main/java/com/twitter/clientlib/query/model/PollField.java @@ -0,0 +1,26 @@ +package com.twitter.clientlib.query.model; + +import java.util.Arrays; +import java.util.List; + +public enum PollField { + DURATION_MINUTES("duration_minutes"), + END_DATETIME("end_datetime"), + ID("id"), + OPTIONS("options"), + VOTING_STATUS("voting_status"); + + private final String name; + + PollField(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public static List all() { + return Arrays.asList(PollField.values()); + } +} diff --git a/src/main/java/com/twitter/clientlib/query/model/TweetField.java b/src/main/java/com/twitter/clientlib/query/model/TweetField.java new file mode 100644 index 0000000..88e43e4 --- /dev/null +++ b/src/main/java/com/twitter/clientlib/query/model/TweetField.java @@ -0,0 +1,39 @@ +package com.twitter.clientlib.query.model; + +import java.util.Arrays; +import java.util.List; + +public enum TweetField { + ATTACHMENTS("attachments"), + AUTHOR_ID("author_id"), + CONTEXT_ANNOTATIONS("context_annotations"), + CONVERSATION_ID("conversation_id"), + CREATED_AT("created_at"), + ENTITIES("entities"), + GEO("geo"), + ID("id"), + IN_REPLY_TO_USER_ID("in_reply_to_user_id"), + LANG("lang"), + POSSIBLE_SENSITIVE("possibly_sensitive"), + PUBLIC_METRICS("public_metrics"), + REFERENCED_TWEETS("referenced_tweets"), + REPLY_SETTINGS("reply_settings"), + SOURCE("source"), + TEXT("text"), + WITHHELD("withheld"); + + private final String name; + + TweetField(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public static List all() { + return Arrays.asList(TweetField.values()); + } + +} diff --git a/src/main/java/com/twitter/clientlib/query/model/UserField.java b/src/main/java/com/twitter/clientlib/query/model/UserField.java new file mode 100644 index 0000000..c22fe92 --- /dev/null +++ b/src/main/java/com/twitter/clientlib/query/model/UserField.java @@ -0,0 +1,35 @@ +package com.twitter.clientlib.query.model; + +import java.util.Arrays; +import java.util.List; + +public enum UserField { + CREATED_AT("created_at"), + DESCRIPTION("description"), + ENTITIES("entities"), + ID("id"), + LOCATION("location"), + NAME("name"), + PINNED_TWEET_ID("pinned_tweet_id"), + PROFILE_IMAGE_URL("profile_image_url"), + PROTECTED("protected"), + PUBLIC_METRICS("public_metrics"), + URL("url"), + USERNAME("username"), + VERIFIED("verified"), + WITHHELD("withheld"); + + private final String name; + + UserField(String name) { + this.name = name; + } + + public static List all() { + return Arrays.asList(UserField.values()); + } + + public String getName() { + return name; + } +} From 5eb7ddd30293347bc0f5092f47e853b425370a01 Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Wed, 15 Jun 2022 00:15:04 +0300 Subject: [PATCH 02/13] created TwitterStream and moved executor from examples Also, blocking queues were added and started parsing the raw responses to objects in parallel --- .../clientlib/HelloWorldStreaming.java | 65 ++--------- .../stream/TweetsStreamExecutor.java | 106 ++++++++++-------- .../stream}/TweetsStreamListener.java | 2 +- .../clientlib/stream/TwitterStream.java | 52 +++++++++ 4 files changed, 123 insertions(+), 102 deletions(-) rename examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java => src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java (56%) rename {examples/src/main/java/com/twitter/clientlib => src/main/java/com/twitter/clientlib/stream}/TweetsStreamListener.java (95%) create mode 100644 src/main/java/com/twitter/clientlib/stream/TwitterStream.java diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index a91ea01..8208447 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -23,17 +23,11 @@ package com.twitter.clientlib; -import java.util.HashSet; -import java.util.Set; -import java.io.InputStream; - -import com.twitter.clientlib.ApiException; -import com.twitter.clientlib.TwitterCredentialsBearer; -import com.twitter.clientlib.TweetsStreamListenersExecutor; -import com.twitter.clientlib.api.TwitterApi; import com.twitter.clientlib.model.*; import com.twitter.clientlib.query.StreamQueryParameters; import com.twitter.clientlib.query.model.TweetField; +import com.twitter.clientlib.stream.TweetsStreamListener; +import com.twitter.clientlib.stream.TwitterStream; public class HelloWorldStreaming { @@ -46,58 +40,17 @@ public static void main(String[] args) { * to use the right credential object. */ TwitterCredentialsBearer credentials = new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")); - TwitterApi apiInstance = new TwitterApi(); - apiInstance.setTwitterCredentials(credentials); - - try { - InputStream streamResult = apiInstance.tweets().sampleStream( - new StreamQueryParameters.Builder() - .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) - .build()); - // sampleStream with TweetsStreamListenersExecutor - Responder responder = new Responder(); - TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor(streamResult); - tsle.addListener(responder); - tsle.executeListeners(); + TwitterStream twitterStream = new TwitterStream(); + twitterStream.setTwitterCredentials(credentials); + twitterStream.addListener(new Responder()); + twitterStream.sampleStream(new StreamQueryParameters.Builder() + .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) + .build()); -// // Shutdown TweetsStreamListenersExecutor -// try { -// Thread.sleep(20000); -// tsle.shutdown(); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } - -// // An example of how to use the streaming directly using the InputStream result (Without TweetsStreamListenersExecutor) -// try{ -// JSON json = new JSON(); -// Type localVarReturnType = new TypeToken(){}.getType(); -// BufferedReader reader = new BufferedReader(new InputStreamReader(streamResult)); -// String line = reader.readLine(); -// while (line != null) { -// if(line.isEmpty()) { -// System.err.println("==> " + line.isEmpty()); -// line = reader.readLine(); -// continue; -// } -// Object jsonObject = json.getGson().fromJson(line, localVarReturnType); -// System.out.println(jsonObject != null ? jsonObject.toString() : "Null object"); -// line = reader.readLine(); -// } -// }catch (Exception e) { -// e.printStackTrace(); -// System.out.println(e); -// } - } catch (ApiException e) { - System.err.println("Status code: " + e.getCode()); - System.err.println("Reason: " + e.getResponseBody()); - System.err.println("Response headers: " + e.getResponseHeaders()); - e.printStackTrace(); - } } } -class Responder implements com.twitter.clientlib.TweetsStreamListener { +class Responder implements TweetsStreamListener { @Override public void actionOnTweetsStream(StreamingTweet streamingTweet) { if(streamingTweet == null) { diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java similarity index 56% rename from examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java rename to src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index 9883263..ac592d5 100644 --- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -20,36 +20,39 @@ */ -package com.twitter.clientlib; +package com.twitter.clientlib.stream; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; -import java.lang.reflect.Type; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.stream.IntStream; -import com.google.gson.reflect.TypeToken; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.twitter.clientlib.model.StreamingTweet; -import com.twitter.clientlib.model.SingleTweetLookupResponse; -public class TweetsStreamListenersExecutor { - private final ITweetsQueue tweetsQueue; - private final List listeners = new ArrayList<>(); - private final InputStream stream; +public class TweetsStreamExecutor { + private volatile BlockingQueue rawTweets; + private volatile BlockingQueue tweets; private volatile boolean isRunning = true; - public TweetsStreamListenersExecutor(InputStream stream) { - this.tweetsQueue = new LinkedListTweetsQueue(); - this.stream = stream; - } + private ExecutorService executorService; + private final List listeners = new ArrayList<>(); + private final InputStream stream; - public TweetsStreamListenersExecutor(ITweetsQueue tweetsQueue, InputStream stream) { - this.tweetsQueue = tweetsQueue; + public TweetsStreamExecutor(InputStream stream) { + this.rawTweets = new LinkedBlockingDeque<>(); + this.tweets = new LinkedBlockingDeque<>(); this.stream = stream; } @@ -57,23 +60,30 @@ public void addListener(TweetsStreamListener toAdd) { listeners.add(toAdd); } - public void executeListeners() { + public void removeListener(TweetsStreamListener toRemove) { + listeners.remove(toRemove); + } + + public void start() { if (stream == null) { System.out.println("Error: stream is null."); return; - } else if (this.tweetsQueue == null) { - System.out.println("Error: tweetsQueue is null."); - return; } - TweetsQueuer tweetsQueuer = new TweetsQueuer(); + RawTweetsQueuer rawTweetsQueuer = new RawTweetsQueuer(); TweetsListenersExecutor tweetsListenersExecutor = new TweetsListenersExecutor(); + rawTweetsQueuer.start(); + int threads = 5; //TODO parametrize this + executorService = Executors.newFixedThreadPool(threads); + for (int i = 0; i < threads; i++) { + executorService.submit(new ParseTweetsTask()); + } tweetsListenersExecutor.start(); - tweetsQueuer.start(); } public synchronized void shutdown() { isRunning = false; + executorService.shutdown(); System.out.println("TweetsStreamListenersExecutor is shutting down."); } @@ -87,7 +97,7 @@ private void processTweets() { StreamingTweet streamingTweet; try { while (isRunning) { - streamingTweet = tweetsQueue.poll(); + streamingTweet = tweets.poll(); if (streamingTweet == null) { Thread.sleep(100); continue; @@ -102,15 +112,39 @@ private void processTweets() { } } - private class TweetsQueuer extends Thread { + private class ParseTweetsTask implements Runnable { + private final ObjectMapper objectMapper; + private ParseTweetsTask() { + this.objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public void run() { + while (isRunning) { + try { + String rawTweet = rawTweets.take(); + StreamingTweet tweet = objectMapper.readValue(rawTweet, StreamingTweet.class); + tweets.put(tweet); + } catch (InterruptedException e) { + System.out.println("Fail 1"); + } catch (JsonMappingException e) { + System.out.println("Fail 2"); + } catch (JsonProcessingException e) { + System.out.println("Fail 3"); + } + } + } + } + + private class RawTweetsQueuer extends Thread { + @Override public void run() { queueTweets(); } public void queueTweets() { - JSON json = new JSON(); - Type localVarReturnType = new TypeToken() {}.getType(); String line = null; try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { @@ -121,7 +155,7 @@ public void queueTweets() { continue; } try { - tweetsQueue.add(StreamingTweet.fromJson(line)); + rawTweets.put(line); } catch (Exception interExcep) { interExcep.printStackTrace(); } @@ -134,21 +168,3 @@ public void queueTweets() { } } -interface ITweetsQueue { - StreamingTweet poll(); - void add(StreamingTweet streamingTweet); -} - -class LinkedListTweetsQueue implements ITweetsQueue { - private final Queue tweetsQueue = new LinkedList<>(); - - @Override - public StreamingTweet poll() { - return tweetsQueue.poll(); - } - - @Override - public void add(StreamingTweet streamingTweet) { - tweetsQueue.add(streamingTweet); - } -} diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java similarity index 95% rename from examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java rename to src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java index a928b3c..33084cc 100644 --- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java @@ -20,7 +20,7 @@ */ -package com.twitter.clientlib; +package com.twitter.clientlib.stream; import com.twitter.clientlib.model.StreamingTweet; diff --git a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java new file mode 100644 index 0000000..6bc6438 --- /dev/null +++ b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java @@ -0,0 +1,52 @@ +package com.twitter.clientlib.stream; + +import com.twitter.clientlib.ApiClient; +import com.twitter.clientlib.ApiException; +import com.twitter.clientlib.TwitterCredentialsBearer; +import com.twitter.clientlib.api.TweetsApi; +import com.twitter.clientlib.query.StreamQueryParameters; + +import java.io.InputStream; +import java.util.LinkedList; +import java.util.List; + +public class TwitterStream { + private final TweetsApi tweets = new TweetsApi(); + + private final ApiClient apiClient = new ApiClient(); + + private final List listeners = new LinkedList<>(); + + public TwitterStream() { + initBasePath(); + tweets.setClient(apiClient); + } + + public void setTwitterCredentials(TwitterCredentialsBearer credentials) { + apiClient.setTwitterCredentials(credentials); + } + + public void addListener(TweetsStreamListener listener) { + listeners.add(listener); + } + + public void sampleStream(StreamQueryParameters streamParameters) { + try { + InputStream streamResult = tweets.sampleStream(streamParameters); + TweetsStreamExecutor executor = new TweetsStreamExecutor(streamResult); + listeners.forEach(executor::addListener); + executor.start(); + } catch (ApiException e) { + System.err.println("Status code: " + e.getCode()); + System.err.println("Reason: " + e.getResponseBody()); + System.err.println("Response headers: " + e.getResponseHeaders()); + e.printStackTrace(); + } + } + + private void initBasePath() { + String basePath = System.getenv("TWITTER_API_BASE_PATH"); + apiClient.setBasePath(basePath != null ? basePath : "https://api.twitter.com"); + } + +} From 2d016d317995e7945560ad284fb5323d707e92cf Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Thu, 16 Jun 2022 00:09:59 +0300 Subject: [PATCH 03/13] checking permorfance for different number of threads --- .../clientlib/HelloWorldStreaming.java | 20 ++- .../stream/TweetsStreamExecutor.java | 114 ++++++++++++------ .../clientlib/stream/TwitterStream.java | 2 +- 3 files changed, 96 insertions(+), 40 deletions(-) diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index 8208447..b8c3bde 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -25,7 +25,11 @@ import com.twitter.clientlib.model.*; import com.twitter.clientlib.query.StreamQueryParameters; +import com.twitter.clientlib.query.model.MediaField; +import com.twitter.clientlib.query.model.PlaceField; +import com.twitter.clientlib.query.model.PollField; import com.twitter.clientlib.query.model.TweetField; +import com.twitter.clientlib.query.model.UserField; import com.twitter.clientlib.stream.TweetsStreamListener; import com.twitter.clientlib.stream.TwitterStream; @@ -39,20 +43,28 @@ public static void main(String[] args) { * Check the 'security' tag of the required APIs in https://api.twitter.com/2/openapi.json in order * to use the right credential object. */ - TwitterCredentialsBearer credentials = new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")); + TwitterCredentialsBearer credentials = new TwitterCredentialsBearer("AAAAAAAAAAAAAAAAAAAAAJotdgEAAAAAmXZex2w6l1D6TcwC8KfripI3ADY%3DkKTjcNPxea1aAL8yYXJTdbAX1NkgEJii7SpXetxuP9GXXhSnOa"); TwitterStream twitterStream = new TwitterStream(); twitterStream.setTwitterCredentials(credentials); twitterStream.addListener(new Responder()); +// twitterStream.sampleStream(new StreamQueryParameters.Builder() +// .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) +// .build()); + twitterStream.sampleStream(new StreamQueryParameters.Builder() - .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) - .build()); + .withTweetFields(TweetField.all().toArray(new TweetField[0])) + .withMediaFields(MediaField.all().toArray(new MediaField[0])) + .withUserFields(UserField.all().toArray(new UserField[0])) + .withPollFields(PollField.all().toArray(new PollField[0])) + .withPlaceFields(PlaceField.all().toArray(new PlaceField[0])) + .build()); } } class Responder implements TweetsStreamListener { @Override - public void actionOnTweetsStream(StreamingTweet streamingTweet) { + public synchronized void actionOnTweetsStream(StreamingTweet streamingTweet) { if(streamingTweet == null) { System.err.println("Error: actionOnTweetsStream - streamingTweet is null "); return; diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index ac592d5..6780a10 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -32,13 +32,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; -import java.util.stream.IntStream; +import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.twitter.clientlib.JSON; import com.twitter.clientlib.model.StreamingTweet; public class TweetsStreamExecutor { @@ -46,7 +47,15 @@ public class TweetsStreamExecutor { private volatile BlockingQueue tweets; private volatile boolean isRunning = true; - private ExecutorService executorService; + private long startTime; + private int tweetsCount = 0; + private final int tweetsLimit = 80000; + private final int threads = 1; //TODO parametrize this + + private ExecutorService rawTweetsQueuerService; + private ExecutorService deserializationService; + private ExecutorService listenersService; + private final List listeners = new ArrayList<>(); private final InputStream stream; @@ -69,51 +78,83 @@ public void start() { System.out.println("Error: stream is null."); return; } + startTime = System.currentTimeMillis(); - RawTweetsQueuer rawTweetsQueuer = new RawTweetsQueuer(); - TweetsListenersExecutor tweetsListenersExecutor = new TweetsListenersExecutor(); - rawTweetsQueuer.start(); - int threads = 5; //TODO parametrize this - executorService = Executors.newFixedThreadPool(threads); + rawTweetsQueuerService = Executors.newSingleThreadExecutor(); + rawTweetsQueuerService.submit(new RawTweetsQueuer()); + + deserializationService = Executors.newFixedThreadPool(threads); for (int i = 0; i < threads; i++) { - executorService.submit(new ParseTweetsTask()); + deserializationService.submit(new ParseTweetsTask()); } - tweetsListenersExecutor.start(); + + listenersService = Executors.newSingleThreadExecutor(); + listenersService.submit(new TweetsListenersTask()); } public synchronized void shutdown() { isRunning = false; - executorService.shutdown(); + rawTweetsQueuerService.shutdown(); + deserializationService.shutdown(); + listenersService.shutdown(); + try { + if (!rawTweetsQueuerService.awaitTermination(3, TimeUnit.SECONDS)) { + rawTweetsQueuerService.shutdownNow(); + if (!rawTweetsQueuerService.awaitTermination(3, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + if (!deserializationService.awaitTermination(3, TimeUnit.SECONDS)) { + deserializationService.shutdownNow(); + if (!deserializationService.awaitTermination(3, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + if (!listenersService.awaitTermination(3, TimeUnit.SECONDS)) { + listenersService.shutdownNow(); + if (!listenersService.awaitTermination(3, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + } catch (InterruptedException ie) { + rawTweetsQueuerService.shutdown(); + deserializationService.shutdown(); + listenersService.shutdown(); + Thread.currentThread().interrupt(); + } System.out.println("TweetsStreamListenersExecutor is shutting down."); } - private class TweetsListenersExecutor extends Thread { + private class RawTweetsQueuer implements Runnable { + @Override public void run() { - processTweets(); + queueTweets(); } - private void processTweets() { - StreamingTweet streamingTweet; - try { + public void queueTweets() { + + String line = null; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { while (isRunning) { - streamingTweet = tweets.poll(); - if (streamingTweet == null) { - Thread.sleep(100); + line = reader.readLine(); + if(line == null || line.isEmpty()) { + Thread.sleep(10); continue; } - for (TweetsStreamListener listener : listeners) { - listener.actionOnTweetsStream(streamingTweet); + try { + rawTweets.put(line); + } catch (Exception interExcep) { + interExcep.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); + shutdown(); } } } private class ParseTweetsTask implements Runnable { private final ObjectMapper objectMapper; + private ParseTweetsTask() { this.objectMapper = new ObjectMapper(); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -137,32 +178,35 @@ public void run() { } } - private class RawTweetsQueuer extends Thread { - + private class TweetsListenersTask implements Runnable { @Override public void run() { - queueTweets(); + processTweets(); } - public void queueTweets() { - - String line = null; - try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { + private void processTweets() { + StreamingTweet streamingTweet; + try { while (isRunning) { - line = reader.readLine(); - if(line == null || line.isEmpty()) { - Thread.sleep(100); + streamingTweet = tweets.poll(); + if (streamingTweet == null) { + Thread.sleep(10); continue; } - try { - rawTweets.put(line); - } catch (Exception interExcep) { - interExcep.printStackTrace(); + for (TweetsStreamListener listener : listeners) { + listener.actionOnTweetsStream(streamingTweet); + } + tweetsCount++; + if(tweetsCount == tweetsLimit) { + long stopTime = System.currentTimeMillis(); + long durationInMillis = stopTime - startTime; + double seconds = durationInMillis / 1000.0; + System.out.println("Total duration in seconds: " + seconds); + shutdown(); } } } catch (Exception e) { e.printStackTrace(); - shutdown(); } } } diff --git a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java index 6bc6438..199f185 100644 --- a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java +++ b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java @@ -45,7 +45,7 @@ public void sampleStream(StreamQueryParameters streamParameters) { } private void initBasePath() { - String basePath = System.getenv("TWITTER_API_BASE_PATH"); + String basePath = "http://localhost:8080"; apiClient.setBasePath(basePath != null ? basePath : "https://api.twitter.com"); } From 09f86fcbb71af304a2052f6d42efd1e68f4ebbe1 Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Fri, 17 Jun 2022 21:01:56 +0300 Subject: [PATCH 04/13] minor improvements --- .../stream/TweetsStreamExecutor.java | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index 6780a10..a19ce5a 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -39,7 +39,6 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.twitter.clientlib.JSON; import com.twitter.clientlib.model.StreamingTweet; public class TweetsStreamExecutor { @@ -49,15 +48,15 @@ public class TweetsStreamExecutor { private long startTime; private int tweetsCount = 0; - private final int tweetsLimit = 80000; - private final int threads = 1; //TODO parametrize this + private final int tweetsLimit = 250000; + private final int deserializationThreads = 1; //TODO parametrize this private ExecutorService rawTweetsQueuerService; private ExecutorService deserializationService; private ExecutorService listenersService; private final List listeners = new ArrayList<>(); - private final InputStream stream; + private InputStream stream; public TweetsStreamExecutor(InputStream stream) { this.rawTweets = new LinkedBlockingDeque<>(); @@ -83,9 +82,9 @@ public void start() { rawTweetsQueuerService = Executors.newSingleThreadExecutor(); rawTweetsQueuerService.submit(new RawTweetsQueuer()); - deserializationService = Executors.newFixedThreadPool(threads); - for (int i = 0; i < threads; i++) { - deserializationService.submit(new ParseTweetsTask()); + deserializationService = Executors.newFixedThreadPool(deserializationThreads); + for (int i = 0; i < deserializationThreads; i++) { + deserializationService.submit(new DeserializeTweetsTask()); } listenersService = Executors.newSingleThreadExecutor(); @@ -136,7 +135,6 @@ public void queueTweets() { while (isRunning) { line = reader.readLine(); if(line == null || line.isEmpty()) { - Thread.sleep(10); continue; } try { @@ -152,10 +150,10 @@ public void queueTweets() { } } - private class ParseTweetsTask implements Runnable { + private class DeserializeTweetsTask implements Runnable { private final ObjectMapper objectMapper; - private ParseTweetsTask() { + private DeserializeTweetsTask() { this.objectMapper = new ObjectMapper(); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } @@ -186,28 +184,27 @@ public void run() { private void processTweets() { StreamingTweet streamingTweet; - try { + while (isRunning) { - streamingTweet = tweets.poll(); - if (streamingTweet == null) { - Thread.sleep(10); - continue; - } - for (TweetsStreamListener listener : listeners) { - listener.actionOnTweetsStream(streamingTweet); - } - tweetsCount++; - if(tweetsCount == tweetsLimit) { - long stopTime = System.currentTimeMillis(); - long durationInMillis = stopTime - startTime; - double seconds = durationInMillis / 1000.0; - System.out.println("Total duration in seconds: " + seconds); - shutdown(); + try { + streamingTweet = tweets.take(); + for (TweetsStreamListener listener : listeners) { + listener.actionOnTweetsStream(streamingTweet); + } + tweetsCount++; + if(tweetsCount == tweetsLimit) { + long stopTime = System.currentTimeMillis(); + long durationInMillis = stopTime - startTime; + double seconds = durationInMillis / 1000.0; + System.out.println("Total duration in seconds: " + seconds); + shutdown(); + } + } catch (InterruptedException e) { + System.out.println("processTweets: Fail 1"); } + } - } catch (Exception e) { - e.printStackTrace(); - } + } } } From bab514d664d3180e4d51b2c53a67af16f91b4ff7 Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Fri, 17 Jun 2022 21:34:06 +0300 Subject: [PATCH 05/13] switched from InputStream and BufferedReader to okio BufferedSource --- .../java/com/twitter/clientlib/ApiClient.java | 6 ++-- .../com/twitter/clientlib/api/TweetsApi.java | 29 +++++++++---------- .../stream/TweetsStreamExecutor.java | 9 +++--- .../clientlib/stream/TwitterStream.java | 3 +- 4 files changed, 23 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/twitter/clientlib/ApiClient.java b/src/main/java/com/twitter/clientlib/ApiClient.java index 771e9cd..88f037f 100644 --- a/src/main/java/com/twitter/clientlib/ApiClient.java +++ b/src/main/java/com/twitter/clientlib/ApiClient.java @@ -29,6 +29,7 @@ import okhttp3.logging.HttpLoggingInterceptor.Level; import okio.Buffer; import okio.BufferedSink; +import okio.BufferedSource; import okio.Okio; import org.apache.oltu.oauth2.client.request.OAuthClientRequest.TokenRequestBuilder; import org.apache.oltu.oauth2.common.message.types.GrantType; @@ -1099,11 +1100,10 @@ public ApiResponse execute(Call call, Type returnType) throws ApiExceptio *

Execute stream.

* * @param call a {@link okhttp3.Call} object - * @param returnType a {@link java.lang.reflect.Type} object * @return a {@link java.io.InputStream} object * @throws com.twitter.clientlib.ApiException if any. */ - public InputStream executeStream(Call call, Type returnType) throws ApiException { + public BufferedSource executeStream(Call call) throws ApiException { try { Response response = call.execute(); if (!response.isSuccessful()) { @@ -1112,7 +1112,7 @@ public InputStream executeStream(Call call, Type returnType) throws ApiException if (response.body() == null) { return null; } - return response.body().byteStream(); + return response.body().source(); } catch (IOException e) { throw new ApiException(e); } diff --git a/src/main/java/com/twitter/clientlib/api/TweetsApi.java b/src/main/java/com/twitter/clientlib/api/TweetsApi.java index 78e9f3a..74a7cae 100644 --- a/src/main/java/com/twitter/clientlib/api/TweetsApi.java +++ b/src/main/java/com/twitter/clientlib/api/TweetsApi.java @@ -78,6 +78,7 @@ import java.io.InputStream; import javax.ws.rs.core.GenericType; +import okio.BufferedSource; import org.apache.commons.lang3.StringUtils; public class TweetsApi extends ApiCommon { @@ -1692,17 +1693,16 @@ private okhttp3.Call sampleStreamValidateBeforeCall(StreamQueryParameters stream 0 The request has failed. - */ - public InputStream sampleStream(StreamQueryParameters streamParameters) throws ApiException { - InputStream localVarResp = sampleStreamWithHttpInfo(streamParameters); - return localVarResp; + public BufferedSource sampleStream(StreamQueryParameters streamParameters) throws ApiException { + return sampleStreamWithHttpInfo(streamParameters); } /** * Calls the API using a retry mechanism to handle rate limits errors. * */ - public InputStream sampleStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException { - InputStream localVarResp; + public BufferedSource sampleStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException { + BufferedSource localVarResp; try{ localVarResp = sampleStream(streamParameters); } @@ -1730,11 +1730,10 @@ public InputStream sampleStream(Integer retries, StreamQueryParameters streamPar 0 The request has failed. - */ - public InputStream sampleStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException { + public BufferedSource sampleStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException { okhttp3.Call localVarCall = sampleStreamValidateBeforeCall(streamParameters, null); try { - Type localVarReturnType = new TypeToken(){}.getType(); - return localVarApiClient.executeStream(localVarCall, localVarReturnType); + return localVarApiClient.executeStream(localVarCall); } catch (ApiException e) { e.setErrorObject(localVarApiClient.getJSON().getGson().fromJson(e.getResponseBody(), new TypeToken(){}.getType())); throw e; @@ -1857,17 +1856,16 @@ private okhttp3.Call searchStreamValidateBeforeCall(StreamQueryParameters stream 0 The request has failed. - */ - public InputStream searchStream(StreamQueryParameters streamParameters) throws ApiException { - InputStream localVarResp = searchStreamWithHttpInfo(streamParameters); - return localVarResp; + public BufferedSource searchStream(StreamQueryParameters streamParameters) throws ApiException { + return searchStreamWithHttpInfo(streamParameters); } /** * Calls the API using a retry mechanism to handle rate limits errors. * */ - public InputStream searchStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException { - InputStream localVarResp; + public BufferedSource searchStream(Integer retries, StreamQueryParameters streamParameters) throws ApiException { + BufferedSource localVarResp; try{ localVarResp = searchStream(streamParameters); } @@ -1895,11 +1893,10 @@ public InputStream searchStream(Integer retries, StreamQueryParameters streamPar 0 The request has failed. - */ - public InputStream searchStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException { + public BufferedSource searchStreamWithHttpInfo(StreamQueryParameters streamParameters) throws ApiException { okhttp3.Call localVarCall = searchStreamValidateBeforeCall(streamParameters, null); try { - Type localVarReturnType = new TypeToken(){}.getType(); - return localVarApiClient.executeStream(localVarCall, localVarReturnType); + return localVarApiClient.executeStream(localVarCall); } catch (ApiException e) { e.setErrorObject(localVarApiClient.getJSON().getGson().fromJson(e.getResponseBody(), new TypeToken(){}.getType())); throw e; diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index a19ce5a..60eccc8 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -40,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.twitter.clientlib.model.StreamingTweet; +import okio.BufferedSource; public class TweetsStreamExecutor { private volatile BlockingQueue rawTweets; @@ -56,9 +57,9 @@ public class TweetsStreamExecutor { private ExecutorService listenersService; private final List listeners = new ArrayList<>(); - private InputStream stream; + private BufferedSource stream; - public TweetsStreamExecutor(InputStream stream) { + public TweetsStreamExecutor(BufferedSource stream) { this.rawTweets = new LinkedBlockingDeque<>(); this.tweets = new LinkedBlockingDeque<>(); this.stream = stream; @@ -131,9 +132,9 @@ public void run() { public void queueTweets() { String line = null; - try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { + try { while (isRunning) { - line = reader.readLine(); + line = stream.readUtf8Line(); if(line == null || line.isEmpty()) { continue; } diff --git a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java index 199f185..5455ef0 100644 --- a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java +++ b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java @@ -5,6 +5,7 @@ import com.twitter.clientlib.TwitterCredentialsBearer; import com.twitter.clientlib.api.TweetsApi; import com.twitter.clientlib.query.StreamQueryParameters; +import okio.BufferedSource; import java.io.InputStream; import java.util.LinkedList; @@ -32,7 +33,7 @@ public void addListener(TweetsStreamListener listener) { public void sampleStream(StreamQueryParameters streamParameters) { try { - InputStream streamResult = tweets.sampleStream(streamParameters); + BufferedSource streamResult = tweets.sampleStream(streamParameters); TweetsStreamExecutor executor = new TweetsStreamExecutor(streamResult); listeners.forEach(executor::addListener); executor.start(); From 9d28fe1597f0b04585ea7d6b9df94f64e3a1a40b Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Fri, 17 Jun 2022 21:53:13 +0300 Subject: [PATCH 06/13] reduced duplication for closing executor service and switched to single thread deserialization layer --- .../clientlib/HelloWorldStreaming.java | 13 +++-- .../query/StreamQueryParameters.java | 25 +++++++++ .../stream/TweetsStreamExecutor.java | 55 ++++++++----------- 3 files changed, 56 insertions(+), 37 deletions(-) diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index b8c3bde..8f0ca48 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -43,20 +43,21 @@ public static void main(String[] args) { * Check the 'security' tag of the required APIs in https://api.twitter.com/2/openapi.json in order * to use the right credential object. */ - TwitterCredentialsBearer credentials = new TwitterCredentialsBearer("AAAAAAAAAAAAAAAAAAAAAJotdgEAAAAAmXZex2w6l1D6TcwC8KfripI3ADY%3DkKTjcNPxea1aAL8yYXJTdbAX1NkgEJii7SpXetxuP9GXXhSnOa"); + TwitterCredentialsBearer credentials = new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")); TwitterStream twitterStream = new TwitterStream(); twitterStream.setTwitterCredentials(credentials); twitterStream.addListener(new Responder()); + // twitterStream.sampleStream(new StreamQueryParameters.Builder() // .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) // .build()); twitterStream.sampleStream(new StreamQueryParameters.Builder() - .withTweetFields(TweetField.all().toArray(new TweetField[0])) - .withMediaFields(MediaField.all().toArray(new MediaField[0])) - .withUserFields(UserField.all().toArray(new UserField[0])) - .withPollFields(PollField.all().toArray(new PollField[0])) - .withPlaceFields(PlaceField.all().toArray(new PlaceField[0])) + .withTweetFields(TweetField.all()) + .withMediaFields(MediaField.all()) + .withUserFields(UserField.all()) + .withPollFields(PollField.all()) + .withPlaceFields(PlaceField.all()) .build()); } diff --git a/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java b/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java index 028aa3a..5c8f842 100644 --- a/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java +++ b/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java @@ -106,26 +106,51 @@ public Builder withTweetFields(TweetField... tweetFields) { return this; } + public Builder withTweetFields(List tweetFields) { + this.tweetFields.addAll(tweetFields); + return this; + } + public Builder withMediaFields(MediaField... mediaFields) { this.mediaFields.addAll(Arrays.asList(mediaFields)); return this; } + public Builder withMediaFields(List mediaFields) { + this.mediaFields.addAll(mediaFields); + return this; + } + public Builder withPollFields(PollField... pollFields) { this.pollFields.addAll(Arrays.asList(pollFields)); return this; } + public Builder withPollFields(List pollFields) { + this.pollFields.addAll(pollFields); + return this; + } + public Builder withUserFields(UserField... userFields) { this.userFields.addAll(Arrays.asList(userFields)); return this; } + public Builder withUserFields(List userFields) { + this.userFields.addAll(userFields); + return this; + } + public Builder withPlaceFields(PlaceField... placeFields) { this.placeFields.addAll(Arrays.asList(placeFields)); return this; } + public Builder withPlaceFields(List placeFields) { + this.placeFields.addAll(placeFields); + return this; + } + public Builder withBackfillMinutes(Integer backFillMinutes) { this.backFillMinutes = backFillMinutes; return this; diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index 60eccc8..2d0046e 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -23,9 +23,6 @@ package com.twitter.clientlib.stream; -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -50,8 +47,6 @@ public class TweetsStreamExecutor { private long startTime; private int tweetsCount = 0; private final int tweetsLimit = 250000; - private final int deserializationThreads = 1; //TODO parametrize this - private ExecutorService rawTweetsQueuerService; private ExecutorService deserializationService; private ExecutorService listenersService; @@ -83,10 +78,8 @@ public void start() { rawTweetsQueuerService = Executors.newSingleThreadExecutor(); rawTweetsQueuerService.submit(new RawTweetsQueuer()); - deserializationService = Executors.newFixedThreadPool(deserializationThreads); - for (int i = 0; i < deserializationThreads; i++) { - deserializationService.submit(new DeserializeTweetsTask()); - } + deserializationService = Executors.newSingleThreadExecutor(); + deserializationService.submit(new DeserializeTweetsTask()); listenersService = Executors.newSingleThreadExecutor(); listenersService.submit(new TweetsListenersTask()); @@ -94,34 +87,35 @@ public void start() { public synchronized void shutdown() { isRunning = false; - rawTweetsQueuerService.shutdown(); - deserializationService.shutdown(); - listenersService.shutdown(); + shutDownServices(); try { - if (!rawTweetsQueuerService.awaitTermination(3, TimeUnit.SECONDS)) { - rawTweetsQueuerService.shutdownNow(); - if (!rawTweetsQueuerService.awaitTermination(3, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - if (!deserializationService.awaitTermination(3, TimeUnit.SECONDS)) { - deserializationService.shutdownNow(); - if (!deserializationService.awaitTermination(3, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - if (!listenersService.awaitTermination(3, TimeUnit.SECONDS)) { - listenersService.shutdownNow(); - if (!listenersService.awaitTermination(3, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } + terminateServices(); } catch (InterruptedException ie) { - rawTweetsQueuerService.shutdown(); - deserializationService.shutdown(); - listenersService.shutdown(); + shutDownServices(); Thread.currentThread().interrupt(); } System.out.println("TweetsStreamListenersExecutor is shutting down."); } + private void shutDownServices() { + rawTweetsQueuerService.shutdown(); + deserializationService.shutdown(); + listenersService.shutdown(); + } + + private void terminateServices() throws InterruptedException { + terminateService(rawTweetsQueuerService); + terminateService(deserializationService); + terminateService(listenersService); + } + private void terminateService(ExecutorService executorService) throws InterruptedException { + if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + } + private class RawTweetsQueuer implements Runnable { @Override @@ -130,7 +124,6 @@ public void run() { } public void queueTweets() { - String line = null; try { while (isRunning) { From 39e326193ac3ff21e822f922478ad41816b09169 Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Sat, 18 Jun 2022 00:55:07 +0300 Subject: [PATCH 07/13] added disconnect functionality when stream is empty for 20 secs and some exception handling --- .../clientlib/HelloWorldStreaming.java | 2 +- .../com/twitter/clientlib/api/TweetsApi.java | 14 +----- .../exceptions/AuthenticationException.java | 12 +++++ .../EmptyStreamTimeoutException.java | 7 +++ .../stream/TweetsStreamExecutor.java | 47 ++++++++++++++----- .../stream/TweetsStreamListener.java | 2 +- .../clientlib/stream/TwitterStream.java | 11 ++--- 7 files changed, 61 insertions(+), 34 deletions(-) create mode 100644 src/main/java/com/twitter/clientlib/exceptions/AuthenticationException.java create mode 100644 src/main/java/com/twitter/clientlib/exceptions/EmptyStreamTimeoutException.java diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index 8f0ca48..fac3cc3 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -65,7 +65,7 @@ public static void main(String[] args) { class Responder implements TweetsStreamListener { @Override - public synchronized void actionOnTweetsStream(StreamingTweet streamingTweet) { + public synchronized void onTweetArrival(StreamingTweet streamingTweet) { if(streamingTweet == null) { System.err.println("Error: actionOnTweetsStream - streamingTweet is null "); return; diff --git a/src/main/java/com/twitter/clientlib/api/TweetsApi.java b/src/main/java/com/twitter/clientlib/api/TweetsApi.java index 74a7cae..0fb11dd 100644 --- a/src/main/java/com/twitter/clientlib/api/TweetsApi.java +++ b/src/main/java/com/twitter/clientlib/api/TweetsApi.java @@ -23,25 +23,16 @@ package com.twitter.clientlib.api; import com.twitter.clientlib.ApiCallback; -import com.twitter.clientlib.ApiClient; -import com.twitter.clientlib.auth.*; import com.twitter.clientlib.ApiException; import com.twitter.clientlib.ApiResponse; -import com.twitter.clientlib.Configuration; import com.twitter.clientlib.Pair; -import com.twitter.clientlib.ProgressRequestBody; -import com.twitter.clientlib.ProgressResponseBody; -import com.github.scribejava.core.model.OAuth2AccessToken; import com.google.gson.reflect.TypeToken; -import java.io.IOException; - import com.twitter.clientlib.model.AddOrDeleteRulesRequest; import com.twitter.clientlib.model.AddOrDeleteRulesResponse; import com.twitter.clientlib.model.CreateTweetRequest; -import com.twitter.clientlib.model.Error; import com.twitter.clientlib.model.FilteredStreamingTweet; import com.twitter.clientlib.model.GenericTweetsTimelineResponse; import com.twitter.clientlib.model.GetRulesResponse; @@ -52,7 +43,7 @@ import com.twitter.clientlib.model.MultiTweetLookupResponse; import com.twitter.clientlib.model.MultiUserLookupResponse; import java.time.OffsetDateTime; -import com.twitter.clientlib.model.Problem; + import com.twitter.clientlib.model.QuoteTweetLookupResponse; import com.twitter.clientlib.model.SingleTweetLookupResponse; import com.twitter.clientlib.model.StreamingTweet; @@ -75,11 +66,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.io.InputStream; -import javax.ws.rs.core.GenericType; import okio.BufferedSource; -import org.apache.commons.lang3.StringUtils; public class TweetsApi extends ApiCommon { diff --git a/src/main/java/com/twitter/clientlib/exceptions/AuthenticationException.java b/src/main/java/com/twitter/clientlib/exceptions/AuthenticationException.java new file mode 100644 index 0000000..beafac5 --- /dev/null +++ b/src/main/java/com/twitter/clientlib/exceptions/AuthenticationException.java @@ -0,0 +1,12 @@ +package com.twitter.clientlib.exceptions; + +public class AuthenticationException extends RuntimeException { + + public AuthenticationException(String message) { + super(message); + } + + public AuthenticationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/twitter/clientlib/exceptions/EmptyStreamTimeoutException.java b/src/main/java/com/twitter/clientlib/exceptions/EmptyStreamTimeoutException.java new file mode 100644 index 0000000..96a2164 --- /dev/null +++ b/src/main/java/com/twitter/clientlib/exceptions/EmptyStreamTimeoutException.java @@ -0,0 +1,7 @@ +package com.twitter.clientlib.exceptions; + +public class EmptyStreamTimeoutException extends RuntimeException { + public EmptyStreamTimeoutException(String message) { + super(message); + } +} diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index 2d0046e..31ec7da 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -23,6 +23,7 @@ package com.twitter.clientlib.stream; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -36,10 +37,14 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.twitter.clientlib.exceptions.EmptyStreamTimeoutException; import com.twitter.clientlib.model.StreamingTweet; import okio.BufferedSource; public class TweetsStreamExecutor { + + private static final long EMPTY_STREAM_TIMEOUT = 20000; + private static final int POLL_WAIT = 5; private volatile BlockingQueue rawTweets; private volatile BlockingQueue tweets; private volatile boolean isRunning = true; @@ -90,9 +95,12 @@ public synchronized void shutdown() { shutDownServices(); try { terminateServices(); + stream.close(); } catch (InterruptedException ie) { shutDownServices(); Thread.currentThread().interrupt(); + } catch (IOException e) { + } System.out.println("TweetsStreamListenersExecutor is shutting down."); } @@ -109,9 +117,9 @@ private void terminateServices() throws InterruptedException { terminateService(listenersService); } private void terminateService(ExecutorService executorService) throws InterruptedException { - if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) { + if (!executorService.awaitTermination(1500, TimeUnit.MILLISECONDS)) { executorService.shutdownNow(); - if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) + if (!executorService.awaitTermination(1500, TimeUnit.MILLISECONDS)) System.err.println("Pool did not terminate"); } } @@ -126,19 +134,32 @@ public void run() { public void queueTweets() { String line = null; try { + boolean emptyResponse = false; + long firstEmptyResponseMillis = 0; + long lastEmptyReponseMillis; while (isRunning) { line = stream.readUtf8Line(); if(line == null || line.isEmpty()) { + if(!emptyResponse) { + firstEmptyResponseMillis = System.currentTimeMillis(); + emptyResponse = true; + } else { + lastEmptyReponseMillis = System.currentTimeMillis(); + if(lastEmptyReponseMillis - firstEmptyResponseMillis > EMPTY_STREAM_TIMEOUT) { + throw new EmptyStreamTimeoutException(String.format("Stream was empty for %d seconds consecutively", EMPTY_STREAM_TIMEOUT)); + } + } continue; } + emptyResponse = false; try { rawTweets.put(line); - } catch (Exception interExcep) { - interExcep.printStackTrace(); + } catch (Exception ignore) { + } } } catch (Exception e) { - e.printStackTrace(); + System.out.println("Something went wrong. Closing stream... " + e.getMessage()); shutdown(); } } @@ -156,15 +177,14 @@ private DeserializeTweetsTask() { public void run() { while (isRunning) { try { - String rawTweet = rawTweets.take(); + String rawTweet = rawTweets.poll(POLL_WAIT, TimeUnit.MILLISECONDS); + if (rawTweet == null) continue; StreamingTweet tweet = objectMapper.readValue(rawTweet, StreamingTweet.class); tweets.put(tweet); } catch (InterruptedException e) { - System.out.println("Fail 1"); - } catch (JsonMappingException e) { - System.out.println("Fail 2"); + } catch (JsonProcessingException e) { - System.out.println("Fail 3"); + System.out.println("debug log here"); } } } @@ -181,9 +201,10 @@ private void processTweets() { while (isRunning) { try { - streamingTweet = tweets.take(); + streamingTweet = tweets.poll(POLL_WAIT, TimeUnit.MILLISECONDS); + if(streamingTweet == null) continue; for (TweetsStreamListener listener : listeners) { - listener.actionOnTweetsStream(streamingTweet); + listener.onTweetArrival(streamingTweet); } tweetsCount++; if(tweetsCount == tweetsLimit) { @@ -194,7 +215,7 @@ private void processTweets() { shutdown(); } } catch (InterruptedException e) { - System.out.println("processTweets: Fail 1"); + } } diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java index 33084cc..b042008 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java @@ -25,5 +25,5 @@ import com.twitter.clientlib.model.StreamingTweet; public interface TweetsStreamListener { - void actionOnTweetsStream(StreamingTweet streamingTweet); + void onTweetArrival(StreamingTweet streamingTweet); } diff --git a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java index 5455ef0..700a734 100644 --- a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java +++ b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java @@ -4,10 +4,10 @@ import com.twitter.clientlib.ApiException; import com.twitter.clientlib.TwitterCredentialsBearer; import com.twitter.clientlib.api.TweetsApi; +import com.twitter.clientlib.exceptions.AuthenticationException; import com.twitter.clientlib.query.StreamQueryParameters; import okio.BufferedSource; -import java.io.InputStream; import java.util.LinkedList; import java.util.List; @@ -38,15 +38,14 @@ public void sampleStream(StreamQueryParameters streamParameters) { listeners.forEach(executor::addListener); executor.start(); } catch (ApiException e) { - System.err.println("Status code: " + e.getCode()); - System.err.println("Reason: " + e.getResponseBody()); - System.err.println("Response headers: " + e.getResponseHeaders()); - e.printStackTrace(); + if(e.getCode() == 401) { + throw new AuthenticationException("Not authenticated. Please check the credentials", e); + } } } private void initBasePath() { - String basePath = "http://localhost:8080"; + String basePath = System.getenv("TWITTER_API_BASE_PATH"); apiClient.setBasePath(basePath != null ? basePath : "https://api.twitter.com"); } From f7f5c2d73219c1d4ef23376125a948edb7b820e7 Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Sat, 18 Jun 2022 14:26:33 +0300 Subject: [PATCH 08/13] adding logging instead of system.out or .err --- examples/pom.xml | 5 ++++ .../clientlib/HelloWorldStreaming.java | 16 ++---------- .../clientlib/exceptions/StreamException.java | 12 +++++++++ .../stream/TweetsStreamExecutor.java | 25 +++++++++++++------ .../clientlib/stream/TwitterStream.java | 16 +++++++++++- .../twitter/clientlib/api/TweetsApiTest.java | 20 +++------------ 6 files changed, 55 insertions(+), 39 deletions(-) create mode 100644 src/main/java/com/twitter/clientlib/exceptions/StreamException.java diff --git a/examples/pom.xml b/examples/pom.xml index 812c90e..5a967e4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -19,5 +19,10 @@ 1.2.4 compile + + org.slf4j + slf4j-simple + 1.7.5 + diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index fac3cc3..279caed 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -25,11 +25,7 @@ import com.twitter.clientlib.model.*; import com.twitter.clientlib.query.StreamQueryParameters; -import com.twitter.clientlib.query.model.MediaField; -import com.twitter.clientlib.query.model.PlaceField; -import com.twitter.clientlib.query.model.PollField; import com.twitter.clientlib.query.model.TweetField; -import com.twitter.clientlib.query.model.UserField; import com.twitter.clientlib.stream.TweetsStreamListener; import com.twitter.clientlib.stream.TwitterStream; @@ -48,17 +44,9 @@ public static void main(String[] args) { twitterStream.setTwitterCredentials(credentials); twitterStream.addListener(new Responder()); -// twitterStream.sampleStream(new StreamQueryParameters.Builder() -// .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) -// .build()); - twitterStream.sampleStream(new StreamQueryParameters.Builder() - .withTweetFields(TweetField.all()) - .withMediaFields(MediaField.all()) - .withUserFields(UserField.all()) - .withPollFields(PollField.all()) - .withPlaceFields(PlaceField.all()) - .build()); + .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) + .build()); } } diff --git a/src/main/java/com/twitter/clientlib/exceptions/StreamException.java b/src/main/java/com/twitter/clientlib/exceptions/StreamException.java new file mode 100644 index 0000000..7e9d91d --- /dev/null +++ b/src/main/java/com/twitter/clientlib/exceptions/StreamException.java @@ -0,0 +1,12 @@ +package com.twitter.clientlib.exceptions; + +public class StreamException extends RuntimeException{ + + public StreamException(String message) { + super(message); + } + + public StreamException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index 31ec7da..4afd66c 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -34,15 +34,18 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.twitter.clientlib.exceptions.EmptyStreamTimeoutException; import com.twitter.clientlib.model.StreamingTweet; import okio.BufferedSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TweetsStreamExecutor { + private static final Logger logger = LoggerFactory.getLogger(TweetsStreamExecutor.class); + private static final long EMPTY_STREAM_TIMEOUT = 20000; private static final int POLL_WAIT = 5; private volatile BlockingQueue rawTweets; @@ -75,7 +78,7 @@ public void removeListener(TweetsStreamListener toRemove) { public void start() { if (stream == null) { - System.out.println("Error: stream is null."); + logger.error("Stream is null. Exiting..."); return; } startTime = System.currentTimeMillis(); @@ -91,6 +94,7 @@ public void start() { } public synchronized void shutdown() { + logger.info("TweetsStreamListenersExecutor is shutting down."); isRunning = false; shutDownServices(); try { @@ -102,7 +106,6 @@ public synchronized void shutdown() { } catch (IOException e) { } - System.out.println("TweetsStreamListenersExecutor is shutting down."); } private void shutDownServices() { @@ -120,12 +123,14 @@ private void terminateService(ExecutorService executorService) throws Interrupte if (!executorService.awaitTermination(1500, TimeUnit.MILLISECONDS)) { executorService.shutdownNow(); if (!executorService.awaitTermination(1500, TimeUnit.MILLISECONDS)) - System.err.println("Pool did not terminate"); + logger.error("Thread pool did not terminate"); } } private class RawTweetsQueuer implements Runnable { + private final Logger logger = LoggerFactory.getLogger(RawTweetsQueuer.class); + @Override public void run() { queueTweets(); @@ -159,13 +164,15 @@ public void queueTweets() { } } } catch (Exception e) { - System.out.println("Something went wrong. Closing stream... " + e.getMessage()); + logger.error("Something went wrong. Closing stream... {}", e.getMessage()); shutdown(); } } } private class DeserializeTweetsTask implements Runnable { + + private final Logger logger = LoggerFactory.getLogger(DeserializeTweetsTask.class); private final ObjectMapper objectMapper; private DeserializeTweetsTask() { @@ -181,16 +188,18 @@ public void run() { if (rawTweet == null) continue; StreamingTweet tweet = objectMapper.readValue(rawTweet, StreamingTweet.class); tweets.put(tweet); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { } catch (JsonProcessingException e) { - System.out.println("debug log here"); + logger.debug("Json could not be parsed"); } } } } private class TweetsListenersTask implements Runnable { + + private final Logger logger = LoggerFactory.getLogger(TweetsListenersTask.class); @Override public void run() { processTweets(); @@ -211,7 +220,7 @@ private void processTweets() { long stopTime = System.currentTimeMillis(); long durationInMillis = stopTime - startTime; double seconds = durationInMillis / 1000.0; - System.out.println("Total duration in seconds: " + seconds); + logger.info("Total duration in seconds: {}", seconds); shutdown(); } } catch (InterruptedException e) { diff --git a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java index 700a734..01157f0 100644 --- a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java +++ b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java @@ -5,19 +5,27 @@ import com.twitter.clientlib.TwitterCredentialsBearer; import com.twitter.clientlib.api.TweetsApi; import com.twitter.clientlib.exceptions.AuthenticationException; +import com.twitter.clientlib.exceptions.StreamException; import com.twitter.clientlib.query.StreamQueryParameters; + import okio.BufferedSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.LinkedList; import java.util.List; public class TwitterStream { + + private static final Logger logger = LoggerFactory.getLogger(TwitterStream.class); private final TweetsApi tweets = new TweetsApi(); private final ApiClient apiClient = new ApiClient(); private final List listeners = new LinkedList<>(); + private TweetsStreamExecutor executor; + public TwitterStream() { initBasePath(); tweets.setClient(apiClient); @@ -34,16 +42,22 @@ public void addListener(TweetsStreamListener listener) { public void sampleStream(StreamQueryParameters streamParameters) { try { BufferedSource streamResult = tweets.sampleStream(streamParameters); - TweetsStreamExecutor executor = new TweetsStreamExecutor(streamResult); + executor = new TweetsStreamExecutor(streamResult); listeners.forEach(executor::addListener); executor.start(); } catch (ApiException e) { if(e.getCode() == 401) { + logger.error("Authentication didn't work"); throw new AuthenticationException("Not authenticated. Please check the credentials", e); } + throw new StreamException("An exception occurred during stream execution", e); } } + public void shutdown() { + executor.shutdown(); + } + private void initBasePath() { String basePath = System.getenv("TWITTER_API_BASE_PATH"); apiClient.setBasePath(basePath != null ? basePath : "https://api.twitter.com"); diff --git a/src/test/java/com/twitter/clientlib/api/TweetsApiTest.java b/src/test/java/com/twitter/clientlib/api/TweetsApiTest.java index 55940aa..3ac6548 100644 --- a/src/test/java/com/twitter/clientlib/api/TweetsApiTest.java +++ b/src/test/java/com/twitter/clientlib/api/TweetsApiTest.java @@ -54,6 +54,8 @@ import com.twitter.clientlib.model.UsersRetweetsCreateRequest; import com.twitter.clientlib.model.UsersRetweetsCreateResponse; import com.twitter.clientlib.model.UsersRetweetsDeleteResponse; +import com.twitter.clientlib.query.StreamQueryParameters; +import okio.BufferedSource; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -248,14 +250,7 @@ public void listsIdTweetsTest() throws ApiException { */ @Test public void sampleStreamTest() throws ApiException { - Set expansions = null; - Set tweetFields = null; - Set userFields = null; - Set mediaFields = null; - Set placeFields = null; - Set pollFields = null; - Integer backfillMinutes = null; - InputStream response = apiInstance.tweets().sampleStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + BufferedSource response = apiInstance.tweets().sampleStream(new StreamQueryParameters.Builder().build()); // TODO: test validations } @@ -269,14 +264,7 @@ public void sampleStreamTest() throws ApiException { */ @Test public void searchStreamTest() throws ApiException { - Set expansions = null; - Set tweetFields = null; - Set userFields = null; - Set mediaFields = null; - Set placeFields = null; - Set pollFields = null; - Integer backfillMinutes = null; - InputStream response = apiInstance.tweets().searchStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + BufferedSource response = apiInstance.tweets().searchStream(new StreamQueryParameters.Builder().build()); // TODO: test validations } From 00bfeeaa1b5d5f5d67ee1dbbc9d16d1cf7d008cd Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Sat, 18 Jun 2022 18:27:38 +0300 Subject: [PATCH 09/13] added tests and licenses --- .../exceptions/AuthenticationException.java | 17 +++ .../EmptyStreamTimeoutException.java | 18 +++ .../clientlib/exceptions/StreamException.java | 18 +++ .../exceptions/TooManyRequestsException.java | 30 +++++ .../query/StreamQueryParameters.java | 18 +++ .../clientlib/query/model/Expansion.java | 18 +++ .../clientlib/query/model/MediaField.java | 18 +++ .../clientlib/query/model/PlaceField.java | 18 +++ .../clientlib/query/model/PollField.java | 17 +++ .../clientlib/query/model/TweetField.java | 17 +++ .../clientlib/query/model/UserField.java | 17 +++ .../stream/TweetsStreamExecutor.java | 3 - .../stream/TweetsStreamListener.java | 3 - .../clientlib/stream/TwitterStream.java | 37 +++++- .../clientlib/stream/TwitterStreamTest.java | 116 ++++++++++++++++++ 15 files changed, 358 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/twitter/clientlib/exceptions/TooManyRequestsException.java create mode 100644 src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java diff --git a/src/main/java/com/twitter/clientlib/exceptions/AuthenticationException.java b/src/main/java/com/twitter/clientlib/exceptions/AuthenticationException.java index beafac5..11f0313 100644 --- a/src/main/java/com/twitter/clientlib/exceptions/AuthenticationException.java +++ b/src/main/java/com/twitter/clientlib/exceptions/AuthenticationException.java @@ -1,3 +1,20 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ package com.twitter.clientlib.exceptions; public class AuthenticationException extends RuntimeException { diff --git a/src/main/java/com/twitter/clientlib/exceptions/EmptyStreamTimeoutException.java b/src/main/java/com/twitter/clientlib/exceptions/EmptyStreamTimeoutException.java index 96a2164..bf422c3 100644 --- a/src/main/java/com/twitter/clientlib/exceptions/EmptyStreamTimeoutException.java +++ b/src/main/java/com/twitter/clientlib/exceptions/EmptyStreamTimeoutException.java @@ -1,3 +1,21 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + package com.twitter.clientlib.exceptions; public class EmptyStreamTimeoutException extends RuntimeException { diff --git a/src/main/java/com/twitter/clientlib/exceptions/StreamException.java b/src/main/java/com/twitter/clientlib/exceptions/StreamException.java index 7e9d91d..f921dd8 100644 --- a/src/main/java/com/twitter/clientlib/exceptions/StreamException.java +++ b/src/main/java/com/twitter/clientlib/exceptions/StreamException.java @@ -1,3 +1,21 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + package com.twitter.clientlib.exceptions; public class StreamException extends RuntimeException{ diff --git a/src/main/java/com/twitter/clientlib/exceptions/TooManyRequestsException.java b/src/main/java/com/twitter/clientlib/exceptions/TooManyRequestsException.java new file mode 100644 index 0000000..705d4b0 --- /dev/null +++ b/src/main/java/com/twitter/clientlib/exceptions/TooManyRequestsException.java @@ -0,0 +1,30 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +package com.twitter.clientlib.exceptions; + +public class TooManyRequestsException extends RuntimeException{ + + public TooManyRequestsException(String message) { + super(message); + } + + public TooManyRequestsException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java b/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java index 5c8f842..902f17b 100644 --- a/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java +++ b/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java @@ -1,3 +1,21 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + package com.twitter.clientlib.query; import com.twitter.clientlib.query.model.*; diff --git a/src/main/java/com/twitter/clientlib/query/model/Expansion.java b/src/main/java/com/twitter/clientlib/query/model/Expansion.java index be746f4..e7c0fa8 100644 --- a/src/main/java/com/twitter/clientlib/query/model/Expansion.java +++ b/src/main/java/com/twitter/clientlib/query/model/Expansion.java @@ -1,3 +1,21 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + package com.twitter.clientlib.query.model; import java.util.Arrays; diff --git a/src/main/java/com/twitter/clientlib/query/model/MediaField.java b/src/main/java/com/twitter/clientlib/query/model/MediaField.java index d92863e..d27d7f0 100644 --- a/src/main/java/com/twitter/clientlib/query/model/MediaField.java +++ b/src/main/java/com/twitter/clientlib/query/model/MediaField.java @@ -1,3 +1,21 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + package com.twitter.clientlib.query.model; import java.util.Arrays; diff --git a/src/main/java/com/twitter/clientlib/query/model/PlaceField.java b/src/main/java/com/twitter/clientlib/query/model/PlaceField.java index 1d439ea..08ddd3a 100644 --- a/src/main/java/com/twitter/clientlib/query/model/PlaceField.java +++ b/src/main/java/com/twitter/clientlib/query/model/PlaceField.java @@ -1,3 +1,21 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + package com.twitter.clientlib.query.model; import java.util.Arrays; diff --git a/src/main/java/com/twitter/clientlib/query/model/PollField.java b/src/main/java/com/twitter/clientlib/query/model/PollField.java index d6a8dfa..d779f6e 100644 --- a/src/main/java/com/twitter/clientlib/query/model/PollField.java +++ b/src/main/java/com/twitter/clientlib/query/model/PollField.java @@ -1,3 +1,20 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ package com.twitter.clientlib.query.model; import java.util.Arrays; diff --git a/src/main/java/com/twitter/clientlib/query/model/TweetField.java b/src/main/java/com/twitter/clientlib/query/model/TweetField.java index 88e43e4..7644022 100644 --- a/src/main/java/com/twitter/clientlib/query/model/TweetField.java +++ b/src/main/java/com/twitter/clientlib/query/model/TweetField.java @@ -1,3 +1,20 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ package com.twitter.clientlib.query.model; import java.util.Arrays; diff --git a/src/main/java/com/twitter/clientlib/query/model/UserField.java b/src/main/java/com/twitter/clientlib/query/model/UserField.java index c22fe92..639d026 100644 --- a/src/main/java/com/twitter/clientlib/query/model/UserField.java +++ b/src/main/java/com/twitter/clientlib/query/model/UserField.java @@ -1,3 +1,20 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ package com.twitter.clientlib.query.model; import java.util.Arrays; diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index 4afd66c..bde9f9e 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -14,9 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. -NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). -https://openapi-generator.tech -Do not edit the class manually. */ diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java index b042008..d2e3ca3 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamListener.java @@ -14,9 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. -NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). -https://openapi-generator.tech -Do not edit the class manually. */ diff --git a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java index 01157f0..79d3dad 100644 --- a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java +++ b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java @@ -1,3 +1,21 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + package com.twitter.clientlib.stream; import com.twitter.clientlib.ApiClient; @@ -6,12 +24,14 @@ import com.twitter.clientlib.api.TweetsApi; import com.twitter.clientlib.exceptions.AuthenticationException; import com.twitter.clientlib.exceptions.StreamException; +import com.twitter.clientlib.exceptions.TooManyRequestsException; import com.twitter.clientlib.query.StreamQueryParameters; import okio.BufferedSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.PreDestroy; import java.util.LinkedList; import java.util.List; @@ -39,6 +59,10 @@ public void addListener(TweetsStreamListener listener) { listeners.add(listener); } + public void removeListener(TweetsStreamListener listener) { + listeners.remove(listener); + } + public void sampleStream(StreamQueryParameters streamParameters) { try { BufferedSource streamResult = tweets.sampleStream(streamParameters); @@ -50,7 +74,13 @@ public void sampleStream(StreamQueryParameters streamParameters) { logger.error("Authentication didn't work"); throw new AuthenticationException("Not authenticated. Please check the credentials", e); } - throw new StreamException("An exception occurred during stream execution", e); + if(e.getCode() == 429){ + /* for this error twitter indicates that to implement a reconnection mechanism + * see: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/handling-disconnections + */ + throw new TooManyRequestsException("Too many requests. Service responded with 429 status code"); + } + throw new StreamException("An exception occurred during stream execution ",e); } } @@ -58,6 +88,11 @@ public void shutdown() { executor.shutdown(); } + @PreDestroy + public void preDestroy() { + executor.shutdown(); + } + private void initBasePath() { String basePath = System.getenv("TWITTER_API_BASE_PATH"); apiClient.setBasePath(basePath != null ? basePath : "https://api.twitter.com"); diff --git a/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java b/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java new file mode 100644 index 0000000..8f1e0ff --- /dev/null +++ b/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java @@ -0,0 +1,116 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +package com.twitter.clientlib.stream; + +import com.twitter.clientlib.TwitterCredentialsBearer; +import com.twitter.clientlib.integration.ApiTester; +import com.twitter.clientlib.model.StreamingTweet; +import com.twitter.clientlib.query.StreamQueryParameters; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; + +@Disabled +class TwitterStreamTest extends ApiTester { + + private static TwitterStream twitterStream; + + private TweetsStreamListener courier; + private List tweets; + + @BeforeAll + public static void beforeAll() { + twitterStream = new TwitterStream(); + twitterStream.setTwitterCredentials(new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN"))); + } + + @BeforeEach + public void setUp() { + tweets = new LinkedList<>(); + courier = new TweetCourier(tweets); + twitterStream.addListener(courier); + } + + @AfterEach + public void tearDown() { + twitterStream.removeListener(courier); + twitterStream.shutdown(); + } + + @Test + public void shutdown() throws InterruptedException { + twitterStream.sampleStream(new StreamQueryParameters.Builder().build()); + TimeUnit.SECONDS.sleep(5); + assertTrue(tweets.size() > 1); + twitterStream.shutdown(); + } + + @Test + public void sampleStream() throws InterruptedException { + twitterStream.sampleStream(new StreamQueryParameters.Builder().build()); + TimeUnit.SECONDS.sleep(5); + assertTrue(tweets.size() > 1); + } + + @Test + public void sampleStreamAddSecondListener() throws InterruptedException { + List tweetsDuplicate = new LinkedList<>(); + TweetsStreamListener courierDuplicate = new TweetCourier(tweetsDuplicate); + twitterStream.addListener(courierDuplicate); + twitterStream.sampleStream(new StreamQueryParameters.Builder().build()); + TimeUnit.SECONDS.sleep(5); + assertTrue(tweets.size() > 1); + assertTrue(tweetsDuplicate.size() > 1); + assertEquals(tweets.size(), tweetsDuplicate.size()); + } + + @Test + public void sampleStreamAddSecondListenerAndRemove() throws InterruptedException { + List tweetsDuplicate = new LinkedList<>(); + TweetsStreamListener courierDuplicate = new TweetCourier(tweetsDuplicate); + twitterStream.addListener(courierDuplicate); + twitterStream.removeListener(courierDuplicate); + twitterStream.sampleStream(new StreamQueryParameters.Builder().build()); + TimeUnit.SECONDS.sleep(5); + assertTrue(tweets.size() > 1); + assertEquals(0, tweetsDuplicate.size()); + } + + private class TweetCourier implements TweetsStreamListener { + + private List tweetsList; + + private TweetCourier(List tweetsList) { + this.tweetsList = tweetsList; + } + @Override + public void onTweetArrival(StreamingTweet tweet) { + tweetsList.add(tweet); + } + } +} \ No newline at end of file From c1c27c65d6cca65a147ceafb53e6faaf45506479 Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Sat, 18 Jun 2022 19:48:15 +0300 Subject: [PATCH 10/13] added retries --- .../clientlib/stream/TwitterStream.java | 19 +++++++++++++++++-- .../clientlib/stream/TwitterStreamTest.java | 1 + 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java index 79d3dad..1eb1fe4 100644 --- a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java +++ b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java @@ -46,7 +46,18 @@ public class TwitterStream { private TweetsStreamExecutor executor; + private int retries; + public TwitterStream() { + init(); + } + + public TwitterStream(int retries) { + this.retries = retries; + init(); + } + + private void init() { initBasePath(); tweets.setClient(apiClient); } @@ -55,6 +66,10 @@ public void setTwitterCredentials(TwitterCredentialsBearer credentials) { apiClient.setTwitterCredentials(credentials); } + public void setRetries(int retries) { + this.retries = retries; + } + public void addListener(TweetsStreamListener listener) { listeners.add(listener); } @@ -65,7 +80,7 @@ public void removeListener(TweetsStreamListener listener) { public void sampleStream(StreamQueryParameters streamParameters) { try { - BufferedSource streamResult = tweets.sampleStream(streamParameters); + BufferedSource streamResult = tweets.sampleStream(retries == 0 ? 1 : retries, streamParameters); executor = new TweetsStreamExecutor(streamResult); listeners.forEach(executor::addListener); executor.start(); @@ -78,7 +93,7 @@ public void sampleStream(StreamQueryParameters streamParameters) { /* for this error twitter indicates that to implement a reconnection mechanism * see: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/handling-disconnections */ - throw new TooManyRequestsException("Too many requests. Service responded with 429 status code"); + throw new TooManyRequestsException("Too many requests. Service responded with 429 status code. Consider setting 'retries' or increasing its value"); } throw new StreamException("An exception occurred during stream execution ",e); } diff --git a/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java b/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java index 8f1e0ff..dcb7c9d 100644 --- a/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java +++ b/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java @@ -47,6 +47,7 @@ class TwitterStreamTest extends ApiTester { public static void beforeAll() { twitterStream = new TwitterStream(); twitterStream.setTwitterCredentials(new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN"))); + twitterStream.setRetries(4); } @BeforeEach From 70867758995db09459d5af535b5532a90acbf20a Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Sun, 19 Jun 2022 01:17:58 +0300 Subject: [PATCH 11/13] added docs and some minor changes --- README.md | 13 +- docs/StreamQueryParameters.md | 114 ++++++++ docs/TweetsApi.md | 251 +++++++++--------- docs/TwitterStream.md | 63 +++++ .../clientlib/HelloWorldStreaming.java | 6 +- .../query/StreamQueryParameters.java | 65 +++++ .../stream/TweetsStreamExecutor.java | 15 +- .../clientlib/stream/TwitterStream.java | 31 ++- .../clientlib/stream/TwitterStreamTest.java | 11 +- 9 files changed, 411 insertions(+), 158 deletions(-) create mode 100644 docs/StreamQueryParameters.md create mode 100644 docs/TwitterStream.md diff --git a/README.md b/README.md index 4237d37..dbf7dc8 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ You can find examples of using the SDK under the [examples/](examples/) director **Note: Only Twitter API V2 is supported** -- API version: 2.45 +- API version: 2.46 Twitter API v2 available endpoints @@ -228,8 +228,8 @@ In order to use the retry mechanism call the APIs with an additional parameter ` ```java int retries = 4; - streamResult = apiInstance.tweets()apiInstance.tweets().sampleStream() - .tweetFields(tweetFields) + streamResult = apiInstance.tweets().sampleStream() + .parameters(new StreamQueryParameters.Builder.withTweetFields(tweetFields).build()) .execute(retries); ``` @@ -530,6 +530,13 @@ Class | Method | HTTP request | Description - [VideoAllOfPromotedMetrics](docs/VideoAllOfPromotedMetrics.md) - [VideoAllOfPublicMetrics](docs/VideoAllOfPublicMetrics.md) +## Documentation for Stream Functionality + + +Class | Method | HTTP request | Description +------------ |--------------------------------------------------------| ------------- | ------------- +*TwitterStream* | [**sampleStream**](docs/TwitterStream.md#sampleStream) | **GET** /2/tweets/sample/stream | Processing stream with multiple threads accepts listeners with the produced tweets + diff --git a/docs/StreamQueryParameters.md b/docs/StreamQueryParameters.md new file mode 100644 index 0000000..a9921df --- /dev/null +++ b/docs/StreamQueryParameters.md @@ -0,0 +1,114 @@ + + +# StreamQueryParameters + + +## Properties + + +| Name | Type | Description | Notes | +|------------- |----------------------------------------------| ------------- | -------------| +| **backfillMinutes** | **Integer** | The number of minutes of backfill requested. | [optional] | +| **tweetFields** | [**Collection<TweetField>**](#TweetField) | A comma separated list of Tweet fields to display. | [optional] [enum: attachments, author_id, context_annotations, conversation_id, created_at, entities, geo, id, in_reply_to_user_id, lang, non_public_metrics, organic_metrics, possibly_sensitive, promoted_metrics, public_metrics, referenced_tweets, reply_settings, source, text, withheld] | +| **expansions** | [**Collection<Expansion>**](#Expansion) | A comma separated list of fields to expand. | [optional] [enum: attachments.media_keys, attachments.poll_ids, author_id, entities.mentions.username, geo.place_id, in_reply_to_user_id, referenced_tweets.id, referenced_tweets.id.author_id] | +| **mediaFields** | [**Collection<MediaField>**](#MediaField) | A comma separated list of Media fields to display. | [optional] [enum: alt_text, duration_ms, height, media_key, non_public_metrics, organic_metrics, preview_image_url, promoted_metrics, public_metrics, type, url, variants, width] | +| **pollFields** | [**Collection<PollField>**](#PollField) | A comma separated list of Poll fields to display. | [optional] [enum: duration_minutes, end_datetime, id, options, voting_status] | +| **userFields** | [**Collection<UserField>**](#UserField) | A comma separated list of User fields to display. | [optional] [enum: created_at, description, entities, id, location, name, pinned_tweet_id, profile_image_url, protected, public_metrics, url, username, verified, withheld] | +| **placeFields** | [**Collection<PlaceField>**](#PlaceField) | A comma separated list of Place fields to display. | [optional] [enum: contained_within, country, country_code, full_name, geo, id, name, place_type] | + + + + +## Enum: TweetField + +| Name | Value | +|---- | -----| +|ATTACHMENTS|"attachments"| +|AUTHOR_ID|"author_id"| +|CONTEXT_ANNOTATIONS|"context_annotations"| +|CONVERSATION_ID|"conversation_id"| +|CREATED_AT|"created_at"| +|ENTITIES|"entities"| +|GEO|"geo"| +|ID|"id"| +|IN_REPLY_TO_USER_ID|"in_reply_to_user_id"| +|LANG|"lang"| +|POSSIBLE_SENSITIVE|"possibly_sensitive"| +|PUBLIC_METRICS|"public_metrics"| +|REFERENCED_TWEETS|"referenced_tweets"| +|REPLY_SETTINGS|"reply_settings"| +|SOURCE|"source"| +|TEXT|"text"| +|WITHHELD|"withheld"| + + +## Enum: Expansion + +| Name | Value | +|---- | -----| +|ATTACHMENTS_MEDIA_KEYS|"attachments.media_keys"| +|ATTACHMENTS_POLL_IDS|"attachments.poll_ids"| +|AUTHOR_ID|"author_id"| +|ENTITIES_MENTIONS_USERNAME|"entities.mentions.username"| +|GEO_PLACE_ID|"geo.place_id"| +|IN_REPLY_TO_USER_ID|"in_reply_to_user_id"| +|REFERENCED_TWEETS_ID|"referenced_tweets.id"| +|REFERENCED_TWEETS_ID_AUTHOR_ID|"referenced_tweets.id.author_id"| + +## Enum: MediaField + +| Name | Value | +|---- | -----| +|ALT_TEXT|"alt_text"| +|DURATION_MS|"duration_ms"| +|HEIGHT|"height"| +|MEDIA_KEY|"media_key"| +|PREVIEW_IMAGE_URL|"preview_image_url"| +|PUBLIC_METRICS|"public_metrics"| +|TYPE|"type"| +|URL|"url"| +|VARIANTS|"variants"| +|WIDTH|"width"| + + +## Enum: PollField + +| Name | Value | +|---- | -----| +|DURATION_MINUTES|"duration_minutes"| +|END_DATETIME|"end_datetime"| +|ID|"id"| +|OPTIONS|"options"| +|VOTING_STATUS|"voting_status"| + +## Enum: UserField + +| Name | Value | +|---- | -----| +|CREATED_AT|"created_at"| +|DESCRIPTION|"description"| +|ENTITIES|"entities"| +|ID|"id"| +|LOCATION|"location"| +|NAME|"name"| +|PINNED_TWEET_ID|"pinned_tweet_id"| +|PROFILE_IMAGE_URL|"profile_image_url"| +|PROTECTED|"protected"| +|PUBLIC_METRICS|"public_metrics"| +|URL|"url"| +|USERNAME|"username"| +|VERIFIED|"verified"| +|WITHHELD|"withheld"| + +## Enum: PlaceField + +| Name | Value | +|---- | -----| +|CONTAINED_WITHIN|"contained_within"| +|COUNTRY|"country"| +|COUNTRY_CODE|"country_code"| +|FULL_NAME|"full_name"| +|GEO|"geo"| +|ID|"id"| +|NAME|"name"| +|PLACE_TYPE|"place_type"| \ No newline at end of file diff --git a/docs/TweetsApi.md b/docs/TweetsApi.md index bfec075..133e967 100644 --- a/docs/TweetsApi.md +++ b/docs/TweetsApi.md @@ -895,15 +895,29 @@ public class Example { # **sampleStream** -> StreamingTweetResponse sampleStream().backfillMinutes(backfillMinutes).tweetFields(tweetFields).expansions(expansions).mediaFields(mediaFields).pollFields(pollFields).userFields(userFields).placeFields(placeFields).execute(); +```java +BufferedSource sampleStream() + .parameters(new StreamQueryParameters.Builder() + .withBackfillMinutes(backfillMinutes) + .withTweetFields(tweetFields) + .withExpansions(expansions) + .withMediaFields(mediaFields) + .withPollFields(pollFields) + .withUserFields(userFields) + .withPlaceFields(placeFields) + .build()) + .execute(); +``` Sample stream Streams a deterministic 1% of public Tweets. ### Example + ```java // Import classes: + import com.twitter.clientlib.ApiClient; import com.twitter.clientlib.ApiException; import com.twitter.clientlib.Configuration; @@ -914,10 +928,15 @@ import com.twitter.clientlib.TwitterCredentialsBearer; import com.twitter.clientlib.api.TwitterApi; import com.twitter.clientlib.api.TweetsApi; + import java.io.InputStream; + import com.google.common.reflect.TypeToken; -import java.io.BufferedReader; -import java.io.InputStreamReader; + +import com.twitter.clientlib.query.StreamQueryParameters; +import com.twitter.clientlib.query.model.TweetField; +import okio.BufferedSource; + import java.lang.reflect.Type; import java.util.List; import java.util.Set; @@ -926,76 +945,59 @@ import java.util.HashSet; import java.time.OffsetDateTime; public class Example { - public static void main(String[] args) { - // Set the credentials based on the API's "security" tag values. - // Check the API definition in https://api.twitter.com/2/openapi.json - // When multiple options exist, the SDK supports only "OAuth2UserToken" or "BearerToken" + public static void main(String[] args) { + // Set the credentials based on the API's "security" tag values. + // Check the API definition in https://api.twitter.com/2/openapi.json + // When multiple options exist, the SDK supports only "OAuth2UserToken" or "BearerToken" - // Uncomment and set the credentials configuration - - // Configure HTTP bearer authorization: - // TwitterCredentialsBearer credentials = new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")); + // Uncomment and set the credentials configuration + + // Configure HTTP bearer authorization: + // TwitterCredentialsBearer credentials = new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")); TwitterApi apiInstance = new TwitterApi(credentials); - // Set the params values - Integer backfillMinutes = 56; // Integer | The number of minutes of backfill requested. - Set tweetFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of Tweet fields to display. - Set expansions = new HashSet<>(Arrays.asList()); // Set | A comma separated list of fields to expand. - Set mediaFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of Media fields to display. - Set pollFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of Poll fields to display. - Set userFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of User fields to display. - Set placeFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of Place fields to display. - try { - InputStream result = apiInstance.tweets().sampleStream() - .backfillMinutes(backfillMinutes) - .tweetFields(tweetFields) - .expansions(expansions) - .mediaFields(mediaFields) - .pollFields(pollFields) - .userFields(userFields) - .placeFields(placeFields) - .execute(); - try{ - JSON json = new JSON(); - Type localVarReturnType = new TypeToken(){}.getType(); - BufferedReader reader = new BufferedReader(new InputStreamReader(result)); - String line = reader.readLine(); - while (line != null) { - if(line.isEmpty()) { - System.out.println("==> Empty line"); - line = reader.readLine(); - continue; - } - Object jsonObject = json.getGson().fromJson(line, localVarReturnType); - System.out.println(jsonObject != null ? jsonObject.toString() : "Null object"); - line = reader.readLine(); - } - }catch (Exception e) { - e.printStackTrace(); - System.out.println(e); + try { + BufferedSource result = apiInstance.tweets().sampleStream() + .parameters(new StreamQueryParameters.Builder() + .withBackfillMinutes(56) + .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) + .build()) + .execute(); + try { + JSON json = new JSON(); + Type localVarReturnType = new TypeToken() { + }.getType(); + String line = result.readUtf8Line(); + while (line != null) { + if (line.isEmpty()) { + System.out.println("==> Empty line"); + line = result.readUtf8Line(); + continue; + } + Object jsonObject = json.getGson().fromJson(line, localVarReturnType); + System.out.println(jsonObject != null ? jsonObject.toString() : "Null object"); + line = result.readUtf8Line(); + } + } catch (Exception e) { + e.printStackTrace(); + System.out.println(e); } - } catch (ApiException e) { - System.err.println("Exception when calling TweetsApi#sampleStream"); - System.err.println("Status code: " + e.getCode()); - System.err.println("Reason: " + e.getResponseBody()); - System.err.println("Response headers: " + e.getResponseHeaders()); - e.printStackTrace(); + } catch (ApiException e) { + System.err.println("Exception when calling TweetsApi#sampleStream"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Reason: " + e.getResponseBody()); + System.err.println("Response headers: " + e.getResponseHeaders()); + e.printStackTrace(); + } } - } } ``` ### Parameters -| Name | Type | Description | Notes | -|------------- | ------------- | ------------- | -------------| -| **backfillMinutes** | **Integer**| The number of minutes of backfill requested. | [optional] | -| **tweetFields** | [**Set<String>**](String.md)| A comma separated list of Tweet fields to display. | [optional] [enum: attachments, author_id, context_annotations, conversation_id, created_at, entities, geo, id, in_reply_to_user_id, lang, non_public_metrics, organic_metrics, possibly_sensitive, promoted_metrics, public_metrics, referenced_tweets, reply_settings, source, text, withheld] | -| **expansions** | [**Set<String>**](String.md)| A comma separated list of fields to expand. | [optional] [enum: attachments.media_keys, attachments.poll_ids, author_id, entities.mentions.username, geo.place_id, in_reply_to_user_id, referenced_tweets.id, referenced_tweets.id.author_id] | -| **mediaFields** | [**Set<String>**](String.md)| A comma separated list of Media fields to display. | [optional] [enum: alt_text, duration_ms, height, media_key, non_public_metrics, organic_metrics, preview_image_url, promoted_metrics, public_metrics, type, url, variants, width] | -| **pollFields** | [**Set<String>**](String.md)| A comma separated list of Poll fields to display. | [optional] [enum: duration_minutes, end_datetime, id, options, voting_status] | -| **userFields** | [**Set<String>**](String.md)| A comma separated list of User fields to display. | [optional] [enum: created_at, description, entities, id, location, name, pinned_tweet_id, profile_image_url, protected, public_metrics, url, username, verified, withheld] | -| **placeFields** | [**Set<String>**](String.md)| A comma separated list of Place fields to display. | [optional] [enum: contained_within, country, country_code, full_name, geo, id, name, place_type] | +| Name | Type | Description | Notes | +|-------|-----------------------------------------------------------------|------------|-------| +| **parameters** | [**StreamQueryParameters**](StreamQueryParameters.md) | | | ### Return type @@ -1018,15 +1020,29 @@ public class Example { # **searchStream** -> FilteredStreamingTweetResponse searchStream().backfillMinutes(backfillMinutes).tweetFields(tweetFields).expansions(expansions).mediaFields(mediaFields).pollFields(pollFields).userFields(userFields).placeFields(placeFields).execute(); +```java +FilteredStreamingTweetResponse searchStream() + .parameters(new StreamQueryParameters.Builder() + .withBackfillMinutes(backfillMinutes) + .withTweetFields(tweetFields) + .withExpansions(expansions) + .withMediaFields(mediaFields) + .withPollFields(pollFields) + .withUserFields(userFields) + .withPlaceFields(placeFields) + .build()) + .execute(); +``` Filtered stream Streams Tweets matching the stream's active rule set. ### Example + ```java // Import classes: + import com.twitter.clientlib.ApiClient; import com.twitter.clientlib.ApiException; import com.twitter.clientlib.Configuration; @@ -1037,10 +1053,14 @@ import com.twitter.clientlib.TwitterCredentialsBearer; import com.twitter.clientlib.api.TwitterApi; import com.twitter.clientlib.api.TweetsApi; + import java.io.InputStream; + import com.google.common.reflect.TypeToken; -import java.io.BufferedReader; -import java.io.InputStreamReader; +import com.twitter.clientlib.query.StreamQueryParameters; +import com.twitter.clientlib.query.model.TweetField; +import okio.BufferedSource; + import java.lang.reflect.Type; import java.util.List; import java.util.Set; @@ -1049,62 +1069,51 @@ import java.util.HashSet; import java.time.OffsetDateTime; public class Example { - public static void main(String[] args) { - // Set the credentials based on the API's "security" tag values. - // Check the API definition in https://api.twitter.com/2/openapi.json - // When multiple options exist, the SDK supports only "OAuth2UserToken" or "BearerToken" + public static void main(String[] args) { + // Set the credentials based on the API's "security" tag values. + // Check the API definition in https://api.twitter.com/2/openapi.json + // When multiple options exist, the SDK supports only "OAuth2UserToken" or "BearerToken" - // Uncomment and set the credentials configuration - - // Configure HTTP bearer authorization: - // TwitterCredentialsBearer credentials = new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")); + // Uncomment and set the credentials configuration + + // Configure HTTP bearer authorization: + // TwitterCredentialsBearer credentials = new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")); TwitterApi apiInstance = new TwitterApi(credentials); - // Set the params values - Integer backfillMinutes = 56; // Integer | The number of minutes of backfill requested. - Set tweetFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of Tweet fields to display. - Set expansions = new HashSet<>(Arrays.asList()); // Set | A comma separated list of fields to expand. - Set mediaFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of Media fields to display. - Set pollFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of Poll fields to display. - Set userFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of User fields to display. - Set placeFields = new HashSet<>(Arrays.asList()); // Set | A comma separated list of Place fields to display. - try { - InputStream result = apiInstance.tweets().searchStream() - .backfillMinutes(backfillMinutes) - .tweetFields(tweetFields) - .expansions(expansions) - .mediaFields(mediaFields) - .pollFields(pollFields) - .userFields(userFields) - .placeFields(placeFields) - .execute(); - try{ - JSON json = new JSON(); - Type localVarReturnType = new TypeToken(){}.getType(); - BufferedReader reader = new BufferedReader(new InputStreamReader(result)); - String line = reader.readLine(); - while (line != null) { - if(line.isEmpty()) { - System.out.println("==> Empty line"); - line = reader.readLine(); - continue; - } - Object jsonObject = json.getGson().fromJson(line, localVarReturnType); - System.out.println(jsonObject != null ? jsonObject.toString() : "Null object"); - line = reader.readLine(); - } - }catch (Exception e) { - e.printStackTrace(); - System.out.println(e); + try { + BufferedSource result = apiInstance.tweets().searchStream() + .parameters(new StreamQueryParameters.Builder() + .withBackfillMinutes(56) + .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) + .build()) + .execute(); + try { + JSON json = new JSON(); + Type localVarReturnType = new TypeToken() { + }.getType(); + String line = result.readUtf8Line(); + while (line != null) { + if (line.isEmpty()) { + System.out.println("==> Empty line"); + line = result.readUtf8Line(); + continue; + } + Object jsonObject = json.getGson().fromJson(line, localVarReturnType); + System.out.println(jsonObject != null ? jsonObject.toString() : "Null object"); + line = result.readUtf8Line(); + } + } catch (Exception e) { + e.printStackTrace(); + System.out.println(e); } - } catch (ApiException e) { - System.err.println("Exception when calling TweetsApi#searchStream"); - System.err.println("Status code: " + e.getCode()); - System.err.println("Reason: " + e.getResponseBody()); - System.err.println("Response headers: " + e.getResponseHeaders()); - e.printStackTrace(); + } catch (ApiException e) { + System.err.println("Exception when calling TweetsApi#searchStream"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Reason: " + e.getResponseBody()); + System.err.println("Response headers: " + e.getResponseHeaders()); + e.printStackTrace(); + } } - } } ``` @@ -1112,13 +1121,7 @@ public class Example { | Name | Type | Description | Notes | |------------- | ------------- | ------------- | -------------| -| **backfillMinutes** | **Integer**| The number of minutes of backfill requested. | [optional] | -| **tweetFields** | [**Set<String>**](String.md)| A comma separated list of Tweet fields to display. | [optional] [enum: attachments, author_id, context_annotations, conversation_id, created_at, entities, geo, id, in_reply_to_user_id, lang, non_public_metrics, organic_metrics, possibly_sensitive, promoted_metrics, public_metrics, referenced_tweets, reply_settings, source, text, withheld] | -| **expansions** | [**Set<String>**](String.md)| A comma separated list of fields to expand. | [optional] [enum: attachments.media_keys, attachments.poll_ids, author_id, entities.mentions.username, geo.place_id, in_reply_to_user_id, referenced_tweets.id, referenced_tweets.id.author_id] | -| **mediaFields** | [**Set<String>**](String.md)| A comma separated list of Media fields to display. | [optional] [enum: alt_text, duration_ms, height, media_key, non_public_metrics, organic_metrics, preview_image_url, promoted_metrics, public_metrics, type, url, variants, width] | -| **pollFields** | [**Set<String>**](String.md)| A comma separated list of Poll fields to display. | [optional] [enum: duration_minutes, end_datetime, id, options, voting_status] | -| **userFields** | [**Set<String>**](String.md)| A comma separated list of User fields to display. | [optional] [enum: created_at, description, entities, id, location, name, pinned_tweet_id, profile_image_url, protected, public_metrics, url, username, verified, withheld] | -| **placeFields** | [**Set<String>**](String.md)| A comma separated list of Place fields to display. | [optional] [enum: contained_within, country, country_code, full_name, geo, id, name, place_type] | +| **parameters** | [**StreamQueryParameters**](StreamQueryParameters.md) | | | ### Return type diff --git a/docs/TwitterStream.md b/docs/TwitterStream.md new file mode 100644 index 0000000..332e75c --- /dev/null +++ b/docs/TwitterStream.md @@ -0,0 +1,63 @@ +# TwitterStream + +A wrapper for calling the stream endpoints reading the BufferedSource and producing deserialized responses from every line of the BufferedSource. + + + +# **sampleStream** + +```java +twitterStream.addListener(new TweetListener()); +twitterStream.startSampleStream(new StreamQueryParameters.Builder().build()); +``` + +The listener must implement this method +```java +void onTweetArrival(StreamingTweetResponse streamingTweet); +``` + +### Example + +```java +import com.twitter.clientlib.model.*; +import com.twitter.clientlib.query.StreamQueryParameters; +import com.twitter.clientlib.query.model.TweetField; +import com.twitter.clientlib.stream.TweetsStreamListener; +import com.twitter.clientlib.stream.TwitterStream; + + +public class HelloWorldStreaming { + + public static void main(String[] args) { + /** + * Set the credentials for the required APIs. + * The Java SDK supports TwitterCredentialsOAuth2 & TwitterCredentialsBearer. + * Check the 'security' tag of the required APIs in https://api.twitter.com/2/openapi.json in order + * to use the right credential object. + */ + TwitterStream twitterStream = new TwitterStream(new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN"))); + twitterStream.addListener(new Responder()); + + twitterStream.startSampleStream(new StreamQueryParameters.Builder() + .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) + .build()); + + } +} + +class Responder implements TweetsStreamListener { + @Override + public void onTweetArrival(StreamingTweetResponse streamingTweet) { + if (streamingTweet == null) { + System.err.println("Error: actionOnTweetsStream - streamingTweet is null "); + return; + } + + if (streamingTweet.getErrors() != null) { + streamingTweet.getErrors().forEach(System.out::println); + } else if (streamingTweet.getData() != null) { + System.out.println("New streaming tweet: " + streamingTweet.getData().getText()); + } + } +} +``` diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index 61717e0..61f1f92 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -39,12 +39,10 @@ public static void main(String[] args) { * Check the 'security' tag of the required APIs in https://api.twitter.com/2/openapi.json in order * to use the right credential object. */ - TwitterCredentialsBearer credentials = new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")); - TwitterStream twitterStream = new TwitterStream(); - twitterStream.setTwitterCredentials(credentials); + TwitterStream twitterStream = new TwitterStream(new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN"))); twitterStream.addListener(new Responder()); - twitterStream.sampleStream(new StreamQueryParameters.Builder() + twitterStream.startSampleStream(new StreamQueryParameters.Builder() .withTweetFields(TweetField.AUTHOR_ID, TweetField.ID, TweetField.CREATED_AT) .build()); diff --git a/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java b/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java index cb19d0c..9f74a59 100644 --- a/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java +++ b/src/main/java/com/twitter/clientlib/query/StreamQueryParameters.java @@ -120,66 +120,131 @@ public Builder() { this.placeFields = new ArrayList<>(); } + /** + * Set tweetFields + * @param tweetFields A comma separated list of Tweet fields to display. (optional) + * @return Builder + */ public Builder withTweetFields(TweetField... tweetFields) { this.tweetFields.addAll(Arrays.asList(tweetFields)); return this; } + /** + * Set tweetFields + * @param tweetFields A comma separated list of Tweet fields to display. (optional) + * @return Builder + */ public Builder withTweetFields(Collection tweetFields) { this.tweetFields.addAll(tweetFields); return this; } + /** + * Set mediaFields + * @param mediaFields A comma separated list of Media fields to display. (optional) + * @return Builder + */ public Builder withMediaFields(MediaField... mediaFields) { this.mediaFields.addAll(Arrays.asList(mediaFields)); return this; } + /** + * Set mediaFields + * @param mediaFields A comma separated list of Media fields to display. (optional) + * @return Builder + */ public Builder withMediaFields(Collection mediaFields) { this.mediaFields.addAll(mediaFields); return this; } + /** + * Set pollFields + * @param pollFields A comma separated list of Poll fields to display. (optional) + * @return Builder + */ public Builder withPollFields(PollField... pollFields) { this.pollFields.addAll(Arrays.asList(pollFields)); return this; } + /** + * Set pollFields + * @param pollFields A comma separated list of Poll fields to display. (optional) + * @return Builder + */ public Builder withPollFields(Collection pollFields) { this.pollFields.addAll(pollFields); return this; } + /** + * Set userFields + * @param userFields A comma separated list of User fields to display. (optional) + * @return Builder + */ public Builder withUserFields(UserField... userFields) { this.userFields.addAll(Arrays.asList(userFields)); return this; } + /** + * Set userFields + * @param userFields A comma separated list of User fields to display. (optional) + * @return Builder + */ public Builder withUserFields(Collection userFields) { this.userFields.addAll(userFields); return this; } + /** + * Set placeFields + * @param placeFields A comma separated list of Place fields to display. (optional) + * @return Builder + */ public Builder withPlaceFields(PlaceField... placeFields) { this.placeFields.addAll(Arrays.asList(placeFields)); return this; } + /** + * Set placeFields + * @param placeFields A comma separated list of Place fields to display. (optional) + * @return Builder + */ public Builder withPlaceFields(Collection placeFields) { this.placeFields.addAll(placeFields); return this; } + /** + * Set expansions + * @param expansions A comma separated list of fields to expand. (optional) + * @return Builder + */ public Builder withExpansions(Expansion... expansions) { this.expansions.addAll(Arrays.asList(expansions)); return this; } + /** + * Set expansions + * @param expansions A comma separated list of fields to expand. (optional) + * @return Builder + */ public Builder withExpansions(Collection expansions) { this.expansions.addAll(expansions); return this; } + /** + * Set backfillMinutes + * @param backfillMinutes The number of minutes of backfill requested. (optional) + * @return Builder + */ public Builder withBackfillMinutes(Integer backFillMinutes) { this.backFillMinutes = backFillMinutes; return this; diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index e6b15c9..40dcdb8 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -45,17 +45,13 @@ public class TweetsStreamExecutor { private static final long EMPTY_STREAM_TIMEOUT = 20000; private static final int POLL_WAIT = 5; + private volatile BlockingQueue rawTweets; private volatile BlockingQueue tweets; private volatile boolean isRunning = true; - - private long startTime; - private int tweetsCount = 0; - private final int tweetsLimit = 250000; private ExecutorService rawTweetsQueuerService; private ExecutorService deserializationService; private ExecutorService listenersService; - private final List listeners = new ArrayList<>(); private BufferedSource stream; @@ -78,7 +74,6 @@ public void start() { logger.error("Stream is null. Exiting..."); return; } - startTime = System.currentTimeMillis(); rawTweetsQueuerService = Executors.newSingleThreadExecutor(); rawTweetsQueuerService.submit(new RawTweetsQueuer()); @@ -212,14 +207,6 @@ private void processTweets() { for (TweetsStreamListener listener : listeners) { listener.onTweetArrival(streamingTweet); } - tweetsCount++; - if(tweetsCount == tweetsLimit) { - long stopTime = System.currentTimeMillis(); - long durationInMillis = stopTime - startTime; - double seconds = durationInMillis / 1000.0; - logger.info("Total duration in seconds: {}", seconds); - shutdown(); - } } catch (InterruptedException e) { } diff --git a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java index 8a7c92f..ac0f312 100644 --- a/src/main/java/com/twitter/clientlib/stream/TwitterStream.java +++ b/src/main/java/com/twitter/clientlib/stream/TwitterStream.java @@ -48,8 +48,9 @@ public class TwitterStream { private int retries; - public TwitterStream() { + public TwitterStream(TwitterCredentialsBearer credentials) { init(); + apiClient.setTwitterCredentials(credentials); } public TwitterStream(int retries) { @@ -62,25 +63,38 @@ private void init() { tweets.setClient(apiClient); } - public void setTwitterCredentials(TwitterCredentialsBearer credentials) { - apiClient.setTwitterCredentials(credentials); - } - + /** + * Set retries + * @param retries the retries for reconnections on 429 status code errors + */ public void setRetries(int retries) { this.retries = retries; } + /** + * Add listener + * @param listener {@link TweetsStreamListener} for 'listening' the tweets returned from the stream + */ public void addListener(TweetsStreamListener listener) { listeners.add(listener); } + /** + * Remove listener + * @param listener {@link TweetsStreamListener} to be removed + */ public void removeListener(TweetsStreamListener listener) { listeners.remove(listener); } - public void sampleStream(StreamQueryParameters streamParameters) { + /** + * Start streaming for sampleSearch + * @param streamParameters {@link StreamQueryParameters} the parameters for the request + * @return the results are returns through the listeners {@link #addListener(TweetsStreamListener)} + */ + public void startSampleStream(StreamQueryParameters streamParameters) { try { - BufferedSource streamResult = tweets.sampleStream().parameters(streamParameters).execute(retries == 0 ? 1 : retries); + BufferedSource streamResult = tweets.sampleStream().parameters(streamParameters).execute(retries <= 0 ? 1 : retries); executor = new TweetsStreamExecutor(streamResult); listeners.forEach(executor::addListener); executor.start(); @@ -99,6 +113,9 @@ public void sampleStream(StreamQueryParameters streamParameters) { } } + /** + * Closes the threads and performs any other required clean up + */ public void shutdown() { executor.shutdown(); } diff --git a/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java b/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java index 9923c97..d41c35f 100644 --- a/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java +++ b/src/test/java/com/twitter/clientlib/stream/TwitterStreamTest.java @@ -44,8 +44,7 @@ class TwitterStreamTest extends ApiTester { @BeforeAll public static void beforeAll() { - twitterStream = new TwitterStream(); - twitterStream.setTwitterCredentials(new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN"))); + twitterStream = new TwitterStream(new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN"))); twitterStream.setRetries(4); } @@ -64,7 +63,7 @@ public void tearDown() { @Test public void shutdown() throws InterruptedException { - twitterStream.sampleStream(new StreamQueryParameters.Builder().build()); + twitterStream.startSampleStream(new StreamQueryParameters.Builder().build()); TimeUnit.SECONDS.sleep(5); assertTrue(tweets.size() > 1); twitterStream.shutdown(); @@ -72,7 +71,7 @@ public void shutdown() throws InterruptedException { @Test public void sampleStream() throws InterruptedException { - twitterStream.sampleStream(new StreamQueryParameters.Builder().build()); + twitterStream.startSampleStream(new StreamQueryParameters.Builder().build()); TimeUnit.SECONDS.sleep(5); assertTrue(tweets.size() > 1); } @@ -82,7 +81,7 @@ public void sampleStreamAddSecondListener() throws InterruptedException { List tweetsDuplicate = new LinkedList<>(); TweetsStreamListener courierDuplicate = new TweetCourier(tweetsDuplicate); twitterStream.addListener(courierDuplicate); - twitterStream.sampleStream(new StreamQueryParameters.Builder().build()); + twitterStream.startSampleStream(new StreamQueryParameters.Builder().build()); TimeUnit.SECONDS.sleep(5); assertTrue(tweets.size() > 1); assertTrue(tweetsDuplicate.size() > 1); @@ -95,7 +94,7 @@ public void sampleStreamAddSecondListenerAndRemove() throws InterruptedException TweetsStreamListener courierDuplicate = new TweetCourier(tweetsDuplicate); twitterStream.addListener(courierDuplicate); twitterStream.removeListener(courierDuplicate); - twitterStream.sampleStream(new StreamQueryParameters.Builder().build()); + twitterStream.startSampleStream(new StreamQueryParameters.Builder().build()); TimeUnit.SECONDS.sleep(5); assertTrue(tweets.size() > 1); assertEquals(0, tweetsDuplicate.size()); From 96f1b740397f5697a300d6dd840e889ecb8b1923 Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Sun, 19 Jun 2022 16:10:32 +0300 Subject: [PATCH 12/13] java date time instead of millis vars for empty stream timeout mechanism --- .../clientlib/stream/TweetsStreamExecutor.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index 40dcdb8..c84fd3f 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -21,6 +21,9 @@ import java.io.IOException; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -43,7 +46,7 @@ public class TweetsStreamExecutor { private static final Logger logger = LoggerFactory.getLogger(TweetsStreamExecutor.class); - private static final long EMPTY_STREAM_TIMEOUT = 20000; + private static final long EMPTY_STREAM_TIMEOUT = 20; private static final int POLL_WAIT = 5; private volatile BlockingQueue rawTweets; @@ -132,17 +135,15 @@ public void queueTweets() { String line = null; try { boolean emptyResponse = false; - long firstEmptyResponseMillis = 0; - long lastEmptyReponseMillis; + LocalDateTime firstEmpty = LocalDateTime.now(); while (isRunning) { line = stream.readUtf8Line(); if(line == null || line.isEmpty()) { if(!emptyResponse) { - firstEmptyResponseMillis = System.currentTimeMillis(); + firstEmpty = LocalDateTime.now(); emptyResponse = true; } else { - lastEmptyReponseMillis = System.currentTimeMillis(); - if(lastEmptyReponseMillis - firstEmptyResponseMillis > EMPTY_STREAM_TIMEOUT) { + if(LocalDateTime.now().minus(EMPTY_STREAM_TIMEOUT, ChronoUnit.SECONDS).isAfter(firstEmpty)) { throw new EmptyStreamTimeoutException(String.format("Stream was empty for %d seconds consecutively", EMPTY_STREAM_TIMEOUT)); } } From 18068c8ef21a4e6e5c8528b2e5bf607329a19e60 Mon Sep 17 00:00:00 2001 From: Vasilis Gakias Date: Tue, 21 Jun 2022 23:33:44 +0300 Subject: [PATCH 13/13] switched back to gson for deserialization thread --- .../clientlib/stream/TweetsStreamExecutor.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java index c84fd3f..6ee2d1a 100644 --- a/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java +++ b/src/main/java/com/twitter/clientlib/stream/TweetsStreamExecutor.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalUnit; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -33,8 +32,6 @@ import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import com.twitter.clientlib.exceptions.EmptyStreamTimeoutException; import com.twitter.clientlib.model.StreamingTweetResponse; @@ -77,7 +74,6 @@ public void start() { logger.error("Stream is null. Exiting..."); return; } - rawTweetsQueuerService = Executors.newSingleThreadExecutor(); rawTweetsQueuerService.submit(new RawTweetsQueuer()); @@ -166,12 +162,6 @@ public void queueTweets() { private class DeserializeTweetsTask implements Runnable { private final Logger logger = LoggerFactory.getLogger(DeserializeTweetsTask.class); - private final ObjectMapper objectMapper; - - private DeserializeTweetsTask() { - this.objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } @Override public void run() { @@ -179,12 +169,14 @@ public void run() { try { String rawTweet = rawTweets.poll(POLL_WAIT, TimeUnit.MILLISECONDS); if (rawTweet == null) continue; - StreamingTweetResponse tweet = objectMapper.readValue(rawTweet, StreamingTweetResponse.class); + StreamingTweetResponse tweet = StreamingTweetResponse.fromJson(rawTweet); tweets.put(tweet); } catch (InterruptedException ignore) { } catch (JsonProcessingException e) { logger.debug("Json could not be parsed"); + } catch (Exception e) { + logger.debug("Exception in deserialization thread: ", e); } } }