Skip to content

Commit

Permalink
OAK-10766 | Make lease time out configurable for individual lanes (#1429
Browse files Browse the repository at this point in the history
)

* OAK-10766 | Make lease time out configurable for individual lanes
  • Loading branch information
nit0906 committed Apr 22, 2024
1 parent 4fdc15f commit c422207
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 20 deletions.
Expand Up @@ -72,16 +72,16 @@ public class AsyncIndexerService {
@AttributeDefinition(
cardinality = 1024,
name = "Async Indexer Configs",
description = "Async indexer configs in the form of <name>:<interval in secs> e.g. \"async:5\""
description = "Async indexer configs in the form of <name>:<interval in secs>:<lease time out in minutes> e.g. \"async:5:15\""
)
String[] asyncConfigs() default {"async:5"};
String[] asyncConfigs() default {"async:5:15"};

@AttributeDefinition(
name = "Lease time out",
description = "Lease timeout in minutes. AsyncIndexer would wait for this timeout period before breaking " +
"async indexer lease"
)
int leaseTimeOutMinutes() default 15;
long leaseTimeOutMinutes() default 15L;

@AttributeDefinition(
name = "Failing Index Timeout (s)",
Expand Down Expand Up @@ -127,28 +127,34 @@ public void activate(BundleContext bundleContext, Configuration config) {
executor = new WhiteboardExecutor();
executor.start(whiteboard);

long leaseTimeOutMin = config.leaseTimeOutMinutes();

if (!(nodeStore instanceof Clusterable)){
leaseTimeOutMin = 0;
log.info("Detected non clusterable setup. Lease checking would be disabled for async indexing");
}

TrackingCorruptIndexHandler corruptIndexHandler = createCorruptIndexHandler(config);

for (AsyncConfig c : asyncIndexerConfig) {
AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore, indexEditorProvider,
statisticsProvider, false);
task.setCorruptIndexHandler(corruptIndexHandler);
task.setValidatorProviders(Collections.singletonList(validatorProvider));
task.setLeaseTimeOut(TimeUnit.MINUTES.toMillis(leaseTimeOutMin));

long leaseTimeOutMin = config.leaseTimeOutMinutes();

// Set lease time out = 0 for a non clusterable setup.
if (!(nodeStore instanceof Clusterable)){
leaseTimeOutMin = 0;
log.info("Detected non clusterable setup. Lease checking would be disabled for async indexing");
} else if (c.leaseTimeOutInMin != null) {
// If lease time out is configured for a specific lane, use that.
leaseTimeOutMin = c.leaseTimeOutInMin;
log.info("Lease time out for {} configured as {} mins.", c.name, leaseTimeOutMin);
} else {
log.info("Lease time out for {} not configured explicitly. Using value {} mins configured via Lease Time out property.", c.name, leaseTimeOutMin);
}

task.setLeaseTimeOut(TimeUnit.MINUTES.toMillis(leaseTimeOutMin));
indexRegistration.registerAsyncIndexer(task, c.timeIntervalInSecs);
closer.register(task);
}
registerAsyncReindexSupport(whiteboard);
log.info("Configured async indexers {} ", asyncIndexerConfig);
log.info("Lease time: {} mins and AsyncIndexUpdate configured with {}", leaseTimeOutMin, validatorProvider.getClass().getName());
}

private void registerAsyncReindexSupport(Whiteboard whiteboard) {
Expand Down Expand Up @@ -211,27 +217,33 @@ static List<AsyncConfig> getAsyncConfig(String[] configs) {
int idOfEq = config.indexOf(CONFIG_SEP);
checkArgument(idOfEq > 0, "Invalid config provided [%s]", Arrays.toString(configs));

String name = config.substring(0, idOfEq).trim();
long interval = Long.parseLong(config.substring(idOfEq + 1));
result.add(new AsyncConfig(name, interval));
String[] configElements = config.split(String.valueOf(CONFIG_SEP));
String name = configElements[0].trim();
Long interval = configElements.length > 1 ? Long.parseLong(configElements[1].trim()) : null;
Long leaseTimeOut = configElements.length > 2 ? Long.parseLong(configElements[2].trim()) : null;

result.add(new AsyncConfig(name, interval, leaseTimeOut));
}
return result;
}

static class AsyncConfig {
final String name;
final long timeIntervalInSecs;
final Long timeIntervalInSecs;
final Long leaseTimeOutInMin;

private AsyncConfig(String name, long timeIntervalInSecs) {
private AsyncConfig(String name, Long timeIntervalInSecs, Long leaseTimeOutInMin) {
this.name = AsyncIndexUpdate.checkValidName(name);
this.timeIntervalInSecs = timeIntervalInSecs;
this.leaseTimeOutInMin = leaseTimeOutInMin;
}

@Override
public String toString() {
return "AsyncConfig{" +
"name='" + name + '\'' +
", timeIntervalInSecs=" + timeIntervalInSecs +
", leaseTimeOutInMin=" + leaseTimeOutInMin +
'}';
}
}
Expand Down
Expand Up @@ -81,7 +81,7 @@ public void asyncReg() throws Exception{
}

@Test
public void leaseTimeout() throws Exception{
public void leaseTimeout() {
injectDefaultServices();
Map<String,Object> config = ImmutableMap.<String, Object>of(
"asyncConfigs", new String[] {"async:5"},
Expand All @@ -92,6 +92,21 @@ public void leaseTimeout() throws Exception{
assertEquals(TimeUnit.MINUTES.toMillis(20), indexUpdate.getLeaseTimeOut());
}

@Test
public void leaseTimeout2() {
injectDefaultServices();
Map<String,Object> config = ImmutableMap.<String, Object>of(
"asyncConfigs", new String[] {"async:5:13", "foo-async:5"},
"leaseTimeOutMinutes" , "20"
);
MockOsgi.activate(service, context.bundleContext(), config);
AsyncIndexUpdate indexUpdate = getIndexUpdate("async");
assertEquals(TimeUnit.MINUTES.toMillis(13), indexUpdate.getLeaseTimeOut());

indexUpdate = getIndexUpdate("foo-async");
assertEquals(TimeUnit.MINUTES.toMillis(20), indexUpdate.getLeaseTimeOut());
}

@Test
public void changeCollectionEnabled() throws Exception{
injectDefaultServices();
Expand Down Expand Up @@ -141,12 +156,18 @@ public void configParsing() throws Exception{
List<AsyncConfig> configs = AsyncIndexerService.getAsyncConfig(new String[]{"async:15"});
assertEquals(1, configs.size());
assertEquals("async", configs.get(0).name);
assertEquals(15, configs.get(0).timeIntervalInSecs);
assertEquals(Long.valueOf(15L), configs.get(0).timeIntervalInSecs);

configs = AsyncIndexerService.getAsyncConfig(new String[]{"async:15", "foo-async:23"});
assertEquals(2, configs.size());
assertEquals("foo-async", configs.get(1).name);
assertEquals(23, configs.get(1).timeIntervalInSecs);
assertEquals(Long.valueOf(23L), configs.get(1).timeIntervalInSecs);

configs = AsyncIndexerService.getAsyncConfig(new String[]{"async:15", "foo-async:23:5"});
assertEquals(2, configs.size());
assertEquals("foo-async", configs.get(1).name);
assertEquals(Long.valueOf(23L), configs.get(1).timeIntervalInSecs);
assertEquals(Long.valueOf(5L), configs.get(1).leaseTimeOutInMin);
}

@Test
Expand Down

0 comments on commit c422207

Please sign in to comment.