diff --git a/libraries.gradle b/libraries.gradle
index 83ed0c6e54..fa9dfb0689 100644
--- a/libraries.gradle
+++ b/libraries.gradle
@@ -7,7 +7,7 @@ ext {
jettyVersion = '9.3.19.v20170502'
servletApiVersion = '3.1.0'
websocketApiVersion = '1.1'
- jgroupsVersion = '3.6.9.Final'
+ jgroupsVersion = '4.0.4.Final'
jaxrsVersion = '2.0.1'
jsonSchemaVersion = '2.2.6'
jacksonVersion = '2.7.4'
diff --git a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java
index 346a93ae19..7fd940da63 100644
--- a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java
+++ b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java
@@ -27,20 +27,18 @@
import org.jpos.util.Log;
import org.jpos.util.Logger;
import org.jpos.util.LogEvent;
-import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.View;
import org.jgroups.Message;
import org.jgroups.Address;
import org.jgroups.Receiver;
-import org.jgroups.conf.XmlConfigurator;
@SuppressWarnings("unchecked")
public class ReplicatedSpace
extends Log
implements LocalSpace, Receiver
{
- Channel channel;
+ JChannel channel;
String nodeName;
String nodePrefix;
String seqName;
@@ -92,7 +90,7 @@ public void out (Object key, Object value, long timeout) {
getCoordinator();
try {
Request r = new Request (Request.OUT, key, value, timeout);
- channel.send (new Message (null, null, r));
+ channel.send (new Message (null, r));
Object o = sp.in (r.getUUID(), MAX_OUT_WAIT);
if (o == null)
throw new SpaceError ("Could not out " + key);
@@ -104,10 +102,10 @@ public void push (Object key, Object value) {
push(key, value, 0L);
}
public void push (Object key, Object value, long timeout) {
- Address coordinator = getCoordinator();
+ getCoordinator();
try {
Request r = new Request (Request.PUSH, key, value, timeout);
- channel.send (new Message (null, null, r));
+ channel.send (new Message (null, r));
Object o = sp.in (r.getUUID(), MAX_OUT_WAIT);
if (o == null)
throw new SpaceError ("Could not push " + key);
@@ -122,7 +120,7 @@ public void put (Object key, Object value, long timeout) {
getCoordinator();
try {
Request r = new Request (Request.PUT, key, value, timeout);
- channel.send (new Message (null, null, r));
+ channel.send (new Message (null, r));
Object o = sp.in (r.getUUID(), MAX_OUT_WAIT);
if (o == null)
throw new SpaceError ("Could not put " + key);
@@ -161,7 +159,7 @@ public void receive (Message msg) {
Request r = (Request) obj;
switch (r.type) {
case Request.OUT:
- if (r.timeout != 0)
+ if (r.timeout != 0)
sp.out (r.key, r.value, r.timeout + TIMEOUT);
else
sp.out (r.key, r.value);
@@ -216,11 +214,12 @@ public void receive (Message msg) {
case Request.INP:
Object v = sp.inp (r.key);
if (v != null) {
+ MD5Template tmpl = new MD5Template(r.key, v);
send (null,
new Request (
Request.INP_NOTIFICATION,
r.key,
- new MD5Template (r.key, v)
+ tmpl
)
);
}
@@ -238,9 +237,7 @@ public void receive (Message msg) {
sp.out (r.key, r.value, MAX_WAIT);
break;
case Request.INP_NOTIFICATION:
- // if not self notification
- if (!channel.getAddress().equals (msg.getSrc()))
- sp.inp (r.value);
+ sp.inp (r.value);
break;
case Request.SPACE_COPY:
if (replicate && !isCoordinator() && sp instanceof TSpace) {
@@ -427,7 +424,7 @@ public void run() {
send (null,
new Request (
Request.SPACE_COPY,
- null, // value is ref key for response
+ null,
((TSpace)sp).getEntries()
)
);
@@ -461,7 +458,7 @@ private void commitOn() {
private void send (Address destination, Request r)
{
try {
- channel.send (new Message (destination, null, r));
+ channel.send (new Message (destination, r));
} catch (Exception e) {
error (e);
}
@@ -471,7 +468,7 @@ private void sendToCoordinator (Request r)
while (true) {
Address coordinator = getCoordinator();
try {
- channel.send (new Message (coordinator, null, r));
+ channel.send (new Message (coordinator, r));
break;
} catch (Exception e) {
error ("error " + e.getMessage() + ", retrying");
@@ -484,12 +481,7 @@ private void sendToCoordinator (Request r)
private void initChannel (String groupName, String configFile)
throws Exception
{
- InputStream config = new FileInputStream (configFile);
- XmlConfigurator conf = XmlConfigurator.getInstance (config);
- String props = conf.getProtocolStackString();
- channel = new JChannel (props);
- // channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- // channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
+ channel = new JChannel (configFile);
channel.setReceiver(this);
channel.connect (groupName);
info ("member: " + channel.getAddress().toString());
diff --git a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java
index f77848c794..eef807b669 100644
--- a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java
+++ b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java
@@ -36,7 +36,6 @@ public ReplicatedSpaceAdaptor () {
super ();
}
public void initService() throws ConfigurationException {
- Element e = getPersist ();
Space sp = SpaceFactory.getSpace (cfg.get ("space", ""));
rspaceUri = cfg.get ("rspace", "rspace");
try {
@@ -47,7 +46,7 @@ public void initService() throws ConfigurationException {
getLog().getLogger(),
getLog().getRealm(),
cfg.getBoolean ("trace"),
- cfg.getBoolean ("replicate", sp instanceof TSpace)
+ cfg.getBoolean ("replicate", false)
);
NameRegistrar.register (rspaceUri, rs);
} catch (Throwable t) {
diff --git a/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml b/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml
index 24fb61de73..6918818a47 100644
--- a/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml
+++ b/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml
@@ -17,31 +17,12 @@
mcast_recv_buf_size="5M"
mcast_send_buf_size="5M"
max_bundle_size="64K"
- max_bundle_timeout="30"
enable_diagnostics="true"
thread_naming_pattern="cl"
- timer_type="new3"
- timer.min_threads="2"
- timer.max_threads="4"
- timer.keep_alive_time="3000"
- timer.queue_max_size="500"
-
- thread_pool.enabled="true"
- thread_pool.min_threads="2"
- thread_pool.max_threads="8"
- thread_pool.keep_alive_time="5000"
- thread_pool.queue_enabled="true"
- thread_pool.queue_max_size="10000"
- thread_pool.rejection_policy="discard"
-
- oob_thread_pool.enabled="true"
- oob_thread_pool.min_threads="1"
- oob_thread_pool.max_threads="8"
- oob_thread_pool.keep_alive_time="5000"
- oob_thread_pool.queue_enabled="false"
- oob_thread_pool.queue_max_size="100"
- oob_thread_pool.rejection_policy="discard"/>
+ thread_pool.min_threads="0"
+ thread_pool.max_threads="20"
+ thread_pool.keep_alive_time="30000"/>
-
+
@@ -71,8 +50,10 @@
min_threshold="0.4"/>
+
-
+
+