Skip to content

Commit

Permalink
have a time based rollover (#921)
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed May 8, 2024
1 parent 92ca175 commit 539ff05
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
import com.slack.astra.proto.config.AstraConfigs;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.index.IndexNotFoundException;
Expand Down Expand Up @@ -44,28 +47,58 @@ public class DiskOrMessageCountBasedRolloverStrategy implements ChunkRollOverStr

private final AtomicReference<FSDirectory> activeChunkDirectory = new AtomicReference<>();
private final AtomicLong approximateDirectoryBytes = new AtomicLong(0);
private final AtomicBoolean maxTimePerChunksMinsReached = new AtomicBoolean(false);
private Instant rolloverStartTime;

public static DiskOrMessageCountBasedRolloverStrategy fromConfig(
MeterRegistry meterRegistry, AstraConfigs.IndexerConfig indexerConfig) {
return new DiskOrMessageCountBasedRolloverStrategy(
meterRegistry, indexerConfig.getMaxBytesPerChunk(), indexerConfig.getMaxMessagesPerChunk());
meterRegistry,
indexerConfig.getMaxBytesPerChunk(),
indexerConfig.getMaxMessagesPerChunk(),
indexerConfig.getMaxTimePerChunkSeconds() > 0
? indexerConfig.getMaxTimePerChunkSeconds()
: Long.MAX_VALUE);
}

/**
 * Convenience constructor for callers that do not care about time-based rollover; the max chunk
 * age defaults to one day (24h, expressed in seconds).
 */
public DiskOrMessageCountBasedRolloverStrategy(
    MeterRegistry registry, long maxBytesPerChunk, long maxMessagesPerChunk) {
  this(registry, maxBytesPerChunk, maxMessagesPerChunk, TimeUnit.DAYS.toSeconds(1));
}

/**
 * Rollover strategy that triggers on disk size, message count, or chunk age.
 *
 * @param registry meter registry used to publish the live directory-size gauge
 * @param maxBytesPerChunk roll over once the active chunk directory reaches this size; must be > 0
 * @param maxMessagesPerChunk roll over once this many messages are indexed; must be > 0
 * @param maxTimePerChunksSeconds roll over once the chunk has been open this long; pass
 *     {@code Long.MAX_VALUE} to disable time-based rollover
 */
public DiskOrMessageCountBasedRolloverStrategy(
    MeterRegistry registry,
    long maxBytesPerChunk,
    long maxMessagesPerChunk,
    long maxTimePerChunksSeconds) {
  ensureTrue(maxBytesPerChunk > 0, "Max bytes per chunk should be a positive number.");
  ensureTrue(maxMessagesPerChunk > 0, "Max messages per chunk should be a positive number.");
  this.maxBytesPerChunk = maxBytesPerChunk;
  this.maxMessagesPerChunk = maxMessagesPerChunk;
  this.registry = registry;
  this.rolloverStartTime = Instant.now();
  this.liveBytesDirGauge = this.registry.gauge(LIVE_BYTES_DIR, new AtomicLong(0));

  // Background task: refresh the approximate on-disk size of the active chunk and flag the
  // chunk for rollover once it has been open longer than maxTimePerChunksSeconds. Failures are
  // logged and the task keeps running.
  directorySizeExecutorService.scheduleAtFixedRate(
      () -> {
        try {
          long dirSize = calculateDirectorySize(activeChunkDirectory);
          // in case the method fails to calculate we return -1 so don't update the old value
          if (dirSize > 0) {
            approximateDirectoryBytes.set(dirSize);
          }
          // Compare elapsed seconds rather than rolloverStartTime.plus(...): Instant.plus
          // throws ArithmeticException (long overflow) when maxTimePerChunksSeconds is
          // Long.MAX_VALUE — the "time-based rollover disabled" sentinel used by fromConfig —
          // which would make every tick land in the catch block below.
          if (!maxTimePerChunksMinsReached.get()
              && ChronoUnit.SECONDS.between(rolloverStartTime, Instant.now())
                  >= maxTimePerChunksSeconds) {
            LOG.info(
                "Max time per chunk reached. chunkStartTime: {} currentTime: {}",
                rolloverStartTime,
                Instant.now());
            maxTimePerChunksMinsReached.set(true);
          }
        } catch (Exception e) {
          LOG.error("Error calculating directory size", e);
        }
      },
      DIRECTORY_SIZE_EXECUTOR_PERIOD_MS,
Expand All @@ -78,7 +111,8 @@ public boolean shouldRollOver(long currentBytesIndexed, long currentMessagesInde
liveBytesDirGauge.set(approximateDirectoryBytes.get());
boolean shouldRollover =
(approximateDirectoryBytes.get() >= maxBytesPerChunk)
|| (currentMessagesIndexed >= maxMessagesPerChunk);
|| (currentMessagesIndexed >= maxMessagesPerChunk)
|| maxTimePerChunksMinsReached.get();
if (shouldRollover) {
LOG.debug(
"After {} messages and {} ingested bytes rolling over chunk of {} bytes",
Expand All @@ -97,6 +131,8 @@ public long getMaxBytesPerChunk() {
// Points the size-tracking background task at the newly active chunk's directory and resets all
// per-chunk rollover state (byte estimate, time-based flag, chunk start time).
// NOTE(review): rolloverStartTime is a plain (non-volatile) field written here and read by the
// scheduled executor thread — presumably a briefly stale read is acceptable; confirm.
public void setActiveChunkDirectory(FSDirectory directory) {
this.activeChunkDirectory.set(directory);
// Fresh chunk: the previous chunk's size estimate no longer applies.
approximateDirectoryBytes.set(0);
// Re-arm the time-based rollover trigger for the new chunk.
maxTimePerChunksMinsReached.set(false);
// Restart the chunk-age clock.
rolloverStartTime = Instant.now();
}

public static long calculateDirectorySize(AtomicReference<FSDirectory> activeChunkDirectoryRef) {
Expand Down
1 change: 1 addition & 0 deletions astra/src/main/proto/astra_configs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ message IndexerConfig {
// Chunk config
int64 max_messages_per_chunk = 1;
int64 max_bytes_per_chunk = 2;
int64 max_time_per_chunk_seconds = 14;

// Lucene config
LuceneConfig lucene_config = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,33 @@ public void testNegativeMaxBytesPerChunk() {
.isThrownBy(() -> new MessageSizeOrCountBasedRolloverStrategy(metricsRegistry, -100, 1));
}

@Test
public void testRolloverBasedOnMaxTime() throws Exception {
// Byte and message limits are effectively infinite so only the 2-second max-time limit can
// trigger a rollover in this test.
ChunkRollOverStrategy chunkRollOverStrategy =
new DiskOrMessageCountBasedRolloverStrategy(
metricsRegistry, Long.MAX_VALUE, Long.MAX_VALUE, 2);

initChunkManager(
chunkRollOverStrategy, S3_TEST_BUCKET, MoreExecutors.newDirectExecutorService());

// add 1 message so that new chunk is created
// wait for 2+ seconds so that the chunk rollover code will get triggered
// add 2nd message to trigger chunk rollover
// add 3rd message to create new chunk
chunkManager.addMessage(SpanUtil.makeSpan(1), 100, TEST_KAFKA_PARTITION_ID, 1);
// Sleep past the max-time limit plus one background-executor period so the scheduled task has
// definitely run and set the "max time reached" flag before the next message arrives.
Thread.sleep(2_000 + DiskOrMessageCountBasedRolloverStrategy.DIRECTORY_SIZE_EXECUTOR_PERIOD_MS);
chunkManager.addMessage(SpanUtil.makeSpan(2), 100, TEST_KAFKA_PARTITION_ID, 1);
chunkManager.addMessage(SpanUtil.makeSpan(3), 100, TEST_KAFKA_PARTITION_ID, 1);

// Rollover completes asynchronously; wait until exactly one rollover has finished.
await().until(() -> getCount(RollOverChunkTask.ROLLOVERS_COMPLETED, metricsRegistry) == 1);

assertThat(getCount(RollOverChunkTask.ROLLOVERS_INITIATED, metricsRegistry)).isEqualTo(1);
assertThat(getCount(RollOverChunkTask.ROLLOVERS_FAILED, metricsRegistry)).isEqualTo(0);
assertThat(getCount(RollOverChunkTask.ROLLOVERS_COMPLETED, metricsRegistry)).isEqualTo(1);
// Expect the rolled-over chunk plus the new active chunk holding the 3rd message.
assertThat(chunkManager.getChunkList().size()).isEqualTo(2);
}

@Test
public void testDiskBasedRolloverWithMaxMessages() throws Exception {
ChunkRollOverStrategy chunkRollOverStrategy =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ public void testParseAstraYamlConfigFile() throws IOException {
final AstraConfigs.IndexerConfig indexerConfig = config.getIndexerConfig();
assertThat(indexerConfig.getMaxMessagesPerChunk()).isEqualTo(100);
assertThat(indexerConfig.getMaxBytesPerChunk()).isEqualTo(100000);
assertThat(indexerConfig.getMaxTimePerChunkSeconds()).isEqualTo(1800);
assertThat(indexerConfig.getLuceneConfig().getCommitDurationSecs()).isEqualTo(10);
assertThat(indexerConfig.getLuceneConfig().getRefreshDurationSecs()).isEqualTo(11);
assertThat(indexerConfig.getLuceneConfig().getEnableFullTextSearch()).isTrue();
Expand Down
1 change: 1 addition & 0 deletions astra/src/test/resources/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ nodeRoles: [ INDEX,QUERY,CACHE,MANAGER ]
indexerConfig:
maxMessagesPerChunk: 100
maxBytesPerChunk: 100000
maxTimePerChunkSeconds: 1800
luceneConfig:
commitDurationSecs: 10
refreshDurationSecs: 11
Expand Down
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ nodeRoles: [${NODE_ROLES:-QUERY,INDEX,CACHE,MANAGER,RECOVERY,PREPROCESSOR}]
indexerConfig:
maxMessagesPerChunk: ${INDEXER_MAX_MESSAGES_PER_CHUNK:-100000}
maxBytesPerChunk: ${INDEXER_MAX_BYTES_PER_CHUNK:-1000000}
maxTimePerChunkSeconds: ${INDEXER_MAX_TIME_PER_CHUNK_SECONDS:-5400}
luceneConfig:
commitDurationSecs: ${INDEXER_COMMIT_DURATION_SECS:-10}
refreshDurationSecs: ${INDEXER_REFRESH_DURATION_SECS:-11}
Expand Down

0 comments on commit 539ff05

Please sign in to comment.