Skip to content

Commit

Permalink
[GEOS-11284] Fix issues from pull request revision
Browse files Browse the repository at this point in the history
* Add missing StyleInfo.setCatalog(Catalog) call in
  AbsractCatalogFacade.resolve(StyleInfo)
* Rename config property datadir.load.parallelism as
  datdir.loader.parallelism for consistency with datadir.loader.enabled,
  and make it either a config property (i.e. lower.case and
  dot-separated) or an environment variable (i.e. UPPER_CASE)
* Fix logic to dispose the catalog loader when both the Catalog and
  GeoServer have been loaded.
* Rename CatalogConfigLoader.save(...) to CatalogConfigLoader.add(...)
  for correct semantics.
* Rename method getXstream() to getXStream()
* Clear GeoServerExtensionsHelper (init(null) does it all) after tests
  that use it.
  • Loading branch information
groldan committed Mar 12, 2024
1 parent 47bc149 commit 7ea6d38
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ private void resolveLayerGroupLayers(List<PublishedInfo> layers) {
protected void resolve(StyleInfo style) {
setId(style);

((StyleInfoImpl) style).setCatalog(getCatalog());
// resolve the workspace
WorkspaceInfo ws = style.getWorkspace();
if (ws != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
* number of available processors as reported by {@link Runtime#availableProcessors()}.
*
* <p>The parallelism level can also be overridden through the environment variable or system
* property {@literal DATADIR_LOAD_PARALLELISM}. A value of zero or less will produce a warning and
* fall back to the default value heuristic mentioned above.
* property {@literal DATADIR_LOADER_PARALLELISM}. A value of zero or less will produce a warning
* and fall back to the default value heuristic mentioned above.
*
* @implNote this class shares the loading workflow of default {@link GeoServerLoader} and {@link
* DefaultGeoServerLoader}, tapping into {@link #readCatalog(XStreamPersister)} and {@link
Expand All @@ -72,8 +72,8 @@
* @since 2.25
*/
public class DataDirectoryGeoServerLoader extends DefaultGeoServerLoader {
static final String SYSPROP_KEY = "datadir.loader.enabled";
static final String ENVVAR_KEY = "DATADIR_LOADER_ENABLED";
static final String ENABLED_PROPERTY = "datadir.loader.enabled";
static final String PARALLELISM_PROPERTY = "datadir.loader.parallelism";

static final Logger LOGGER =
Logging.getLogger(DataDirectoryGeoServerLoader.class.getPackage().getName());
Expand All @@ -93,6 +93,8 @@ public class DataDirectoryGeoServerLoader extends DefaultGeoServerLoader {
*/
private boolean geoserverLoaded;

private ApplicationContext applicationContext;

/**
* @param resourceLoader determines the physical location of the data directory
* @param securityManager used to post-process the loaded catalog on the calling thread to
Expand All @@ -107,6 +109,11 @@ public DataDirectoryGeoServerLoader(
this.securityManager = securityManager;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

@Override
public void destroy() {
dispose();
Expand Down Expand Up @@ -158,11 +165,14 @@ protected Catalog readCatalog(XStreamPersister xp) throws Exception {
catalog.setResourceLoader(resourceLoader);
List.copyOf(catalog.getListeners()).forEach(catalog::removeListener);

catalog = loader().loadCatalog(catalog);
try {
loader().loadCatalog(catalog);
} finally {
catalogLoaded = true;
disposeIfBothLoaded();
}
logStop(startedStopWatch.stop(), catalog);

catalogLoaded = true;
disposeIfBothLoaded();
catalog.resolve();
decryptDataStorePasswords(catalog);
return catalog;
Expand All @@ -185,10 +195,12 @@ protected void readConfiguration(GeoServer target, XStreamPersister xp) throws E
LOGGER.config("Loading GeoServer config...");
Stopwatch stopWatch = Stopwatch.createStarted();

loader().loadGeoServer(target);
geoserverLoaded = true;
disposeIfBothLoaded();

try {
loader().loadGeoServer(target);
} finally {
geoserverLoaded = true;
disposeIfBothLoaded();
}
LOGGER.log(
Level.CONFIG,
"GeoServer config (settings and services) loaded in {0}",
Expand Down Expand Up @@ -233,7 +245,9 @@ protected DataDirectoryLoader createLoader() {
FileSystemResourceStore resourceStore = resolveResourceStore(resourceLoader);
List<XStreamServiceLoader<ServiceInfo>> serviceLoaders = findServiceLoaders();
XStreamPersisterFactory persisterFactory = super.xpf;
return new DataDirectoryLoader(resourceStore, serviceLoaders, persisterFactory);
int parallelism = determineParallelism();
return new DataDirectoryLoader(
resourceStore, serviceLoaders, persisterFactory, parallelism);
}

static List<XStreamServiceLoader<ServiceInfo>> findServiceLoaders() {
Expand Down Expand Up @@ -303,10 +317,7 @@ private void logStop(Stopwatch stoppedSw, final Catalog catalog) {
* environment variable.
*/
public static boolean isEnabled(@Nullable ApplicationContext context) {
String value =
getProperty(context, SYSPROP_KEY)
.or(() -> getProperty(context, ENVVAR_KEY))
.orElse("true");
String value = getProperty(context, ENABLED_PROPERTY).orElse("true");
return Boolean.parseBoolean(value);
}

Expand All @@ -320,4 +331,38 @@ private static Optional<String> getProperty(@Nullable ApplicationContext context
}
return Optional.ofNullable(value);
}

private int determineParallelism() {
final int processors = Runtime.getRuntime().availableProcessors();
final int defParallelism = Math.min(processors, 16);
int parallelism = defParallelism;

String logTailMessage = "out of " + processors + " available cores.";

final String configuredParallelism =
getProperty(applicationContext, PARALLELISM_PROPERTY).orElse(null);
if (StringUtils.hasText(configuredParallelism)) {
boolean parseFail = false;
try {
parallelism = Integer.parseInt(configuredParallelism);
} catch (NumberFormatException nfe) {
parseFail = true;
}
if (parseFail || parallelism < 1) {
parallelism = defParallelism;
LOGGER.log(
Level.WARNING,
"Configured parallelism is invalid: {0}={1}, using default of {2}",
new Object[] {PARALLELISM_PROPERTY, configuredParallelism, defParallelism});
} else {
logTailMessage =
"as indicated by the " + PARALLELISM_PROPERTY + " environment variable";
}
}
LOGGER.log(
Level.CONFIG,
"Catalog and configuration loader uses {0} threads {1}",
new Object[] {parallelism, logTailMessage});
return parallelism;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ private void loadWorkspace(WorkspaceDirectory wsdir) {
if (wsinfo.isPresent() && nsinfo.isPresent()) {
WorkspaceInfo ws = wsinfo.get();
NamespaceInfo ns = nsinfo.get();
save(ws, catalog::add);
save(ns, catalog::add);
add(ws, catalog::add);
add(ns, catalog::add);

loadStyles(wsdir.styles().stream());
loadStores(wsdir.stores());
Expand All @@ -137,7 +137,7 @@ private void loadWorkspace(WorkspaceDirectory wsdir) {
}

private void loadStyles(Stream<Path> stream) {
depersist(stream).map(StyleInfo.class::cast).forEach(this::save);
depersist(stream).map(StyleInfo.class::cast).forEach(this::add);
}

private void loadStores(Stream<StoreDirectory> stream) {
Expand All @@ -146,7 +146,7 @@ private void loadStores(Stream<StoreDirectory> stream) {

private void loadStore(StoreDirectory storeDir) {
Optional<StoreInfo> store = depersist(storeDir.storeFile);
store.flatMap(this::save)
store.flatMap(this::add)
.ifPresent(
storeInfo -> {
loadLayers(storeDir.layers());
Expand All @@ -163,35 +163,35 @@ private void loadResourceAndLayer(LayerDirectory layerDir) {

private Optional<ResourceInfo> loadResource(Path resourceFile) {
Optional<ResourceInfo> resource = depersist(resourceFile);
return resource.filter(res -> null != res.getStore()).flatMap(this::save);
return resource.filter(res -> null != res.getStore()).flatMap(this::add);
}

private void loadLayer(LayerDirectory layerDir) {
Optional<LayerInfo> layer = depersist(layerDir.layerFile);
layer.filter(l -> l.getResource() instanceof ResourceInfo).ifPresent(this::save);
layer.filter(l -> l.getResource() instanceof ResourceInfo).ifPresent(this::add);
}

private Optional<StoreInfo> save(StoreInfo info) {
return save(info, catalog::add);
private Optional<StoreInfo> add(StoreInfo info) {
return add(info, catalog::add);
}

private Optional<ResourceInfo> save(ResourceInfo info) {
return save(info, catalog::add);
private Optional<ResourceInfo> add(ResourceInfo info) {
return add(info, catalog::add);
}

private void save(LayerInfo info) {
save(info, catalog::add);
private void add(LayerInfo info) {
add(info, catalog::add);
}

private void save(LayerGroupInfo info) {
save(info, catalog::add);
private void add(LayerGroupInfo info) {
add(info, catalog::add);
}

private void save(StyleInfo info) {
save(info, catalog::add);
private void add(StyleInfo info) {
add(info, catalog::add);
}

private <I extends CatalogInfo> Optional<I> save(I info, Consumer<I> saver) {
private <I extends CatalogInfo> Optional<I> add(I info, Consumer<I> saver) {
try {
saver.accept(info);
} catch (Exception e) {
Expand All @@ -206,7 +206,7 @@ private <I extends CatalogInfo> Optional<I> save(I info, Consumer<I> saver) {
}

private void loadLayerGroups(Stream<Path> stream) {
depersist(stream).map(LayerGroupInfo.class::cast).forEach(this::save);
depersist(stream).map(LayerGroupInfo.class::cast).forEach(this::add);
}

private Stream<CatalogInfo> depersist(Stream<Path> stream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
Expand All @@ -23,23 +22,15 @@
import org.geoserver.config.ServiceInfo;
import org.geoserver.config.util.XStreamPersisterFactory;
import org.geoserver.config.util.XStreamServiceLoader;
import org.geoserver.platform.GeoServerExtensions;
import org.geoserver.platform.resource.FileSystemResourceStore;
import org.geotools.util.logging.Logging;
import org.springframework.util.StringUtils;

/**
* Provides methods to load both the {@link Catalog} and the {@link GeoServer} config from a data
* directory, returning new instances of each.
* directory with a given parallelism.
*
* <p>This is not API, but a collaborator of {@link DataDirectoryGeoServerLoader}
*
* <p>The loading process is multi-threaded, and will take place in an {@link Executor} whose
* parallelism is determined by an heuristic resolving to the minimum between {@code 16} and the
* number of available processors as reported by {@link Runtime#availableProcessors()}, or
* overridden by the value passed through the environment variable or system property {@literal
* DATADIR_LOAD_PARALLELISM}.
*
* @implNote a {@link DataDirectoryWalker} is created and used to delegate the actual loading logic
* to the {@link CatalogConfigLoader} and {@link GeoServerConfigLoader} collaborators.
* @see DataDirectoryWalker
Expand All @@ -49,8 +40,6 @@
*/
public class DataDirectoryLoader {

private static final String DATADIR_LOAD_PARALLELISM = "DATADIR_LOAD_PARALLELISM";

private static final Logger LOGGER =
Logging.getLogger(DataDirectoryLoader.class.getPackage().getName());

Expand All @@ -63,14 +52,18 @@ public class DataDirectoryLoader {
private ForkJoinPool forkJoinPool;
private DataDirectoryWalker fileWalker;

private int parallelism;

public DataDirectoryLoader(
FileSystemResourceStore resourceStore,
List<XStreamServiceLoader<ServiceInfo>> serviceLoaders,
XStreamPersisterFactory xpf) {
XStreamPersisterFactory xpf,
int parallelism) {

this.resourceStore = resourceStore;
this.serviceLoaders = serviceLoaders;
this.xpf = xpf;
this.parallelism = parallelism;
}

private void init() {
Expand All @@ -83,7 +76,6 @@ private void init() {
.collect(Collectors.toList());
this.fileWalker = new DataDirectoryWalker(dataDirRoot, serviceFileNames);

final int parallelism = determineParallelism();
final boolean asyncMode = false;
this.forkJoinPool =
new ForkJoinPool(
Expand Down Expand Up @@ -154,39 +146,4 @@ public void dispose() {
}
}
}

private int determineParallelism() {
String configuredParallelism = GeoServerExtensions.getProperty(DATADIR_LOAD_PARALLELISM);
final int processors = Runtime.getRuntime().availableProcessors();
final int defParallelism = Math.min(processors, 16);
int parallelism = defParallelism;
String logTailMessage = "out of " + processors + " available cores.";
if (StringUtils.hasText(configuredParallelism)) {
boolean parseFail = false;
try {
parallelism = Integer.parseInt(configuredParallelism);
} catch (NumberFormatException nfe) {
parseFail = true;
}
if (parseFail || parallelism < 1) {
parallelism = defParallelism;
LOGGER.log(
Level.WARNING,
() ->
String.format(
"Configured parallelism is invalid: %s=%s, using default of %d",
DATADIR_LOAD_PARALLELISM,
configuredParallelism,
defParallelism));
} else {
logTailMessage =
"as indicated by the " + DATADIR_LOAD_PARALLELISM + " environment variable";
}
}
LOGGER.log(
Level.CONFIG,
"Catalog and configuration loader uses {0} threads {1}",
new Object[] {parallelism, logTailMessage});
return parallelism;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void loadServices() {
private void loadRootServices() {
Resource baseDirectory = resourceStore.get("");
for (XStreamServiceLoader<ServiceInfo> loader : serviceLoaders) {
loadService(loader, baseDirectory).ifPresent(this::save);
loadService(loader, baseDirectory).ifPresent(this::add);
}
}

Expand All @@ -133,7 +133,7 @@ private long loadWorkspaceServices(WorkspaceDirectory ws) {
.map(Optional::get)
.peek(s -> warnIfWorkspaceIsNull(s, wsName))
.filter(service -> null != service.getWorkspace())
.map(this::save)
.map(this::add)
.count();
}

Expand Down Expand Up @@ -193,21 +193,21 @@ private void loadSettings() {
.filter(Optional::isPresent)
.map(Optional::get)
.map(SettingsInfo.class::cast)
.map(this::save)
.map(this::add)
.filter(Optional::isPresent)
.count();
config("Loaded {0} workspace-specific settings.", count);
}

private Optional<SettingsInfo> save(SettingsInfo settings) {
return save(settings, geoServer::add);
private Optional<SettingsInfo> add(SettingsInfo settings) {
return add(settings, geoServer::add);
}

private Optional<ServiceInfo> save(ServiceInfo service) {
return save(service, geoServer::add);
private Optional<ServiceInfo> add(ServiceInfo service) {
return add(service, geoServer::add);
}

private <I extends Info> Optional<I> save(I info, Consumer<I> saver) {
private <I extends Info> Optional<I> add(I info, Consumer<I> saver) {
try {
saver.accept(info);
return Optional.of(info);
Expand Down

0 comments on commit 7ea6d38

Please sign in to comment.