Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] track the use of partition upsert metadata mgr to reset preloading flag #12938

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1333,6 +1333,9 @@ protected void doDestroy() {
_segmentLogger.error("Could not stop consumer thread");
}
_realtimeSegment.destroy();
if (_partitionUpsertMetadataManager != null) {
_partitionUpsertMetadataManager.decreaseReferenceCount();
}
closeStreamConsumers();
cleanupMetrics();
}
Expand Down
Expand Up @@ -406,7 +406,11 @@ public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig
_tableNameWithType));
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
partitionUpsertMetadataManager.preloadSegments(indexLoadingConfig);
try {
partitionUpsertMetadataManager.preloadSegments(indexLoadingConfig);
} finally {
partitionUpsertMetadataManager.decreaseReferenceCount();
}
// Continue to add segment after preloading, as the segment might not be added by preloading.
}
SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
Expand Down Expand Up @@ -565,9 +569,13 @@ private void handleUpsert(ImmutableSegment immutableSegment) {
if (partitionUpsertMetadataManager.isPreloading()) {
// Preloading segment is ensured to be handled by a single thread, so no need to take the segment upsert lock.
// Besides, preloading happens before the table partition is made ready for any queries.
partitionUpsertMetadataManager.preloadSegment(immutableSegment);
registerSegment(segmentName, newSegmentManager);
_logger.info("Preloaded immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
try {
partitionUpsertMetadataManager.preloadSegment(immutableSegment);
registerSegment(segmentName, newSegmentManager);
_logger.info("Preloaded immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
} finally {
partitionUpsertMetadataManager.decreaseReferenceCount();
}
return;
}
// Replacing segment takes multiple steps, and particularly need to access the oldSegment. Replace segment may
Expand Down Expand Up @@ -610,6 +618,7 @@ private void handleUpsert(ImmutableSegment immutableSegment) {
releaseSegment(oldSegmentManager);
}
} finally {
partitionUpsertMetadataManager.decreaseReferenceCount();
segmentLock.unlock();
}
}
Expand Down
Expand Up @@ -122,6 +122,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
// The lock and boolean flag ensure only one thread can start preloading and preloading happens only once.
private final Lock _preloadLock = new ReentrantLock();
private volatile boolean _isPreloading;
// When no threads are using the manager and no segments are tracked, we can reset _isPreloading in order to
// preload segments again.
private int _referenceCount = 0;

protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
_tableNameWithType = tableNameWithType;
Expand All @@ -148,6 +151,24 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti
}
}

@Override
public synchronized void increaseReferenceCount() {
_referenceCount++;
}

@Override
public synchronized void decreaseReferenceCount() {
_referenceCount--;
}

protected synchronized void tryResetIsPreloading() {
if (_enableSnapshot && _context.isPreloadEnabled() && _referenceCount == 0 && _trackedSegments.isEmpty()) {
_isPreloading = true;
_logger.info("Reset isPreloading to preload segments for table: {} partition: {} again", _tableNameWithType,
_partitionId);
}
}

@Override
public List<String> getPrimaryKeyColumns() {
return _primaryKeyColumns;
Expand Down
Expand Up @@ -37,8 +37,15 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta

@Override
public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context));
return _partitionMetadataManagerMap.compute(partitionId, (k, mgr) -> {
if (mgr == null) {
mgr = new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, partitionId, _context);
} else {
mgr.tryResetIsPreloading();
}
mgr.increaseReferenceCount();
return mgr;
});
}

@Override
Expand Down
Expand Up @@ -110,6 +110,10 @@ public interface PartitionUpsertMetadataManager extends Closeable {
*/
void removeExpiredPrimaryKeys();

void increaseReferenceCount();

void decreaseReferenceCount();

/**
* Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
*/
Expand Down