Skip to content

Commit

Permalink
Add additional topic subscriber metrics
Browse files Browse the repository at this point in the history
(merge main -> ce/main 108880)

[git-p4: depot-paths = "//dev/coherence-ce/main/": change = 108882]
  • Loading branch information
thegridman committed May 8, 2024
1 parent 85fb040 commit 64d79e8
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 4 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand Down Expand Up @@ -112,6 +112,7 @@

import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
Expand All @@ -123,6 +124,7 @@

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -403,15 +405,37 @@ public <T> NamedTopic<T> getNamedTopic()
public CompletableFuture<Element<V>> receive()
{
ensureActive();
return (CompletableFuture<Element<V>>) f_queueReceiveOrders.add(ReceiveRequest.SINGLE);
CompletableFuture<Element<V>> future = (CompletableFuture<Element<V>>) f_queueReceiveOrders.add(ReceiveRequest.SINGLE);
f_cReceiveRequests.add(1L);
future.handle((e, error) ->
{
if (error instanceof CancellationException)
{
Logger.err("Receive cancelled", error);
f_cCancelled.add(1L);
}
return null;
});
return future;
}

@Override
@SuppressWarnings("unchecked")
public CompletableFuture<List<Element<V>>> receive(int cBatch)
{
ensureActive();
return (CompletableFuture<List<Element<V>>>) f_queueReceiveOrders.add(new ReceiveRequest(true, cBatch));
CompletableFuture<List<Element<V>>> future = (CompletableFuture<List<Element<V>>>) f_queueReceiveOrders.add(new ReceiveRequest(true, cBatch));
f_cReceiveRequests.add(1L);
future.handle((e, error) ->
{
if (error instanceof CancellationException)
{
Logger.err("Receive cancelled", error);
f_cCancelled.add(1L);
}
return null;
});
return future;
}

public Optional<Element<V>> peek(int nChannel)
Expand Down Expand Up @@ -796,6 +820,26 @@ public Channel getChannel(int nChannel)
return nChannel < m_aChannel.length ? m_aChannel[nChannel] : new Channel.EmptyChannel(nChannel);
}

/**
* Return the number of cancelled receive requests.
*
* @return the number of cancelled receive requests
*/
public long getCancelled()
{
return f_cCancelled.longValue();
}

/**
* Return the count of calls to one of the receive methods.
*
* @return the count of calls to one of the receive methods
*/
public long getReceiveRequests()
{
return f_cReceiveRequests.longValue();
}

/**
* Return the number of completed receive requests.
*
Expand Down Expand Up @@ -4912,4 +4956,14 @@ public interface StateListener
* Listeners for subscriber state changes.
*/
private final Listeners f_stateListeners = new Listeners();

/**
* The count of calls to the {@link #receive()} or {@link #receive(int)} methods.
*/
private final LongAdder f_cReceiveRequests = new LongAdder();

/**
* The count of receive futures that have been cancelled.
*/
private final LongAdder f_cCancelled = new LongAdder();
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand Down Expand Up @@ -74,13 +74,16 @@ public SubscriberModel(PagedTopicSubscriber<?> subscriber)
addAttribute(ATTRIBUTE_NOTIFICATION_ID);
addAttribute(ATTRIBUTE_POLLS);
addAttribute(ATTRIBUTE_SUB_TYPE_CODE);
addAttribute(ATTRIBUTE_RECEIVE_CANCELLED);
addAttribute(ATTRIBUTE_RECEIVE_COMPLETIONS);
addAttribute(ATTRIBUTE_RECEIVE_COMPLETIONS_MEAN);
addAttribute(ATTRIBUTE_RECEIVE_COMPLETIONS_ONE);
addAttribute(ATTRIBUTE_RECEIVE_COMPLETIONS_FIVE);
addAttribute(ATTRIBUTE_RECEIVE_COMPLETIONS_FIFTEEN);
addAttribute(ATTRIBUTE_RECEIVE_EMPTY);
addAttribute(ATTRIBUTE_RECEIVE_ERRORS);
addAttribute(ATTRIBUTE_RECEIVE_QUEUE);
addAttribute(ATTRIBUTE_RECEIVE_REQUESTS);
addAttribute(ATTRIBUTE_SERIALIZER);
addAttribute(ATTRIBUTE_STATE);
addAttribute(ATTRIBUTE_STATE_NAME);
Expand Down Expand Up @@ -315,6 +318,36 @@ protected String getSerializer()
return String.valueOf(f_subscriber.getSerializer());
}

/**
* Return the count of calls to one of the receive methods.
*
* @return the count of calls to one of the receive methods
*/
protected long getReceiveCalls()
{
return f_subscriber.getReceiveRequests();
}

/**
* Return the count of receive requests waiting.
*
* @return the count of receive requests waiting
*/
protected int getReceiveQueueSize()
{
return f_subscriber.getReceiveQueueSize();
}

/**
* Return the count of cancelled receive requests completed.
*
* @return the count of cancelled receive requests completed
*/
protected long getCancelledCount()
{
return f_subscriber.getCancelled();
}

/**
* Return the count of receive requests completed.
*
Expand Down Expand Up @@ -606,6 +639,36 @@ protected void invokeNotifyChannel(Object[] aoParam)
.metric(true)
.build();

/**
* The number of cancelled received requests.
*/
protected static final ModelAttribute<SubscriberModel> ATTRIBUTE_RECEIVE_CANCELLED =
SimpleModelAttribute.longBuilder("CancelledCount", SubscriberModel.class)
.withDescription("The number of cancelled receive requests.")
.withFunction(SubscriberModel::getCancelledCount)
.metric("CancelledCount")
.build();

/**
* The number of outstanding received requests.
*/
protected static final ModelAttribute<SubscriberModel> ATTRIBUTE_RECEIVE_QUEUE =
SimpleModelAttribute.longBuilder("ReceiveBacklog", SubscriberModel.class)
.withDescription("The number of outstanding receive requests.")
.withFunction(SubscriberModel::getReceiveQueueSize)
.metric("ReceiveBacklog")
.build();

/**
* The number of calls to one of the receive methods.
*/
protected static final ModelAttribute<SubscriberModel> ATTRIBUTE_RECEIVE_REQUESTS =
SimpleModelAttribute.longBuilder("ReceiveRequestCount", SubscriberModel.class)
.withDescription("The number of calls to one of the receive methods.")
.withFunction(SubscriberModel::getReceiveCalls)
.metric("ReceiveRequestCount")
.build();

/**
* The number of completed received requests.
*/
Expand Down

0 comments on commit 64d79e8

Please sign in to comment.