-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTwitterProducerOptimized.java
146 lines (123 loc) · 5.76 KB
/
TwitterProducerOptimized.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package csw.kafka.study.twitter;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class TwitterProducerOptimized {

    // SLF4J convention: one private static final logger per class.
    private static final Logger logger = LoggerFactory.getLogger(TwitterProducerOptimized.class);

    public TwitterProducerOptimized() {
        // No state to initialize.
    }

    public static void main(String[] args) {
        new TwitterProducerOptimized().run();
    }

    /**
     * Streams tweets from the Twitter hosebird client into the
     * {@code twitter-tweets} Kafka topic until the client is done
     * or the JVM shuts down.
     */
    public void run() {
        logger.info("Setting up...");

        // Twitter client feeding raw JSON messages into a bounded queue.
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
        Client client = createTwitterProducer(msgQueue);
        client.connect();

        // Kafka Producer
        KafkaProducer<String, String> producer = createKafkaProducer();

        // Shutdown hook: stop the Twitter client first so no new messages
        // arrive, then close the producer to flush in-flight records.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Stopping application...");
            client.stop();
            producer.close();
            logger.info("Bye!");
        }));

        // on a different thread, or multiple different threads....
        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Bug fix: restore the interrupt flag so callers can observe
                // it, and log via SLF4J instead of printStackTrace().
                Thread.currentThread().interrupt();
                logger.error("Interrupted while polling the tweet queue; stopping client.", e);
                client.stop();
            }
            if (msg != null) {
                logger.info(msg);
                // Send the tweet to Kafka. Topic creation (for reference):
                // kafka-topics --zookeeper localhost:2181 --create --topic twitter-tweets --partitions 6 --replication-factor 1
                producer.send(new ProducerRecord<>("twitter-tweets", null, msg), (recordMetadata, e) -> {
                    if (e != null) {
                        // Bug fix: was logged at INFO with a typo ("happended").
                        logger.error("Something bad happened", e);
                    }
                });
            }
        }
        logger.info("End of application.");
    }

    /**
     * Builds a Kafka producer configured as a "safe" producer (idempotence,
     * acks=all, max retries, bounded in-flight requests) with high-throughput
     * settings (snappy compression, 20 ms linger, 32 KB batches).
     *
     * @return a configured {@link KafkaProducer} with String key/value serializers
     */
    private KafkaProducer<String, String> createKafkaProducer() {
        String myServer = "192.168.137.232:9093"; // My Hyper-V Server

        // Base producer configuration.
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Safe producer settings.
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        // High-throughput producer settings.
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));

        return new KafkaProducer<>(properties);
    }

    /**
     * Creates a hosebird {@link Client} that filters the public Twitter
     * stream for the tracked terms and pushes each delimited message into
     * {@code msgQueue}. OAuth1 credentials are loaded from app.config.
     *
     * @param msgQueue queue the client delivers raw tweet strings into
     * @return a built (not yet connected) hosebird client
     * @throws IllegalStateException if app.config is missing or unreadable
     */
    public Client createTwitterProducer(BlockingQueue<String> msgQueue) {
        // Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) //
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
        List<String> terms = Lists.newArrayList("kafka", "covid", "korea"); // tweets about kafka
        hosebirdEndpoint.trackTerms(terms);

        // These secrets should be read from a config file (kept out of VCS).
        final File fileName = new File("src/main/java/csw/kafka/study/twitter/app.config");
        final Properties prop = new Properties();
        // Bug fix: the original leaked the FileInputStream (never closed) and,
        // when the file was missing, passed null into Properties.load -> NPE.
        // Use try-with-resources and fail fast with a clear message instead of
        // continuing with empty credentials.
        try (InputStream is = new FileInputStream(fileName)) {
            prop.load(is);
        } catch (FileNotFoundException ex) {
            logger.error("app.config not found: {}", fileName.getAbsolutePath(), ex);
            throw new IllegalStateException("app.config not found: " + fileName.getAbsolutePath(), ex);
        } catch (IOException ex) {
            logger.error("app.config load failed.", ex);
            throw new IllegalStateException("app.config load failed.", ex);
        }

        String consumerSecret = prop.getProperty("consumerSecret");
        String consumerKey = prop.getProperty("consumerKey");
        String token = prop.getProperty("token");
        String tokenSecret = prop.getProperty("tokenSecret");
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01") // optional: mainly for the logs
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));
        return builder.build();
    }
}