From 58a2a0412ec7939ffc15a00ad1bcbb67d27a1f27 Mon Sep 17 00:00:00 2001 From: Alejandro Revilla Date: Thu, 3 Aug 2017 21:33:14 -0300 Subject: [PATCH] Upgrade to JGroups 4.0.4.Final default to replicate=false --- libraries.gradle | 2 +- .../java/org/jpos/space/ReplicatedSpace.java | 34 +++++++----------- .../jpos/space/ReplicatedSpaceAdaptor.java | 3 +- .../META-INF/q2/installs/cfg/udp.xml | 35 +++++-------------- 4 files changed, 23 insertions(+), 51 deletions(-) diff --git a/libraries.gradle b/libraries.gradle index 83ed0c6e54..fa9dfb0689 100644 --- a/libraries.gradle +++ b/libraries.gradle @@ -7,7 +7,7 @@ ext { jettyVersion = '9.3.19.v20170502' servletApiVersion = '3.1.0' websocketApiVersion = '1.1' - jgroupsVersion = '3.6.9.Final' + jgroupsVersion = '4.0.4.Final' jaxrsVersion = '2.0.1' jsonSchemaVersion = '2.2.6' jacksonVersion = '2.7.4' diff --git a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java index 346a93ae19..7fd940da63 100644 --- a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java +++ b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java @@ -27,20 +27,18 @@ import org.jpos.util.Log; import org.jpos.util.Logger; import org.jpos.util.LogEvent; -import org.jgroups.Channel; import org.jgroups.JChannel; import org.jgroups.View; import org.jgroups.Message; import org.jgroups.Address; import org.jgroups.Receiver; -import org.jgroups.conf.XmlConfigurator; @SuppressWarnings("unchecked") public class ReplicatedSpace extends Log implements LocalSpace, Receiver { - Channel channel; + JChannel channel; String nodeName; String nodePrefix; String seqName; @@ -92,7 +90,7 @@ public void out (Object key, Object value, long timeout) { getCoordinator(); try { Request r = new Request (Request.OUT, key, value, timeout); - channel.send (new Message (null, null, r)); + channel.send (new Message (null, r)); Object o = sp.in (r.getUUID(), MAX_OUT_WAIT); if (o == null) throw new SpaceError ("Could not out " + key); @@ -104,10 +102,10 @@ public void push (Object key, Object value) { push(key, value, 0L); } public void push (Object key, Object value, long timeout) { - Address coordinator = getCoordinator(); + getCoordinator(); try { Request r = new Request (Request.PUSH, key, value, timeout); - channel.send (new Message (null, null, r)); + channel.send (new Message (null, r)); Object o = sp.in (r.getUUID(), MAX_OUT_WAIT); if (o == null) throw new SpaceError ("Could not push " + key); @@ -122,7 +120,7 @@ public void put (Object key, Object value, long timeout) { getCoordinator(); try { Request r = new Request (Request.PUT, key, value, timeout); - channel.send (new Message (null, null, r)); + channel.send (new Message (null, r)); Object o = sp.in (r.getUUID(), MAX_OUT_WAIT); if (o == null) throw new SpaceError ("Could not put " + key); @@ -161,7 +159,7 @@ public void receive (Message msg) { Request r = (Request) obj; switch (r.type) { case Request.OUT: - if (r.timeout != 0) + if (r.timeout != 0) sp.out (r.key, r.value, r.timeout + TIMEOUT); else sp.out (r.key, r.value); @@ -216,11 +214,12 @@ public void receive (Message msg) { case Request.INP: Object v = sp.inp (r.key); if (v != null) { + MD5Template tmpl = new MD5Template(r.key, v); send (null, new Request ( Request.INP_NOTIFICATION, r.key, - new MD5Template (r.key, v) + tmpl ) ); } @@ -238,9 +237,7 @@ public void receive (Message msg) { sp.out (r.key, r.value, MAX_WAIT); break; case Request.INP_NOTIFICATION: - // if not self notification - if (!channel.getAddress().equals (msg.getSrc())) - sp.inp (r.value); + sp.inp (r.value); break; case Request.SPACE_COPY: if (replicate && !isCoordinator() && sp instanceof TSpace) { @@ -427,7 +424,7 @@ public void run() { send (null, new Request ( Request.SPACE_COPY, - null, // value is ref key for response + null, ((TSpace)sp).getEntries() ) ); @@ -461,7 +458,7 @@ private void commitOn() { private void send (Address destination, Request r) { try { - channel.send (new Message (destination, null, r)); + channel.send (new Message (destination, r)); } catch (Exception e) { error (e); } @@ -471,7 +468,7 @@ private void sendToCoordinator (Request r) while (true) { Address coordinator = getCoordinator(); try { - channel.send (new Message (coordinator, null, r)); + channel.send (new Message (coordinator, r)); break; } catch (Exception e) { error ("error " + e.getMessage() + ", retrying"); @@ -484,12 +481,7 @@ private void sendToCoordinator (Request r) private void initChannel (String groupName, String configFile) throws Exception { - InputStream config = new FileInputStream (configFile); - XmlConfigurator conf = XmlConfigurator.getInstance (config); - String props = conf.getProtocolStackString(); - channel = new JChannel (props); - // channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); - // channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE); + channel = new JChannel (configFile); channel.setReceiver(this); channel.connect (groupName); info ("member: " + channel.getAddress().toString()); diff --git a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java index f77848c794..eef807b669 100644 --- a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java +++ b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java @@ -36,7 +36,6 @@ public ReplicatedSpaceAdaptor () { super (); } public void initService() throws ConfigurationException { - Element e = getPersist (); Space sp = SpaceFactory.getSpace (cfg.get ("space", "")); rspaceUri = cfg.get ("rspace", "rspace"); try { @@ -47,7 +46,7 @@ public void initService() throws ConfigurationException { getLog().getLogger(), getLog().getRealm(), cfg.getBoolean ("trace"), - cfg.getBoolean ("replicate", sp instanceof TSpace) + cfg.getBoolean ("replicate", false) ); NameRegistrar.register (rspaceUri, rs); } catch (Throwable t) { diff --git a/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml b/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml index 24fb61de73..6918818a47 100644 --- a/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml +++ b/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml @@ -17,31 +17,12 @@ mcast_recv_buf_size="5M" mcast_send_buf_size="5M" max_bundle_size="64K" - max_bundle_timeout="30" enable_diagnostics="true" thread_naming_pattern="cl" - timer_type="new3" - timer.min_threads="2" - timer.max_threads="4" - timer.keep_alive_time="3000" - timer.queue_max_size="500" - - thread_pool.enabled="true" - thread_pool.min_threads="2" - thread_pool.max_threads="8" - thread_pool.keep_alive_time="5000" - thread_pool.queue_enabled="true" - thread_pool.queue_max_size="10000" - thread_pool.rejection_policy="discard" - - oob_thread_pool.enabled="true" - oob_thread_pool.min_threads="1" - oob_thread_pool.max_threads="8" - oob_thread_pool.keep_alive_time="5000" - oob_thread_pool.queue_enabled="false" - oob_thread_pool.queue_max_size="100" - oob_thread_pool.rejection_policy="discard"/> + thread_pool.min_threads="0" + thread_pool.max_threads="20" + thread_pool.keep_alive_time="30000"/> - + @@ -71,8 +50,10 @@ min_threshold="0.4"/> + - + +