Skip to content

Commit f38dafb

Browse files
author
Mike Kobyakov
committed
add getLogger(..., Sender) method to the factory; add an asynchronous wrapper around RawSocketSender
1 parent cb20c25 commit f38dafb

File tree

2 files changed

+112
-2
lines changed

2 files changed

+112
-2
lines changed

src/main/java/org/fluentd/logger/FluentLoggerFactory.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.lang.reflect.Constructor;
2121
import java.lang.reflect.InvocationTargetException;
22+
import java.util.Collections;
2223
import java.util.Map;
2324
import java.util.Properties;
2425
import java.util.WeakHashMap;
@@ -33,7 +34,7 @@ public class FluentLoggerFactory {
3334
private final Map<String, FluentLogger> loggers;
3435

3536
public FluentLoggerFactory() {
36-
loggers = new WeakHashMap<String, FluentLogger>();
37+
loggers = Collections.synchronizedMap(new WeakHashMap<String, FluentLogger>());
3738
}
3839

3940
public FluentLogger getLogger(String tagPrefix) {
@@ -48,7 +49,7 @@ public FluentLogger getLogger(String tagPrefix, String host, int port, int timeo
4849
return getLogger(tagPrefix, host, port, timeout, bufferCapacity, new ExponentialDelayReconnector());
4950
}
5051

51-
public synchronized FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity,
52+
public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity,
5253
Reconnector reconnector) {
5354
String key = String.format("%s_%s_%d_%d_%d", new Object[] { tagPrefix, host, port, timeout, bufferCapacity });
5455
if (loggers.containsKey(key)) {
@@ -73,6 +74,21 @@ public synchronized FluentLogger getLogger(String tagPrefix, String host, int po
7374
}
7475
}
7576

77+
public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity,
78+
Sender sender) {
79+
if (sender == null) {
80+
return getLogger(tagPrefix, host, port, timeout, bufferCapacity);
81+
}
82+
String key = String.format("%s_%s_%d_%d_%d_%s", new Object[] { tagPrefix, host, port, timeout, bufferCapacity, sender == null ? "null" : sender .getName() });
83+
if (loggers.containsKey(key)) {
84+
return loggers.get(key);
85+
} else {
86+
FluentLogger logger = new FluentLogger(tagPrefix, sender);
87+
loggers.put(key, logger);
88+
return logger;
89+
}
90+
}
91+
7692
@SuppressWarnings("unchecked")
7793
private Sender createSenderInstance(final String className, final Object[] params) throws ClassNotFoundException,
7894
SecurityException, NoSuchMethodException, IllegalArgumentException, InstantiationException,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
2+
package org.fluentd.logger.sender;
3+
4+
import java.util.Map;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
8+
import org.fluentd.logger.sender.ExponentialDelayReconnector;
9+
import org.fluentd.logger.sender.RawSocketSender;
10+
import org.fluentd.logger.sender.Reconnector;
11+
import org.fluentd.logger.sender.Sender;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
/**
16+
*
17+
* @author mkobyakov
18+
*
19+
*/
20+
public class AsyncRawSocketSender implements Sender {
21+
22+
private RawSocketSender sender;
23+
private Reconnector reconnector;
24+
25+
@SuppressWarnings("unused")
26+
private static final Logger logger = LoggerFactory.getLogger(AsyncRawSocketSender.class);
27+
28+
private static final ExecutorService flusher = Executors.newSingleThreadExecutor();
29+
30+
public AsyncRawSocketSender() {
31+
this("localhost", 24224);
32+
}
33+
34+
public AsyncRawSocketSender(String host, int port) {
35+
this(host, port, 3 * 1000, 8 * 1024 * 1024);
36+
}
37+
38+
public AsyncRawSocketSender(String host, int port, int timeout,
39+
int bufferCapacity) {
40+
this(host, port, timeout, bufferCapacity,
41+
new ExponentialDelayReconnector());
42+
}
43+
44+
public AsyncRawSocketSender(String host, int port, int timeout,
45+
int bufferCapacity, Reconnector reconnector) {
46+
this.reconnector = reconnector;
47+
this.sender = new RawSocketSender(host, port, timeout, bufferCapacity,
48+
reconnector);
49+
}
50+
51+
@Override
52+
public synchronized void flush() {
53+
final RawSocketSender sender = this.sender;
54+
flusher.execute(new Runnable() {
55+
@Override
56+
public void run() {
57+
sender.flush();
58+
}
59+
});
60+
}
61+
62+
@Override
63+
public void close() {
64+
sender.close();
65+
}
66+
67+
@Override
68+
public boolean emit(String tag, Map<String, Object> data) {
69+
return emit(tag, System.currentTimeMillis() / 1000, data);
70+
}
71+
72+
@Override
73+
public boolean emit(final String tag, final long timestamp, final Map<String, Object> data) {
74+
final RawSocketSender sender = this.sender;
75+
flusher.execute(new Runnable() {
76+
@Override
77+
public void run() {
78+
sender.emit(tag, timestamp, data);
79+
}
80+
});
81+
82+
return sender.isConnected() || reconnector.enableReconnection(System.currentTimeMillis());
83+
}
84+
85+
@Override
86+
public String getName() {
87+
return sender.getName();
88+
}
89+
90+
@Override
91+
public boolean isConnected() {
92+
return sender.isConnected();
93+
}
94+
}

0 commit comments

Comments
 (0)