Skip to content

Commit

Permalink
Upgrade to JGroups 4.0.4.Final
Browse files Browse the repository at this point in the history
default to replicate=false
  • Loading branch information
ar committed Aug 4, 2017
1 parent 05445cb commit 58a2a04
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 51 deletions.
2 changes: 1 addition & 1 deletion libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ext {
jettyVersion = '9.3.19.v20170502'
servletApiVersion = '3.1.0'
websocketApiVersion = '1.1'
jgroupsVersion = '3.6.9.Final'
jgroupsVersion = '4.0.4.Final'
jaxrsVersion = '2.0.1'
jsonSchemaVersion = '2.2.6'
jacksonVersion = '2.7.4'
Expand Down
34 changes: 13 additions & 21 deletions modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,18 @@
import org.jpos.util.Log;
import org.jpos.util.Logger;
import org.jpos.util.LogEvent;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.View;
import org.jgroups.Message;
import org.jgroups.Address;
import org.jgroups.Receiver;
import org.jgroups.conf.XmlConfigurator;

@SuppressWarnings("unchecked")
public class ReplicatedSpace
extends Log
implements LocalSpace, Receiver
{
Channel channel;
JChannel channel;
String nodeName;
String nodePrefix;
String seqName;
Expand Down Expand Up @@ -92,7 +90,7 @@ public void out (Object key, Object value, long timeout) {
getCoordinator();
try {
Request r = new Request (Request.OUT, key, value, timeout);
channel.send (new Message (null, null, r));
channel.send (new Message (null, r));
Object o = sp.in (r.getUUID(), MAX_OUT_WAIT);
if (o == null)
throw new SpaceError ("Could not out " + key);
Expand All @@ -104,10 +102,10 @@ public void push (Object key, Object value) {
push(key, value, 0L);
}
public void push (Object key, Object value, long timeout) {
Address coordinator = getCoordinator();
getCoordinator();
try {
Request r = new Request (Request.PUSH, key, value, timeout);
channel.send (new Message (null, null, r));
channel.send (new Message (null, r));
Object o = sp.in (r.getUUID(), MAX_OUT_WAIT);
if (o == null)
throw new SpaceError ("Could not push " + key);
Expand All @@ -122,7 +120,7 @@ public void put (Object key, Object value, long timeout) {
getCoordinator();
try {
Request r = new Request (Request.PUT, key, value, timeout);
channel.send (new Message (null, null, r));
channel.send (new Message (null, r));
Object o = sp.in (r.getUUID(), MAX_OUT_WAIT);
if (o == null)
throw new SpaceError ("Could not put " + key);
Expand Down Expand Up @@ -161,7 +159,7 @@ public void receive (Message msg) {
Request r = (Request) obj;
switch (r.type) {
case Request.OUT:
if (r.timeout != 0)
if (r.timeout != 0)
sp.out (r.key, r.value, r.timeout + TIMEOUT);
else
sp.out (r.key, r.value);
Expand Down Expand Up @@ -216,11 +214,12 @@ public void receive (Message msg) {
case Request.INP:
Object v = sp.inp (r.key);
if (v != null) {
MD5Template tmpl = new MD5Template(r.key, v);
send (null,
new Request (
Request.INP_NOTIFICATION,
r.key,
new MD5Template (r.key, v)
tmpl
)
);
}
Expand All @@ -238,9 +237,7 @@ public void receive (Message msg) {
sp.out (r.key, r.value, MAX_WAIT);
break;
case Request.INP_NOTIFICATION:
// if not self notification
if (!channel.getAddress().equals (msg.getSrc()))
sp.inp (r.value);
sp.inp (r.value);
break;
case Request.SPACE_COPY:
if (replicate && !isCoordinator() && sp instanceof TSpace) {
Expand Down Expand Up @@ -427,7 +424,7 @@ public void run() {
send (null,
new Request (
Request.SPACE_COPY,
null, // value is ref key for response
null,
((TSpace)sp).getEntries()
)
);
Expand Down Expand Up @@ -461,7 +458,7 @@ private void commitOn() {
private void send (Address destination, Request r)
{
try {
channel.send (new Message (destination, null, r));
channel.send (new Message (destination, r));
} catch (Exception e) {
error (e);
}
Expand All @@ -471,7 +468,7 @@ private void sendToCoordinator (Request r)
while (true) {
Address coordinator = getCoordinator();
try {
channel.send (new Message (coordinator, null, r));
channel.send (new Message (coordinator, r));
break;
} catch (Exception e) {
error ("error " + e.getMessage() + ", retrying");
Expand All @@ -484,12 +481,7 @@ private void sendToCoordinator (Request r)
private void initChannel (String groupName, String configFile)
throws Exception
{
InputStream config = new FileInputStream (configFile);
XmlConfigurator conf = XmlConfigurator.getInstance (config);
String props = conf.getProtocolStackString();
channel = new JChannel (props);
// channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
// channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
channel = new JChannel (configFile);
channel.setReceiver(this);
channel.connect (groupName);
info ("member: " + channel.getAddress().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public ReplicatedSpaceAdaptor () {
super ();
}
public void initService() throws ConfigurationException {
Element e = getPersist ();
Space sp = SpaceFactory.getSpace (cfg.get ("space", ""));
rspaceUri = cfg.get ("rspace", "rspace");
try {
Expand All @@ -47,7 +46,7 @@ public void initService() throws ConfigurationException {
getLog().getLogger(),
getLog().getRealm(),
cfg.getBoolean ("trace"),
cfg.getBoolean ("replicate", sp instanceof TSpace)
cfg.getBoolean ("replicate", false)
);
NameRegistrar.register (rspaceUri, rs);
} catch (Throwable t) {
Expand Down
35 changes: 8 additions & 27 deletions modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,12 @@
mcast_recv_buf_size="5M"
mcast_send_buf_size="5M"
max_bundle_size="64K"
max_bundle_timeout="30"
enable_diagnostics="true"
thread_naming_pattern="cl"

timer_type="new3"
timer.min_threads="2"
timer.max_threads="4"
timer.keep_alive_time="3000"
timer.queue_max_size="500"

thread_pool.enabled="true"
thread_pool.min_threads="2"
thread_pool.max_threads="8"
thread_pool.keep_alive_time="5000"
thread_pool.queue_enabled="true"
thread_pool.queue_max_size="10000"
thread_pool.rejection_policy="discard"

oob_thread_pool.enabled="true"
oob_thread_pool.min_threads="1"
oob_thread_pool.max_threads="8"
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
oob_thread_pool.rejection_policy="discard"/>
thread_pool.min_threads="0"
thread_pool.max_threads="20"
thread_pool.keep_alive_time="30000"/>

<PING />
<MERGE3 max_interval="30000"
Expand All @@ -54,25 +35,25 @@
xmit_table_num_rows="100"
xmit_table_msgs_per_row="2000"
xmit_table_max_compaction_time="30000"
max_msg_batch_size="500"
use_mcast_xmit="false"
discard_delivered_msgs="true"/>
<UNICAST3 xmit_interval="500"
xmit_table_num_rows="100"
xmit_table_msgs_per_row="2000"
xmit_table_max_compaction_time="60000"
conn_expiry_timeout="0"
max_msg_batch_size="500"/>
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
conn_expiry_timeout="0"/>
<pbcast.STABLE desired_avg_gossip="50000"
max_bytes="4M"/>
<pbcast.GMS print_local_addr="true" join_timeout="2000"
view_bundling="true"/>
<UFC max_credits="2M"
min_threshold="0.4"/>
<MFC max_credits="2M"
min_threshold="0.4"/>
<SEQUENCER />
<FRAG2 frag_size="60K" />
<RSVP resend_interval="2000" timeout="10000"/>
<pbcast.STATE_TRANSFER />
<!-- pbcast.FLUSH /-->
<pbcast.FLUSH />
</config>

0 comments on commit 58a2a04

Please # to comment.