Skip to content

Commit

Permalink
Adding logging
Browse files Browse the repository at this point in the history
  • Loading branch information
thegridman committed Nov 22, 2023
1 parent cac5ba6 commit df94946
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/topics-tests.yaml
Expand Up @@ -12,7 +12,7 @@ name: Topics Test
on:
workflow_dispatch:
schedule:
- cron: '0 * * * *'
- cron: '0/30 * * * *'
push:
branches-ignore:
- gh-pages
Expand Down
Expand Up @@ -253,6 +253,7 @@ public OfferProcessor.Result offerToPageTail(BinaryEntry<Page.Key, Page> entry,
List<Binary> listElements = processor.getElements();
int nNotifyPostFull = processor.getNotifyPostFull();
boolean fSealPage = processor.isSealPage();
Logger.info("**** Entered (channel=" + nChannel + ") PagedTopicPartition.offerToPageTail() key=" + entry.getKey());

if (nChannel >= channelCount)
{
Expand Down Expand Up @@ -286,6 +287,7 @@ public OfferProcessor.Result offerToPageTail(BinaryEntry<Page.Key, Page> entry,
Page page = ensurePage(nChannel, entry);
if (page == null || page.isSealed())
{
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.offerToPageTail() - page sealed key=" + entry.getKey());
// The page has been removed or is full so the producer's idea of the tail
// is out of date, and it needs to re-offer to the correct tail page
return new OfferProcessor.Result(OfferProcessor.Result.Status.PageSealed, 0, 0, PagedPosition.NULL_OFFSET);
Expand All @@ -296,6 +298,7 @@ else if (page.getTail() == Page.EMPTY)
cbCapServer, cbCapPage);
if (result != null)
{
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.offerToPageTail() - (1) result=" + result + " key=" + entry.getKey());
return result;
}
}
Expand Down Expand Up @@ -368,9 +371,12 @@ else if (page.getTail() == Page.EMPTY)
// update the page entry with the modified page
entry.setValue(page);

getStatistics().onPublished(nChannel, cAccepted, new PagedPosition(keyPage.getPageId(), page.getTail()));
int nTailEnd = page.getTail();
getStatistics().onPublished(nChannel, cAccepted, new PagedPosition(keyPage.getPageId(), nTailEnd));

return new OfferProcessor.Result(status, cAccepted, cbRemainingCapacity, nTailStart + 1);
OfferProcessor.Result result = new OfferProcessor.Result(status, cAccepted, cbRemainingCapacity, nTailStart + 1);
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.offerToPageTail() - (2) pageId=" + lPage + " nTailEnd=" + nTailEnd + " result=" + result + " key=" + entry.getKey());
return result;
}

/**
Expand Down Expand Up @@ -732,10 +738,13 @@ public boolean removePageIfNotRetainingElements(int nChannel, long lPage)
*/
public boolean removePage(int nChannel, long lPage)
{
Logger.info("**** Entered (channel=" + nChannel + ") PagedTopicPartition.removePage() - lPage=" + lPage);

BinaryEntry<Page.Key, Page> entryPage = enlistPageEntry(nChannel, lPage);
Page page = entryPage.getValue();
if (page == null)
{
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.removePage() - Page is NULL");
return false;
}

Expand Down Expand Up @@ -769,9 +778,11 @@ public boolean removePage(int nChannel, long lPage)
}
}

Logger.info("**** In (channel=" + nChannel + ") PagedTopicPartition.removePage() - Removing Page");
entryPage.remove(false);

// notify any publishers waiting for space to free up
Logger.info("**** In (channel=" + nChannel + ") PagedTopicPartition.removePage() - Notifying");
notifyAll(usage.resetRemovalNotifiers());

return true;
Expand All @@ -786,13 +797,15 @@ public void notifyAll(int[] anNotify)
{
if (anNotify != null)
{
Logger.info("**** In PagedTopicPartition.notifyAll() - anNotify=" + java.util.Arrays.toString(anNotify));
BackingMapContext ctxNotify = getBackingMapContext(PagedTopicCaches.Names.NOTIFICATIONS);
int nPart = getPartition();
for (int nNotify : anNotify)
{
InvocableMap.Entry entry = ctxNotify.getBackingMapEntry(toBinaryKey(new NotificationKey(nPart, nNotify)));
if (entry.isPresent())
{
Logger.info("**** In PagedTopicPartition.notifyAll() - removing entry " + entry);
entry.remove(false);
}
}
Expand Down Expand Up @@ -1369,17 +1382,20 @@ public PollProcessor.Result pollFromPageHead(BinaryEntry<Subscription.Key, Subsc
Subscription.Key keySubscription = entrySubscription.getKey();
int nChannel = keySubscription.getChannelId();
int cChannel = getChannelCount();
Logger.info("**** Entered (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - key=" + entrySubscription.getKey());

if (nChannel >= cChannel || nChannel < 0)
{
// the subscriber requested a channel that does not exist
// the subscriber may have an incorrect channel count
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - channel does not exist key=" + entrySubscription.getKey());
return PollProcessor.Result.notAllocated(0);
}

if (!entrySubscription.isPresent() || entrySubscription.getValue() == null)
{
// the subscriber is unknown, but we're allowing that as the client should handle it
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - unknown subscriber key=" + entrySubscription.getKey());
return PollProcessor.Result.unknownSubscriber();
}

Expand All @@ -1390,6 +1406,7 @@ public PollProcessor.Result pollFromPageHead(BinaryEntry<Subscription.Key, Subsc
if (cChannel != cChannelActual)
{
// the channel count has changed to force the subscriber to reconnect, which will refresh allocations
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - unknown subscriber key=" + entrySubscription.getKey());
return PollProcessor.Result.unknownSubscriber();
}

Expand All @@ -1403,6 +1420,7 @@ public PollProcessor.Result pollFromPageHead(BinaryEntry<Subscription.Key, Subsc
if (!Objects.equals(owner, subscriberId))
{
// the subscriber does not own this channel, it should not have got here, but it probably had out of date state
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - unowned channel key=" + entrySubscription.getKey());
return PollProcessor.Result.notAllocated(Integer.MAX_VALUE);
}

Expand All @@ -1412,6 +1430,7 @@ public PollProcessor.Result pollFromPageHead(BinaryEntry<Subscription.Key, Subsc
if (lPage == Page.NULL_PAGE)
{
// subscriber tried to poll a page that is before the first ever head
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - page is before head key=" + entrySubscription.getKey());
return PollProcessor.Result.exhausted(subscription);
}

Expand All @@ -1427,6 +1446,7 @@ public PollProcessor.Result pollFromPageHead(BinaryEntry<Subscription.Key, Subsc
// We should have previously exhausted and detached from this page, we can't just fall through
// as the page has already been detached we can't allow a double detach
checkForPageCleanup(entrySubscription, lPage, page);
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - page less than last committed key=" + entrySubscription.getKey());
return PollProcessor.Result.exhausted(subscription);
}

Expand All @@ -1440,6 +1460,7 @@ public PollProcessor.Result pollFromPageHead(BinaryEntry<Subscription.Key, Subsc
// We should have previously exhausted and detached from this page, we can't just fall through
// as the page has already been detached we can't allow a double detach
checkForPageCleanup(entrySubscription, lPage, page);
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - page is exhausted (1) key=" + entrySubscription.getKey());
return PollProcessor.Result.exhausted(subscription);
}
}
Expand All @@ -1455,11 +1476,13 @@ public PollProcessor.Result pollFromPageHead(BinaryEntry<Subscription.Key, Subsc
// the client is making a blind request, or
// we'd previously exhausted and detached from this page, we can't just fall through
// as the page has already been detached we can't allow a double detach
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - blind request key=" + entrySubscription.getKey());
return PollProcessor.Result.exhausted(subscription);
}
}
else if (lPage < lPageThis) // read from fully consumed page
{
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - page is exhausted (2) key=" + entrySubscription.getKey());
return PollProcessor.Result.exhausted(subscription);
}
else // otherwise lPage > lPageThis; first poll from page, start at the beginning
Expand All @@ -1471,6 +1494,7 @@ else if (lPage < lPageThis) // read from fully consumed page
page = ensurePage(nChannel, enlistPageEntry(nChannel, lPage));
if (page == null)
{
Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - page is exhausted (3) key=" + entrySubscription.getKey());
return PollProcessor.Result.exhausted(subscription);
}
}
Expand Down Expand Up @@ -1502,6 +1526,9 @@ else if (lPage < lPageThis) // read from fully consumed page
nPos = Math.max(nPos, nPosCommitted + 1);
}

Logger.info("**** In (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - About to enter loop for contents nPos=" + nPos + " nPosTail=" + nPosTail
+ " nPosCommitted=" + nPosCommitted + " cReqValues=" + cReqValues);

for (; cReqValues > 0 && nPos <= nPosTail && cbResult < cbLimit; ++nPos)
{
Binary binPosKey = ContentKey.toBinary(f_nPartition, nChannel, lPage, nPos);
Expand All @@ -1522,6 +1549,9 @@ else if (lPage < lPageThis) // read from fully consumed page
}
}

Logger.info("**** In (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() in content loop - nPos=" + nPos + " nPosTail=" + nPosTail
+ " cReqValues=" + cReqValues + " binValue=" + binValue);

if (binValue != null && (filter == null || InvocableMapHelper.evaluateEntry(filter, entryElement)))
{
if (fnConvert != null)
Expand All @@ -1545,6 +1575,8 @@ else if (lPage < lPageThis) // read from fully consumed page
long lPageNext = lPage;
int nPosNext = nPos;

Logger.info("**** In (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() finished content loop - nPos=" + nPos + " nPosTail=" + nPosTail + " sealed=" + page.isSealed());

if (nPos > nPosTail && page.isSealed())
{
page = enlistPage(nChannel, lPage);
Expand Down Expand Up @@ -1580,6 +1612,7 @@ else if (lPage < lPageThis) // read from fully consumed page
// so there may now be an entry in the page we have not read.
// Now we have enlisted the page it is not going to change, so we can tell by checking the tail.
int nPosTailLatest = pageEnlisted.getTail();
Logger.info("**** In (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - nPos=" + nPos + " nPosTail=" + nPosTail + " nPosTailLatest=" + nPosTailLatest);
if (nPosTail == nPosTailLatest)
{
// the tail has not changed, so we need to add a notification
Expand All @@ -1595,6 +1628,7 @@ else if (lPage < lPageThis) // read from fully consumed page
PagedPosition posHead = new PagedPosition(lPageNext, nPosNext);
getStatistics().getSubscriberGroupStatistics(subscriberGroupId).onPolled(nChannel, listValues.size(), posHead);

Logger.info("**** Leaving (channel=" + nChannel + ") PagedTopicPartition.pollFromPageHead() - result=" + result + " key=" + entrySubscription.getKey());
return result;
}

Expand Down Expand Up @@ -2356,6 +2390,8 @@ else if (nMatch < 0)
*/
protected void requestInsertionNotification(Page page, int nNotifyPostEmpty, int nChannel)
{
Logger.info("**** In (channel=" + nChannel + ") PagedTopicPartition.requestInsertionNotification() channel=" + nChannel
+ " nNotifyPostEmpty=" + nNotifyPostEmpty + " page=" + page);
// record that this subscriber requires notification on the next insert to this page
page.addInsertionNotifier(nNotifyPostEmpty);
BinaryEntry<NotificationKey, int[]> entry = enlistBackingMapEntry(PagedTopicCaches.Names.NOTIFICATIONS,
Expand All @@ -2366,7 +2402,9 @@ protected void requestInsertionNotification(Page page, int nNotifyPostEmpty, int
entry.setValue(Arrays.binaryInsert(entry.getValue(), nChannel));
// we expire in half the subscriber timeout time to ensure we see receive requests from waiting subscribers
// so that they do not time out
entry.expire(getDependencies().getNotificationTimeout());
long cMillis = getDependencies().getNotificationTimeout();
entry.expire(cMillis);
Logger.info("**** In (channel=" + nChannel + ") PagedTopicPartition.requestInsertionNotification() - added notification for channel=" + nChannel + " expiry=" + cMillis + " value=" + java.util.Arrays.toString(entry.getValue()));
}
}

Expand Down

0 comments on commit df94946

Please sign in to comment.