Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GG-37730 Logging improvements for events of WAL segments deletion #2963

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -1067,16 +1067,19 @@ private boolean hasIndex(long absIdx) {

int deleted = 0;

List<String> deletedSegments = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to use lazy initialization here and in similar places, since this is only for logging

long lastCpIdx = lastCheckpointPtr.index();

for (FileDescriptor desc : descs) {
long archivedAbsIdx = segmentAware.lastArchivedAbsoluteIndex();

long lastArchived = archivedAbsIdx >= 0 ? archivedAbsIdx : lastArchivedIndex();

if (desc.idx >= lastCheckpointPtr.index() // We cannot delete segments needed for binary recovery.
if (desc.idx >= lastCpIdx // We cannot delete segments needed for binary recovery.
|| desc.idx >= lastArchived // We cannot delete last segment, it is needed at start of node and avoid gaps.
|| desc.idx >= highPtr.index() // We cannot delete segments larger than the border.
|| !segmentAware.minReserveIndex(desc.idx)) // We cannot delete reserved segment.
return deleted;
break;

long len = desc.file.length();

Expand All @@ -1087,6 +1090,8 @@ private boolean hasIndex(long absIdx) {
else {
deleted++;

deletedSegments.add(desc.file().getName());

long idx = desc.idx();

segmentSize.remove(idx);
Expand All @@ -1100,6 +1105,11 @@ private boolean hasIndex(long absIdx) {
cctx.kernalContext().encryption().onWalSegmentRemoved(desc.idx);
}

if (log.isInfoEnabled() && !deletedSegments.isEmpty()) {
log.info("Segments removed after WAL archive cleaning [cleanedSegments=" + deletedSegments
+ ", lastCpIdx=" + lastCpIdx + ", highIdx=" + highPtr.index() + ']');
}

return deleted;
}

Expand Down Expand Up @@ -2412,21 +2422,36 @@ private void deleteObsoleteRawSegments() {
Set<Long> indices = new HashSet<>();
Set<Long> duplicateIndices = new HashSet<>();

long lastCpIndex = lastCheckpointPtr.index();

for (FileDescriptor desc : descs) {
if (!indices.add(desc.idx))
duplicateIndices.add(desc.idx);
}

List<Long> deletedRawSegments = new ArrayList<>();

for (FileDescriptor desc : descs) {
if (desc.isCompressed())
continue;

// Do not delete reserved or locked segment and any segment after it.
if (segmentReservedOrLocked(desc.idx))
return;
break;

if (desc.idx < lastCpIndex && duplicateIndices.contains(desc.idx)) {
long cleanedUpSize = deleteArchiveFiles(desc.file);

if (cleanedUpSize > 0)
deletedRawSegments.add(desc.idx);

segmentAware.addSize(desc.idx, -cleanedUpSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we move this line under the if as well?

}
}

if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx))
segmentAware.addSize(desc.idx, -deleteArchiveFiles(desc.file));
if (log.isInfoEnabled() && !deletedRawSegments.isEmpty()) {
log.info("Raw segments removed after compression [deletedSegments=" + deletedRawSegments
+ ", lastCpIndex=" + lastCpIndex + ']');
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

Expand Down Expand Up @@ -92,6 +95,9 @@ public class WalCompactionTest extends GridCommonAbstractTest {
/** Wal mode. */
private WALMode walMode;

/** Test logger. */
private ListeningTestLogger listeningLog;

/** */
private static class RolloverRecord extends CheckpointRecord {
/** */
Expand All @@ -104,6 +110,9 @@ private RolloverRecord() {
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);

if (listeningLog != null)
cfg.setGridLogger(listeningLog);

DataStorageConfiguration dsCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true)
Expand Down Expand Up @@ -176,6 +185,13 @@ private RolloverRecord() {
public void testBinaryRecoveryAfterFullCompaction() throws Exception {
int minNumberOfSegmentsToCompress = 5;

//checking that at least one segment is deleted eventually
LogListener logLsnr = LogListener.matches("Raw segments removed after compression [deletedSegments=[0")
.build();

listeningLog = new ListeningTestLogger(GridAbstractTest.log);
listeningLog.registerListener(logLsnr);

walSegments = minNumberOfSegmentsToCompress;
fileIoFactory = new CheckpointFailingIoFactory(false);

Expand All @@ -192,6 +208,9 @@ public void testBinaryRecoveryAfterFullCompaction() throws Exception {
() -> wal(ig).lastArchivedSegment() >= minNumberOfSegmentsToCompress,
getTestTimeout()));

// no raw segments are compressed/deleted until a cp covering at least one wal segment happens
assertFalse(logLsnr.check());

forceCheckpoint(ig);

loadData(cache, ENTRIES);
Expand All @@ -203,7 +222,11 @@ public void testBinaryRecoveryAfterFullCompaction() throws Exception {
() -> wal(ig).lastArchivedSegment() > lastCpIdx,
getTestTimeout()));

Collection<?> queue = GridTestUtils.getFieldValue(wal(ig), "segmentAware", "segmentCompressStorage", "segmentsToCompress");
//after the second checkpoint there are some wal segments that could be compressed and deleted
assertTrue(logLsnr.check());

Collection<?> queue = GridTestUtils.getFieldValue(wal(ig),
"segmentAware", "segmentCompressStorage", "segmentsToCompress");

assertTrue(GridTestUtils.waitForCondition(queue::isEmpty, getTestTimeout()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
Expand All @@ -41,6 +43,8 @@
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
Expand All @@ -62,16 +66,19 @@
*/
public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractTest {
/**
* Start grid with override default configuration via customConfigurator.
* Start grid with override default configuration via customConfigurator and with custom logger (if provided).
sergey-chugunov-1985 marked this conversation as resolved.
Show resolved Hide resolved
*/
private Ignite startGrid(Consumer<DataStorageConfiguration> customConfigurator) throws Exception {
private Ignite startGrid(Consumer<DataStorageConfiguration> customConfigurator, IgniteLogger customLog) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use Nullable annotations in GG?

IgniteConfiguration configuration = getConfiguration(getTestIgniteInstanceName());

if (customLog != null)
configuration.setGridLogger(customLog);

DataStorageConfiguration dbCfg = new DataStorageConfiguration();

dbCfg.setWalMode(walMode());
dbCfg.setWalSegmentSize(512 * 1024);
dbCfg.setCheckpointFrequency(60 * 1000);//too high value for turn off frequency checkpoint.
dbCfg.setCheckpointFrequency(60 * 1000);//too high value for turn off triggering checkpoint by timeout.
sergey-chugunov-1985 marked this conversation as resolved.
Show resolved Hide resolved
dbCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setMaxSize(100 * 1024 * 1024)
.setPersistenceEnabled(true));
Expand All @@ -87,6 +94,13 @@ private Ignite startGrid(Consumer<DataStorageConfiguration> customConfigurator)
return ignite;
}

/**
* Start grid with override default configuration via customConfigurator and default logger.
sergey-chugunov-1985 marked this conversation as resolved.
Show resolved Hide resolved
*/
private Ignite startGrid(Consumer<DataStorageConfiguration> customConfigurator) throws Exception {
return startGrid(customConfigurator, null);
}

/** */
private CacheConfiguration<Integer, Object> cacheConfiguration() {
CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
Expand Down Expand Up @@ -125,9 +139,16 @@ private String findSourceMessage(Throwable ex) {
*/
@Test
public void testCorrectDeletedArchivedWalFiles() throws Exception {
ListeningTestLogger listeningLog = new ListeningTestLogger(GridAbstractTest.log);
//"Segments removed after WAL archive cleaning [cleanedSegments=["
LogListener listener = LogListener
.matches(Pattern.compile("Segments removed after WAL archive cleaning \\[cleanedSegments=\\[\\d+.wal"))
.build();
listeningLog.registerListener(listener);

//given: configured grid with setted max wal archive size
long maxWalArchiveSize = 2 * 1024 * 1024;
Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(maxWalArchiveSize));
Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(maxWalArchiveSize), listeningLog);

GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite);

Expand Down Expand Up @@ -158,6 +179,8 @@ public void testCorrectDeletedArchivedWalFiles() throws Exception {
assertFalse(Stream.of(files).anyMatch(desc -> desc.file().getName().endsWith("00001.wal")));

assertTrue(!hist.checkpoints().isEmpty());

assertTrue(listener.check());
}

/**
Expand Down