diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java b/flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java index c9d621d3e0bb..bd54a165e354 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java @@ -18,7 +18,7 @@ import com.navercorp.pinpoint.collector.receiver.thrift.TCPReceiverBean; import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo; import com.navercorp.pinpoint.flink.cluster.FlinkServerRegister; -import com.navercorp.pinpoint.flink.config.FlinkConfiguration; +import com.navercorp.pinpoint.flink.config.FlinkProperties; import com.navercorp.pinpoint.flink.dao.hbase.ApplicationMetricDao; import com.navercorp.pinpoint.flink.dao.hbase.StatisticsDao; import com.navercorp.pinpoint.flink.dao.hbase.StatisticsDaoInterceptor; @@ -56,7 +56,7 @@ public class Bootstrap { private final ClassPathXmlApplicationContext applicationContext; private final TBaseFlatMapper tbaseFlatMapper; - private final FlinkConfiguration flinkConfiguration; + private final FlinkProperties flinkProperties; private final TcpDispatchHandler tcpDispatchHandler; private final TcpSourceFunction tcpSourceFunction; private final ApplicationCache applicationCache; @@ -72,7 +72,7 @@ private Bootstrap() { applicationContext = new ClassPathXmlApplicationContext("applicationContext-flink.xml"); tbaseFlatMapper = applicationContext.getBean("tbaseFlatMapper", TBaseFlatMapper.class); - flinkConfiguration = applicationContext.getBean("flinkConfiguration", FlinkConfiguration.class); + flinkProperties = applicationContext.getBean("flinkProperties", FlinkProperties.class); tcpDispatchHandler = applicationContext.getBean("tcpDispatchHandler", TcpDispatchHandler.class); tcpSourceFunction = applicationContext.getBean("tcpSourceFunction", TcpSourceFunction.class); applicationCache = applicationContext.getBean("applicationCache", ApplicationCache.class); @@ -131,12 +131,12 @@ public ApplicationCache getApplicationCache() { return applicationCache; } - public FlinkConfiguration getFlinkConfiguration() { - return flinkConfiguration; + public FlinkProperties getFlinkProperties() { + return flinkProperties; } public StreamExecutionEnvironment createStreamExecutionEnvironment() { - if (flinkConfiguration.isLocalforFlinkStreamExecutionEnvironment()) { + if (flinkProperties.isLocalforFlinkStreamExecutionEnvironment()) { LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment(); localEnvironment.setParallelism(1); return localEnvironment; diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/cluster/FlinkServerRegister.java b/flink/src/main/java/com/navercorp/pinpoint/flink/cluster/FlinkServerRegister.java index 96945717e905..68ed101b8f2d 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/cluster/FlinkServerRegister.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/cluster/FlinkServerRegister.java @@ -20,21 +20,20 @@ import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient; import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperEventWatcher; import com.navercorp.pinpoint.common.util.NetUtils; -import com.navercorp.pinpoint.flink.config.FlinkConfiguration; +import com.navercorp.pinpoint.flink.config.FlinkProperties; import com.navercorp.pinpoint.rpc.util.ClassUtils; import com.navercorp.pinpoint.rpc.util.TimerFactory; import com.navercorp.pinpoint.web.cluster.zookeeper.PushZnodeJob; import com.navercorp.pinpoint.web.cluster.zookeeper.ZookeeperClusterDataManagerHelper; - import org.apache.curator.utils.ZKPaths; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.jboss.netty.util.HashedWheelTimer; import org.jboss.netty.util.Timeout; import org.jboss.netty.util.Timer; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -60,18 +59,18 @@ public class FlinkServerRegister implements ZookeeperEventWatcher { private Timer timer; - public FlinkServerRegister(FlinkConfiguration flinkConfiguration) { - Objects.requireNonNull(flinkConfiguration, "flinkConfiguration"); - this.clusterEnable = flinkConfiguration.isFlinkClusterEnable(); - this.connectAddress = flinkConfiguration.getFlinkClusterZookeeperAddress(); - this.sessionTimeout = flinkConfiguration.getFlinkClusterSessionTimeout(); - this.zookeeperPath = flinkConfiguration.getFlinkZNodePath(); + public FlinkServerRegister(FlinkProperties flinkProperties) { + Objects.requireNonNull(flinkProperties, "flinkConfiguration"); + this.clusterEnable = flinkProperties.isFlinkClusterEnable(); + this.connectAddress = flinkProperties.getFlinkClusterZookeeperAddress(); + this.sessionTimeout = flinkProperties.getFlinkClusterSessionTimeout(); + this.zookeeperPath = flinkProperties.getFlinkZNodePath(); - String zNodeName = getRepresentationLocalV4Ip() + ":" + flinkConfiguration.getFlinkClusterTcpPort(); + String zNodeName = getRepresentationLocalV4Ip() + ":" + flinkProperties.getFlinkClusterTcpPort(); String zNodeFullPath = ZKPaths.makePath(zookeeperPath, zNodeName); CreateNodeMessage createNodeMessage = new CreateNodeMessage(zNodeFullPath, new byte[0]); - int retryInterval = flinkConfiguration.getFlinkRetryInterval(); + int retryInterval = flinkProperties.getFlinkRetryInterval(); this.pushFlinkNodeJob = new PushFlinkNodeJob(createNodeMessage, retryInterval); } diff --git a/flink/src/main/java/com/navercorp/pinpoint/flink/config/FlinkConfiguration.java b/flink/src/main/java/com/navercorp/pinpoint/flink/config/FlinkProperties.java similarity index 95% rename from flink/src/main/java/com/navercorp/pinpoint/flink/config/FlinkConfiguration.java rename to flink/src/main/java/com/navercorp/pinpoint/flink/config/FlinkProperties.java index 98d8f20c0da4..92e70747489f 100644 --- a/flink/src/main/java/com/navercorp/pinpoint/flink/config/FlinkConfiguration.java +++ b/flink/src/main/java/com/navercorp/pinpoint/flink/config/FlinkProperties.java @@ -33,8 +33,8 @@ * @author minwoo.jung */ @Component -public class FlinkConfiguration { - private final Logger logger = LogManager.getLogger(FlinkConfiguration.class); +public class FlinkProperties { + private final Logger logger = LogManager.getLogger(FlinkProperties.class); @Qualifier("flinkClusterProperties") @Autowired @@ -52,7 +52,7 @@ public class FlinkConfiguration { @Value("${collector.l4.ip:}") private String[] l4IpList = new String[0]; - public FlinkConfiguration() { + public FlinkProperties() { } public boolean isFlinkClusterEnable() { diff --git a/flink/src/main/resources/applicationContext-flink.xml b/flink/src/main/resources/applicationContext-flink.xml index 4ed20a3882cd..0c9dd3092f78 100644 --- a/flink/src/main/resources/applicationContext-flink.xml +++ b/flink/src/main/resources/applicationContext-flink.xml @@ -17,7 +17,7 @@ com.navercorp.pinpoint.collector.mapper.thrift.stat"/> - + diff --git a/flink/src/test/java/com/navercorp/pinpoint/flink/config/FlinkPropertiesTest.java b/flink/src/test/java/com/navercorp/pinpoint/flink/config/FlinkPropertiesTest.java index 1ab5eddd1179..04a2002ce317 100644 --- a/flink/src/test/java/com/navercorp/pinpoint/flink/config/FlinkPropertiesTest.java +++ b/flink/src/test/java/com/navercorp/pinpoint/flink/config/FlinkPropertiesTest.java @@ -27,15 +27,15 @@ /** * @author Woonduk Kang(emeroad) */ -@ContextConfiguration(classes = {FlinkConfiguration.class, ClusterConfigurationFactory.class}) +@ContextConfiguration(classes = { FlinkProperties.class, ClusterConfigurationFactory.class}) @ExtendWith(SpringExtension.class) public class FlinkPropertiesTest { @Autowired - private FlinkConfiguration flinkConfiguration; + private FlinkProperties flinkProperties; @Test public void log() { - flinkConfiguration.log(); + flinkProperties.log(); } } \ No newline at end of file