Skip to content

Commit

Permalink
update: add extra padding and end message
Browse files Browse the repository at this point in the history
  • Loading branch information
gc-garcol committed Dec 3, 2024
1 parent fa0a867 commit 6cfd596
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 25 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
}

group = "io.github.gc-garcol"
version = "1.2.1"
version = "1.3.0"

java {
withJavadocJar()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
@State(Scope.Thread)
@BenchmarkMode({ Mode.Throughput, Mode.AverageTime })
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Measurement(iterations = 3, time = 3)
@Fork(1)
public class Pipeline1P3C_OneToManyRingBufferBechmark
{
@Benchmark
@Timeout(time = 20)
@Warmup(iterations = 3, time = 20)
@Timeout(time = 60)
@Measurement(iterations = 1, time = 60)
@Warmup(iterations = 1, time = 10)
public void publish(Pipeline1P3C_OneToManyRingBufferPlan ringBufferPlan, Blackhole blackhole) throws IOException
{
ringBufferPlan.writeBuffer.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class Pipeline1P3C_OneToManyRingBufferPlan
@Setup(Level.Trial)
public void setUp(Blackhole blackhole) throws InterruptedException
{
ringBuffer = new OneToManyRingBuffer(16, 3);
ringBuffer = new OneToManyRingBuffer(18, 3);

messageHandler = new MessageHandler()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
@State(Scope.Thread)
@BenchmarkMode({ Mode.Throughput, Mode.AverageTime })
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Measurement(iterations = 3, time = 3)
@Fork(1)
public class Unicast1P1C_OneToManyRingBufferBechmark
{
@Benchmark
@Timeout(time = 20)
@Warmup(iterations = 2, time = 20)
@Timeout(time = 60)
@Measurement(iterations = 1, time = 60)
@Warmup(iterations = 1, time = 10)
public void publish(Unicast1P1C_OneToManyRingBufferPlan ringBufferPlan, Blackhole blackhole) throws IOException
{
ringBufferPlan.writeBuffer.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class Unicast1P1C_OneToManyRingBufferPlan
@Setup(Level.Trial)
public void setUp(Blackhole blackhole) throws InterruptedException
{
ringBuffer = new OneToManyRingBuffer(16, 1);
ringBuffer = new OneToManyRingBuffer(18, 1);

messageHandler = new MessageHandler()
{
Expand Down
2 changes: 1 addition & 1 deletion lib-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

group = "io.github.gc-garcol"
version = "1.2.1"
version = "1.3.0"

java {
toolchain {
Expand Down
30 changes: 19 additions & 11 deletions lib-core/src/main/java/gc/garcol/libcore/OneToManyRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public class OneToManyRingBuffer
private final UnsafeBuffer pointers;

private final int capacity;
private final int maxMsgLength;
private final int maxRecordLength;
private final int lastConsumerIndex;
private final int consumerSize;

private final int producerPointerIndex;
private final int[] consumerPointerIndexes;
Expand All @@ -36,6 +37,8 @@ public class OneToManyRingBuffer
*/
public static final int HEADER_LENGTH = Integer.BYTES * 2; // length, type

public static final int EXTRA_PADDING_LENGTH = Long.BYTES * 8;

/**
* Alignment as a multiple of bytes for each record.
* Padding to align the record in order to prevent false sharing.
Expand All @@ -61,14 +64,15 @@ public OneToManyRingBuffer(int powSize, int consumerSize)

producerPointerIndex = Long.BYTES * 8;
consumerPointerIndexes = new int[consumerSize];
this.consumerSize = consumerSize;

consumerPointerIndexes[0] = Long.BYTES * 8 + Long.BYTES + Long.BYTES * 7; // padding + producer-pointer-block + padding
for (int i = 1; i < consumerSize; i++)
{
consumerPointerIndexes[i] = consumerPointerIndexes[i - 1] + Long.BYTES + Long.BYTES * 7; // padding + consumer-pointer-block + padding
}

maxMsgLength = capacity >> 3;
maxRecordLength = capacity >> 3;
}

/**
Expand All @@ -80,13 +84,15 @@ public OneToManyRingBuffer(int powSize, int consumerSize)
*/
public boolean write(int msgTypeId, ByteBuffer message)
{
int msgLength = message.limit();
checkMsgLength(msgLength, maxMsgLength);
int messageLength = message.limit();
final int recordLength = calculateRecordLength(messageLength);
final int alignedRecordLength = BitUtil.align(recordLength, ALIGNMENT);
checkMsgLength(alignedRecordLength, maxRecordLength);

// [1] happen-before guarantee for reads
long currentProducerPosition = pointers.getLongVolatile(producerPointerIndex);
long firstConsumerPosition = pointers.getLongVolatile(consumerPointerIndexes[0]);
long lastConsumerPosition = pointers.getLongVolatile(consumerPointerIndexes[lastConsumerIndex]);
long lastConsumerPosition = consumerSize == 1 ? firstConsumerPosition : pointers.getLongVolatile(consumerPointerIndexes[lastConsumerIndex]);

int currentProducerOffset = offset(currentProducerPosition);
boolean currentProducerFlip = flip(currentProducerPosition);
Expand All @@ -95,9 +101,6 @@ public boolean write(int msgTypeId, ByteBuffer message)
int lastConsumerOffset = offset(lastConsumerPosition);
boolean lastConsumerFlip = flip(lastConsumerPosition);

final int recordLength = msgLength + HEADER_LENGTH;
final int alignedRecordLength = BitUtil.align(recordLength, ALIGNMENT);

final int expectedEndOffsetOfRecord = currentProducerOffset + alignedRecordLength - 1;

boolean sameCircleWithFirstConsumer = sameCircle(currentProducerFlip, firstConsumerFlip);
Expand Down Expand Up @@ -151,8 +154,8 @@ else if (!sameCircleWithLastConsumer) // !sameCircleWithFirstConsumer, the produ
int nextProducerOffset = (realStartOfRecord + alignedRecordLength) % capacity; // maybe nextProducerOffset == 0

// when [2] happened, the [2] ensures that the these instructions are synchronized into main memory as well
buffer.putBytes(realStartOfRecord + HEADER_LENGTH, message, 0, msgLength);
buffer.putInt(realStartOfRecord, msgLength);
buffer.putBytes(realStartOfRecord + HEADER_LENGTH, message, 0, messageLength);
buffer.putInt(realStartOfRecord, messageLength);
buffer.putInt(realStartOfRecord + Integer.BYTES, msgTypeId);

shouldFlip |= nextProducerOffset == 0;
Expand Down Expand Up @@ -284,7 +287,7 @@ public boolean readOne(int consumerIndex, final MessageHandler handler)
return false;
}

int recordLength = messageLength + HEADER_LENGTH;
int recordLength = calculateRecordLength(messageLength);
int alignedRecordLength = BitUtil.align(recordLength, ALIGNMENT);
int endRecordOffset = currentConsumerOffset + alignedRecordLength - 1;

Expand All @@ -305,4 +308,9 @@ public boolean readOne(int consumerIndex, final MessageHandler handler)

return true;
}

private int calculateRecordLength(int messageLength)
{
return messageLength + HEADER_LENGTH + EXTRA_PADDING_LENGTH;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,15 @@ public void shouldConsumeThreadSafe_1P2C_10()

oneToManyRingBuffer = new OneToManyRingBuffer(10, 2);

int publishedMessage = 0;
for (int i = 0; i < messages.size(); i++)
{
messageBufferWriter.clear();
ByteBufferUtil.put(messageBufferWriter, 0, messages.get(i).getBytes());
messageBufferWriter.flip();
oneToManyRingBuffer.write(i, messageBufferWriter);
if (oneToManyRingBuffer.write(i, messageBufferWriter)) {
publishedMessage++;
}
}

Function<Integer, MessageHandler> handlerSupplier = (Integer consumerId) ->
Expand Down Expand Up @@ -180,7 +183,8 @@ public void shouldConsumeThreadSafe_1P2C_10()
for (int i = 0; i < messages.size(); i++)
{
UncheckUtil.run(() -> Thread.sleep(15));
oneToManyRingBuffer.read(0, handlerSupplier.apply(0), 1);
int consumedMessage = oneToManyRingBuffer.read(0, handlerSupplier.apply(0), 1);
System.out.println("Consumer 0 consume message: " + consumedMessage);
}
end.set(true);
});
Expand All @@ -197,8 +201,8 @@ public void shouldConsumeThreadSafe_1P2C_10()
UncheckUtil.run(thread0::join);
UncheckUtil.run(thread1::join);

Assertions.assertEquals(messages.size(), consumedIndexes.get(0).get(), "Consumer 0 not consume all messages");
Assertions.assertEquals(messages.size(), consumedIndexes.get(1).get(), "Consumer 1 not consume all messages");
Assertions.assertEquals(publishedMessage, consumedIndexes.get(0).get(), "Consumer 0 not consume all messages");
Assertions.assertEquals(publishedMessage, consumedIndexes.get(1).get(), "Consumer 1 not consume all messages");
}

@Test
Expand Down

0 comments on commit 6cfd596

Please # to comment.