Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
bdeggleston committed Mar 14, 2024
1 parent f9291f4 commit f0c2022
Showing 1 changed file with 5 additions and 12 deletions.
Expand Up @@ -36,10 +36,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.impl.CommandTimeseries;
import accord.impl.CommandsForKey;
import accord.impl.SimpleProgressLog;
import accord.local.KeyHistory;
import accord.local.Node;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
Expand Down Expand Up @@ -204,22 +202,17 @@ private static <V> V getUninterruptibly(Future<V> future)
return getUninterruptibly(future, 1, TimeUnit.MINUTES);
}

private static <D> TxnId txnId(Timestamp key, CommandTimeseries<D> timeseries)
{
return timeseries.loader().txnId(timeseries.commands.get(key));
}

private static void awaitLocalApplyOnKey(PartitionKey key)
{
Node node = accordService().node();
AtomicReference<TxnId> waitFor = new AtomicReference<>(null);
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(key, KeyHistory.DEPS), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(key), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore;
CommandsForKey commands = store.depsCommandsForKey(key).current();
if (commands.commands().isEmpty())
CommandsForKey commands = store.commandsForKey(key).current();
int size = commands.size();
if (size < 1)
return;
Timestamp executeAt = commands.commands().commands.lastKey();
waitFor.set(txnId(executeAt, commands.commands()));
waitFor.set(commands.txnId(size - 1));
}));
Assert.assertNotNull(waitFor.get());
TxnId txnId = waitFor.get();
Expand Down

0 comments on commit f0c2022

Please sign in to comment.