From c4222079a9842ed3aacdafe6529679b7c74347ba Mon Sep 17 00:00:00 2001 From: nit0906 Date: Mon, 22 Apr 2024 16:10:37 +0530 Subject: [PATCH] OAK-10766 | Make lease time out configurable for individual lanes (#1429) * OAK-10766 | Make lease time out configurable for individual lanes --- .../plugins/index/AsyncIndexerService.java | 46 ++++++++++++------- .../index/AsyncIndexerServiceTest.java | 27 +++++++++-- 2 files changed, 53 insertions(+), 20 deletions(-) diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java index 27a31cd74cc..22fdcbe40dd 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java @@ -72,16 +72,16 @@ public class AsyncIndexerService { @AttributeDefinition( cardinality = 1024, name = "Async Indexer Configs", - description = "Async indexer configs in the form of : e.g. \"async:5\"" + description = "Async indexer configs in the form of :: e.g. \"async:5:15\"" ) - String[] asyncConfigs() default {"async:5"}; + String[] asyncConfigs() default {"async:5:15"}; @AttributeDefinition( name = "Lease time out", description = "Lease timeout in minutes. AsyncIndexer would wait for this timeout period before breaking " + "async indexer lease" ) - int leaseTimeOutMinutes() default 15; + long leaseTimeOutMinutes() default 15L; @AttributeDefinition( name = "Failing Index Timeout (s)", @@ -127,13 +127,6 @@ public void activate(BundleContext bundleContext, Configuration config) { executor = new WhiteboardExecutor(); executor.start(whiteboard); - long leaseTimeOutMin = config.leaseTimeOutMinutes(); - - if (!(nodeStore instanceof Clusterable)){ - leaseTimeOutMin = 0; - log.info("Detected non clusterable setup. Lease checking would be disabled for async indexing"); - } - TrackingCorruptIndexHandler corruptIndexHandler = createCorruptIndexHandler(config); for (AsyncConfig c : asyncIndexerConfig) { @@ -141,14 +134,27 @@ public void activate(BundleContext bundleContext, Configuration config) { statisticsProvider, false); task.setCorruptIndexHandler(corruptIndexHandler); task.setValidatorProviders(Collections.singletonList(validatorProvider)); - task.setLeaseTimeOut(TimeUnit.MINUTES.toMillis(leaseTimeOutMin)); + long leaseTimeOutMin = config.leaseTimeOutMinutes(); + + // Set lease time out = 0 for a non clusterable setup. + if (!(nodeStore instanceof Clusterable)){ + leaseTimeOutMin = 0; + log.info("Detected non clusterable setup. Lease checking would be disabled for async indexing"); + } else if (c.leaseTimeOutInMin != null) { + // If lease time out is configured for a specific lane, use that. + leaseTimeOutMin = c.leaseTimeOutInMin; + log.info("Lease time out for {} configured as {} mins.", c.name, leaseTimeOutMin); + } else { + log.info("Lease time out for {} not configured explicitly. Using value {} mins configured via Lease Time out property.", c.name, leaseTimeOutMin); + } + + task.setLeaseTimeOut(TimeUnit.MINUTES.toMillis(leaseTimeOutMin)); indexRegistration.registerAsyncIndexer(task, c.timeIntervalInSecs); closer.register(task); } registerAsyncReindexSupport(whiteboard); log.info("Configured async indexers {} ", asyncIndexerConfig); - log.info("Lease time: {} mins and AsyncIndexUpdate configured with {}", leaseTimeOutMin, validatorProvider.getClass().getName()); } private void registerAsyncReindexSupport(Whiteboard whiteboard) { @@ -211,20 +217,25 @@ static List getAsyncConfig(String[] configs) { int idOfEq = config.indexOf(CONFIG_SEP); checkArgument(idOfEq > 0, "Invalid config provided [%s]", Arrays.toString(configs)); - String name = config.substring(0, idOfEq).trim(); - long interval = Long.parseLong(config.substring(idOfEq + 1)); - result.add(new AsyncConfig(name, interval)); + String[] configElements = config.split(String.valueOf(CONFIG_SEP)); + String name = configElements[0].trim(); + Long interval = configElements.length > 1 ? Long.parseLong(configElements[1].trim()) : null; + Long leaseTimeOut = configElements.length > 2 ? Long.parseLong(configElements[2].trim()) : null; + + result.add(new AsyncConfig(name, interval, leaseTimeOut)); } return result; } static class AsyncConfig { final String name; - final long timeIntervalInSecs; + final Long timeIntervalInSecs; + final Long leaseTimeOutInMin; - private AsyncConfig(String name, long timeIntervalInSecs) { + private AsyncConfig(String name, Long timeIntervalInSecs, Long leaseTimeOutInMin) { this.name = AsyncIndexUpdate.checkValidName(name); this.timeIntervalInSecs = timeIntervalInSecs; + this.leaseTimeOutInMin = leaseTimeOutInMin; } @Override @@ -232,6 +243,7 @@ public String toString() { return "AsyncConfig{" + "name='" + name + '\'' + ", timeIntervalInSecs=" + timeIntervalInSecs + + ", leaseTimeOutInMin=" + leaseTimeOutInMin + '}'; } } diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java index 99eba3eeb1e..98eca49389b 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java @@ -81,7 +81,7 @@ public void asyncReg() throws Exception{ } @Test - public void leaseTimeout() throws Exception{ + public void leaseTimeout() { injectDefaultServices(); Map config = ImmutableMap.of( "asyncConfigs", new String[] {"async:5"}, @@ -92,6 +92,21 @@ public void leaseTimeout() throws Exception{ assertEquals(TimeUnit.MINUTES.toMillis(20), indexUpdate.getLeaseTimeOut()); } + @Test + public void leaseTimeout2() { + injectDefaultServices(); + Map config = ImmutableMap.of( + "asyncConfigs", new String[] {"async:5:13", "foo-async:5"}, + "leaseTimeOutMinutes" , "20" + ); + MockOsgi.activate(service, context.bundleContext(), config); + AsyncIndexUpdate indexUpdate = getIndexUpdate("async"); + assertEquals(TimeUnit.MINUTES.toMillis(13), indexUpdate.getLeaseTimeOut()); + + indexUpdate = getIndexUpdate("foo-async"); + assertEquals(TimeUnit.MINUTES.toMillis(20), indexUpdate.getLeaseTimeOut()); + } + @Test public void changeCollectionEnabled() throws Exception{ injectDefaultServices(); @@ -141,12 +156,18 @@ public void configParsing() throws Exception{ List configs = AsyncIndexerService.getAsyncConfig(new String[]{"async:15"}); assertEquals(1, configs.size()); assertEquals("async", configs.get(0).name); - assertEquals(15, configs.get(0).timeIntervalInSecs); + assertEquals(Long.valueOf(15L), configs.get(0).timeIntervalInSecs); configs = AsyncIndexerService.getAsyncConfig(new String[]{"async:15", "foo-async:23"}); assertEquals(2, configs.size()); assertEquals("foo-async", configs.get(1).name); - assertEquals(23, configs.get(1).timeIntervalInSecs); + assertEquals(Long.valueOf(23L), configs.get(1).timeIntervalInSecs); + + configs = AsyncIndexerService.getAsyncConfig(new String[]{"async:15", "foo-async:23:5"}); + assertEquals(2, configs.size()); + assertEquals("foo-async", configs.get(1).name); + assertEquals(Long.valueOf(23L), configs.get(1).timeIntervalInSecs); + assertEquals(Long.valueOf(5L), configs.get(1).leaseTimeOutInMin); } @Test