From f31d66781813faebd3f58eba1635a4d133c46fe8 Mon Sep 17 00:00:00 2001 From: Mike Kobyakov Date: Thu, 4 Sep 2014 12:15:03 -0700 Subject: [PATCH 1/2] add getLogger(..., Sender) method to the factory; add an asynchronous wrapper around RawSocketSender --- .../fluentd/logger/FluentLoggerFactory.java | 20 +++- .../logger/sender/AsyncRawSocketSender.java | 94 +++++++++++++++++++ 2 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java diff --git a/src/main/java/org/fluentd/logger/FluentLoggerFactory.java b/src/main/java/org/fluentd/logger/FluentLoggerFactory.java index 9c2d5a5..932043e 100644 --- a/src/main/java/org/fluentd/logger/FluentLoggerFactory.java +++ b/src/main/java/org/fluentd/logger/FluentLoggerFactory.java @@ -19,6 +19,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.WeakHashMap; @@ -33,7 +34,7 @@ public class FluentLoggerFactory { private final Map loggers; public FluentLoggerFactory() { - loggers = new WeakHashMap(); + loggers = Collections.synchronizedMap(new WeakHashMap()); } public FluentLogger getLogger(String tagPrefix) { @@ -48,7 +49,7 @@ public FluentLogger getLogger(String tagPrefix, String host, int port, int timeo return getLogger(tagPrefix, host, port, timeout, bufferCapacity, new ExponentialDelayReconnector()); } - public synchronized FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity, + public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity, Reconnector reconnector) { String key = String.format("%s_%s_%d_%d_%d", new Object[] { tagPrefix, host, port, timeout, bufferCapacity }); if (loggers.containsKey(key)) { @@ -73,6 +74,21 @@ public synchronized FluentLogger getLogger(String tagPrefix, String host, int po } } + public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity, + Sender sender) { + if (sender == null) { + return getLogger(tagPrefix, host, port, timeout, bufferCapacity); + } + String key = String.format("%s_%s_%d_%d_%d_%s", new Object[] { tagPrefix, host, port, timeout, bufferCapacity, sender == null ? "null" : sender .getName() }); + if (loggers.containsKey(key)) { + return loggers.get(key); + } else { + FluentLogger logger = new FluentLogger(tagPrefix, sender); + loggers.put(key, logger); + return logger; + } + } + @SuppressWarnings("unchecked") private Sender createSenderInstance(final String className, final Object[] params) throws ClassNotFoundException, SecurityException, NoSuchMethodException, IllegalArgumentException, InstantiationException, diff --git a/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java new file mode 100644 index 0000000..b1a96eb --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java @@ -0,0 +1,94 @@ + +package org.fluentd.logger.sender; + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.fluentd.logger.sender.ExponentialDelayReconnector; +import org.fluentd.logger.sender.RawSocketSender; +import org.fluentd.logger.sender.Reconnector; +import org.fluentd.logger.sender.Sender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author mkobyakov + * + */ +public class AsyncRawSocketSender implements Sender { + + private RawSocketSender sender; + private Reconnector reconnector; + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(AsyncRawSocketSender.class); + + private static final ExecutorService flusher = Executors.newSingleThreadExecutor(); + + public AsyncRawSocketSender() { + this("localhost", 24224); + } + + public AsyncRawSocketSender(String host, int port) { + this(host, port, 3 * 1000, 8 * 1024 * 1024); + } + + public AsyncRawSocketSender(String host, int port, int timeout, + int bufferCapacity) { + this(host, port, timeout, bufferCapacity, + new ExponentialDelayReconnector()); + } + + public AsyncRawSocketSender(String host, int port, int timeout, + int bufferCapacity, Reconnector reconnector) { + this.reconnector = reconnector; + this.sender = new RawSocketSender(host, port, timeout, bufferCapacity, + reconnector); + } + + @Override + public synchronized void flush() { + final RawSocketSender sender = this.sender; + flusher.execute(new Runnable() { + @Override + public void run() { + sender.flush(); + } + }); + } + + @Override + public void close() { + sender.close(); + } + + @Override + public boolean emit(String tag, Map data) { + return emit(tag, System.currentTimeMillis() / 1000, data); + } + + @Override + public boolean emit(final String tag, final long timestamp, final Map data) { + final RawSocketSender sender = this.sender; + flusher.execute(new Runnable() { + @Override + public void run() { + sender.emit(tag, timestamp, data); + } + }); + + return sender.isConnected() || reconnector.enableReconnection(System.currentTimeMillis()); + } + + @Override + public String getName() { + return sender.getName(); + } + + @Override + public boolean isConnected() { + return sender.isConnected(); + } +} From 1b22f2f046ca3c04d17ab1448f9fc5a0b4a6c20d Mon Sep 17 00:00:00 2001 From: Mike Kobyakov Date: Thu, 4 Sep 2014 12:15:03 -0700 Subject: [PATCH 2/2] add getLogger(..., Sender) method to the factory; add an asynchronous wrapper around RawSocketSender --- .../fluentd/logger/FluentLoggerFactory.java | 20 ++- .../logger/sender/AsyncRawSocketSender.java | 129 ++++++++++++++++++ 2 files changed, 147 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java diff --git a/src/main/java/org/fluentd/logger/FluentLoggerFactory.java b/src/main/java/org/fluentd/logger/FluentLoggerFactory.java index 9c2d5a5..932043e 100644 --- a/src/main/java/org/fluentd/logger/FluentLoggerFactory.java +++ b/src/main/java/org/fluentd/logger/FluentLoggerFactory.java @@ -19,6 +19,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.WeakHashMap; @@ -33,7 +34,7 @@ public class FluentLoggerFactory { private final Map loggers; public FluentLoggerFactory() { - loggers = new WeakHashMap(); + loggers = Collections.synchronizedMap(new WeakHashMap()); } public FluentLogger getLogger(String tagPrefix) { @@ -48,7 +49,7 @@ public FluentLogger getLogger(String tagPrefix, String host, int port, int timeo return getLogger(tagPrefix, host, port, timeout, bufferCapacity, new ExponentialDelayReconnector()); } - public synchronized FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity, + public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity, Reconnector reconnector) { String key = String.format("%s_%s_%d_%d_%d", new Object[] { tagPrefix, host, port, timeout, bufferCapacity }); if (loggers.containsKey(key)) { @@ -73,6 +74,21 @@ public synchronized FluentLogger getLogger(String tagPrefix, String host, int po } } + public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity, + Sender sender) { + if (sender == null) { + return getLogger(tagPrefix, host, port, timeout, bufferCapacity); + } + String key = String.format("%s_%s_%d_%d_%d_%s", new Object[] { tagPrefix, host, port, timeout, bufferCapacity, sender == null ? "null" : sender .getName() }); + if (loggers.containsKey(key)) { + return loggers.get(key); + } else { + FluentLogger logger = new FluentLogger(tagPrefix, sender); + loggers.put(key, logger); + return logger; + } + } + @SuppressWarnings("unchecked") private Sender createSenderInstance(final String className, final Object[] params) throws ClassNotFoundException, SecurityException, NoSuchMethodException, IllegalArgumentException, InstantiationException, diff --git a/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java new file mode 100644 index 0000000..118733e --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java @@ -0,0 +1,129 @@ + +package org.fluentd.logger.sender; + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.fluentd.logger.errorhandler.ErrorHandler; +import org.fluentd.logger.sender.ExponentialDelayReconnector; +import org.fluentd.logger.sender.RawSocketSender; +import org.fluentd.logger.sender.Reconnector; +import org.fluentd.logger.sender.Sender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An asynchronous wrapper around RawSocketSender + * + * @author mxk + * + */ +public class AsyncRawSocketSender implements Sender { + + private final class EmitRunnable implements Runnable { + private final String tag; + private final Map data; + private final RawSocketSender sender; + private final long timestamp; + + private EmitRunnable(String tag, Map data, + RawSocketSender sender, long timestamp) { + this.tag = tag; + this.data = data; + this.sender = sender; + this.timestamp = timestamp; + } + + @Override + public void run() { + sender.emit(tag, timestamp, data); + } + } + + private final class FlushRunnable implements Runnable { + private final RawSocketSender sender; + + private FlushRunnable(RawSocketSender sender) { + this.sender = sender; + } + + @Override + public void run() { + sender.flush(); + } + } + + private RawSocketSender sender; + private Reconnector reconnector; + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(AsyncRawSocketSender.class); + + private static final ExecutorService flusher = Executors.newSingleThreadExecutor(); + + public AsyncRawSocketSender() { + this("localhost", 24224); + } + + public AsyncRawSocketSender(String host, int port) { + this(host, port, 3 * 1000, 8 * 1024 * 1024); + } + + public AsyncRawSocketSender(String host, int port, int timeout, + int bufferCapacity) { + this(host, port, timeout, bufferCapacity, + new ExponentialDelayReconnector()); + } + + public AsyncRawSocketSender(String host, int port, int timeout, + int bufferCapacity, Reconnector reconnector) { + this.reconnector = reconnector; + this.sender = new RawSocketSender(host, port, timeout, bufferCapacity, + reconnector); + } + + @Override + public synchronized void flush() { + final RawSocketSender sender = this.sender; + flusher.execute(new FlushRunnable(sender)); + } + + @Override + public void close() { + sender.close(); + } + + @Override + public boolean emit(String tag, Map data) { + return emit(tag, System.currentTimeMillis() / 1000, data); + } + + @Override + public boolean emit(final String tag, final long timestamp, final Map data) { + final RawSocketSender sender = this.sender; + flusher.execute(new EmitRunnable(tag, data, sender, timestamp)); + + return sender.isConnected() || reconnector.enableReconnection(System.currentTimeMillis()); + } + + @Override + public String getName() { + return sender.getName(); + } + + @Override + public boolean isConnected() { + return sender.isConnected(); + } + + @Override + public void setErrorHandler(ErrorHandler errorHandler) { + sender.setErrorHandler(errorHandler); + } + + @Override + public void removeErrorHandler() { + sender.removeErrorHandler(); + } +}