Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Apr 22, 2024
1 parent acf2cdf commit 739cda4
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 16 deletions.
27 changes: 24 additions & 3 deletions src/org/jgroups/util/RingBufferSeqno.java
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

/**
* Ring buffer of fixed capacity. Indices low and high point to the beginning and end of the buffer. Sequence numbers
Expand Down Expand Up @@ -70,7 +71,7 @@ public RingBufferSeqno(int capacity, long offset) {
public int spaceUsed() {return (int)(high - low);}
public Lock lock() {return lock;}

public double saturation() {
public double saturation() {
int space=spaceUsed();
return space == 0? 0.0 : space / (double)capacity();
}
Expand Down Expand Up @@ -240,6 +241,27 @@ public int purge(long seqno) {
}
}

/**
* Iterates throiugh all elements and invokes a consumer function
* @param c The consumer accepting the index and element
*/
public RingBufferSeqno<T> forAll(BiConsumer<Integer,T> c) {
if(c == null)
return this;
lock.lock();
try {
for(long i=low+1; i <= high; i++) {
int index=index(i);
T element=buf[index];
c.accept(index, element);
}
return this;
}
finally {
lock.unlock();
}
}

@Override
public void close() {
lock.lock();
Expand Down Expand Up @@ -327,9 +349,8 @@ public boolean hasNext() {
}

public T next() {
if(!hasNext()){
if(!hasNext())
throw new NoSuchElementException();
}
if(current <= low)
current=low+1;
return buffer[index(current++)];
Expand Down
84 changes: 71 additions & 13 deletions tests/junit-functional/org/jgroups/tests/RingBufferSeqnoTest.java
Expand Up @@ -6,6 +6,7 @@
import org.jgroups.util.Util;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -91,8 +92,8 @@ public void testSaturation() {
saturation=buf.saturation();
System.out.println("size=" + size + ", space used=" + space_used + ", saturation=" + saturation);
assert buf.size() == 5;
assert buf.spaceUsed() == 8;
assert buf.saturation() == 0.5;
assert buf.spaceUsed() == 5;
assert buf.saturation() == 5/16.0;

long low=buf.low();
buf.purge(3);
Expand All @@ -105,7 +106,7 @@ public void testSaturation() {
System.out.println("size=" + size + ", space used=" + space_used + ", saturation=" + saturation);
assert buf.size() == 5;
assert buf.spaceUsed() == 5;
assert buf.saturation() == 0.3125;
assert buf.saturation() == 5/16.0;
}

public void testAddWithWrapAround() {
Expand Down Expand Up @@ -221,22 +222,32 @@ public void testGetMissing() {

public void testGetMissing2() {
RingBufferSeqno<Integer> buf=new RingBufferSeqno<>(10, 0);
buf.add(1,1);
SeqnoList missing=buf.getMissing();
System.out.println("missing = " + missing);
assert missing == null && buf.missing() == 0;
buf.add(1,1);
missing=buf.getMissing();
System.out.println("missing = " + missing);
assert missing == null && buf.missing() == 0;

buf=new RingBufferSeqno<>(10, 0);
buf.add(10,10);
missing=buf.getMissing();
System.out.println("missing = " + missing);
assert buf.missing() == 9;
assert buf.missing() == missing.size();

buf=new RingBufferSeqno<>(10, 0);
buf.add(5,5);
missing=buf.getMissing();
System.out.println("missing = " + missing);
assert buf.missing() == missing.size();
buf.add(1,1);
buf.add(10,10);
missing=buf.getMissing();
System.out.println("missing = " + missing);
assert buf.missing() == 7;
assert buf.missing() == missing.size();

buf=new RingBufferSeqno<>(10, 0);
buf.add(5,7);
Expand Down Expand Up @@ -301,6 +312,22 @@ public void testBlockingAddAndPurge2() throws TimeoutException {
assert buf.size() == 15;
}

public void testBlockingAddAndRemove() throws TimeoutException {
final RingBufferSeqno<Integer> buf=new RingBufferSeqno<>(10, 0);
for(int i=1; i <= buf.capacity(); i++)
buf.add(i, i, true);
System.out.println("buf = " + buf);
assert buf.size() == 16;
Remover remover=new Remover(buf);
remover.start();
for(int i=buf.capacity()+1; i <= buf.capacity()+5; i++) {
boolean rc=buf.add(i,i, true);
assert rc;
}
List<Integer> removed=remover.removed();
assert removed.size() == 5;
}

public void testGet() {
final RingBufferSeqno<Integer> buf=new RingBufferSeqno<>(10, 0);
for(int i: Arrays.asList(1,2,3,4,5))
Expand Down Expand Up @@ -369,19 +396,13 @@ public void testRemovedPastHighestReceived() {
RingBufferSeqno<Integer> buf=new RingBufferSeqno<>(10, 0);
int highest=buf.capacity();
for(int i=1; i <= 20; i++) {
if(i > highest) {
if(i > highest)
assert !buf.add(i,i);
Integer num=buf.remove();
assert num == null;
}
else {
else
assert buf.add(i,i);
Integer num=buf.remove();
assert num != null && num == i;
}
}
System.out.println("buf = " + buf);
assert buf.size() == 0;
assert buf.size() == 16;
assert buf.missing() == 0;
}

Expand Down Expand Up @@ -520,6 +541,27 @@ public void run() {
assertIndices(buf, 15, 15);
}

public void testConcurrentAddAndRemove2() throws InterruptedException {
final int NUM=1000;
final RingBufferSeqno<Integer> buf=new RingBufferSeqno<>(10,0);
final CountDownLatch latch=new CountDownLatch(1);
Adder[] adders=new Adder[NUM];
for(int i=0; i < NUM; i++) {
adders[i]=new Adder(latch, i+1, buf);
adders[i].start();
}
latch.countDown();
List<Integer> list=new ArrayList<>(NUM);
while(list.size() != NUM) {
List<Integer> l=buf.removeMany(256);
if(l != null)
list.addAll(l);
}
assert list.size() == NUM;
for(int i=0; i < NUM; i++)
assert list.get(i) == i+1;
}

public void testPurge() {
RingBufferSeqno<Integer> buf=new RingBufferSeqno<>(10, 0);
int purged=buf.purge(0);
Expand Down Expand Up @@ -659,5 +701,21 @@ public void run() {
}
}

protected static class Remover extends Thread {
protected final RingBufferSeqno<Integer> buf;
protected List<Integer> removed;

public Remover(RingBufferSeqno<Integer> buf) {
this.buf=buf;
}

public List<Integer> removed() {return removed;}

public void run() {
Util.sleep(1000);
removed=buf.removeMany(5);
}
}


}

0 comments on commit 739cda4

Please sign in to comment.