Skip to content

Commit

Permalink
GG-37730 All instances when WAL segments are deleted are provided with additional logging (#2963)
Browse files Browse the repository at this point in the history

Co-authored-by: Alexander Polovtcev <alex.polovtcev@gmail.com>

Signed-off-by: Alexander Polovtcev <alex.polovtcev@gmail.com>
  • Loading branch information
sergey-chugunov-1985 committed Nov 23, 2023
1 parent 17860d2 commit 77268b6
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1067,16 +1067,19 @@ private boolean hasIndex(long absIdx) {

int deleted = 0;

List<String> deletedSegments = null;
long lastCpIdx = lastCheckpointPtr.index();

for (FileDescriptor desc : descs) {
long archivedAbsIdx = segmentAware.lastArchivedAbsoluteIndex();

long lastArchived = archivedAbsIdx >= 0 ? archivedAbsIdx : lastArchivedIndex();

if (desc.idx >= lastCheckpointPtr.index() // We cannot delete segments needed for binary recovery.
if (desc.idx >= lastCpIdx // We cannot delete segments needed for binary recovery.
|| desc.idx >= lastArchived // We cannot delete last segment, it is needed at start of node and avoid gaps.
|| desc.idx >= highPtr.index() // We cannot delete segments larger than the border.
|| !segmentAware.minReserveIndex(desc.idx)) // We cannot delete reserved segment.
return deleted;
break;

long len = desc.file.length();

Expand All @@ -1087,6 +1090,11 @@ private boolean hasIndex(long absIdx) {
else {
deleted++;

if (deletedSegments == null)
deletedSegments = new ArrayList<>();

deletedSegments.add(desc.file().getName());

long idx = desc.idx();

segmentSize.remove(idx);
Expand All @@ -1100,6 +1108,11 @@ private boolean hasIndex(long absIdx) {
cctx.kernalContext().encryption().onWalSegmentRemoved(desc.idx);
}

if (log.isInfoEnabled() && deletedSegments != null) {
log.info("Segments removed after WAL archive cleaning [cleanedSegments=" + deletedSegments
+ ", lastCpIdx=" + lastCpIdx + ", highIdx=" + highPtr.index() + ']');
}

return deleted;
}

Expand Down Expand Up @@ -2412,21 +2425,40 @@ private void deleteObsoleteRawSegments() {
Set<Long> indices = new HashSet<>();
Set<Long> duplicateIndices = new HashSet<>();

long lastCpIndex = lastCheckpointPtr.index();

for (FileDescriptor desc : descs) {
if (!indices.add(desc.idx))
duplicateIndices.add(desc.idx);
}

List<Long> deletedRawSegments = null;

for (FileDescriptor desc : descs) {
if (desc.isCompressed())
continue;

// Do not delete reserved or locked segment and any segment after it.
if (segmentReservedOrLocked(desc.idx))
return;
break;

if (desc.idx < lastCpIndex && duplicateIndices.contains(desc.idx)) {
long cleanedUpSize = deleteArchiveFiles(desc.file);

if (cleanedUpSize > 0) {
if (deletedRawSegments == null)
deletedRawSegments = new ArrayList<>();

deletedRawSegments.add(desc.idx);

segmentAware.addSize(desc.idx, -cleanedUpSize);
}
}
}

if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx))
segmentAware.addSize(desc.idx, -deleteArchiveFiles(desc.file));
if (log.isInfoEnabled() && deletedRawSegments != null) {
log.info("Raw segments removed after compression [deletedSegments=" + deletedRawSegments
+ ", lastCpIndex=" + lastCpIndex + ']');
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

Expand Down Expand Up @@ -92,6 +95,9 @@ public class WalCompactionTest extends GridCommonAbstractTest {
/** Wal mode. */
private WALMode walMode;

/** Test logger. */
private ListeningTestLogger listeningLog;

/** */
private static class RolloverRecord extends CheckpointRecord {
/** */
Expand All @@ -104,6 +110,9 @@ private RolloverRecord() {
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);

if (listeningLog != null)
cfg.setGridLogger(listeningLog);

DataStorageConfiguration dsCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true)
Expand Down Expand Up @@ -176,6 +185,13 @@ private RolloverRecord() {
public void testBinaryRecoveryAfterFullCompaction() throws Exception {
int minNumberOfSegmentsToCompress = 5;

//checking that at least one segment is deleted eventually
LogListener logLsnr = LogListener.matches("Raw segments removed after compression [deletedSegments=[0")
.build();

listeningLog = new ListeningTestLogger(GridAbstractTest.log);
listeningLog.registerListener(logLsnr);

walSegments = minNumberOfSegmentsToCompress;
fileIoFactory = new CheckpointFailingIoFactory(false);

Expand All @@ -192,6 +208,9 @@ public void testBinaryRecoveryAfterFullCompaction() throws Exception {
() -> wal(ig).lastArchivedSegment() >= minNumberOfSegmentsToCompress,
getTestTimeout()));

// no raw segments are compressed/deleted until a cp covering at least one wal segment happens
assertFalse(logLsnr.check());

forceCheckpoint(ig);

loadData(cache, ENTRIES);
Expand All @@ -203,7 +222,11 @@ public void testBinaryRecoveryAfterFullCompaction() throws Exception {
() -> wal(ig).lastArchivedSegment() > lastCpIdx,
getTestTimeout()));

Collection<?> queue = GridTestUtils.getFieldValue(wal(ig), "segmentAware", "segmentCompressStorage", "segmentsToCompress");
//after the second checkpoint there are some wal segments that could be compressed and deleted
assertTrue(logLsnr.check());

Collection<?> queue = GridTestUtils.getFieldValue(wal(ig),
"segmentAware", "segmentCompressStorage", "segmentsToCompress");

assertTrue(GridTestUtils.waitForCondition(queue::isEmpty, getTestTimeout()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
Expand All @@ -41,8 +43,11 @@
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
Expand All @@ -62,16 +67,21 @@
*/
public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractTest {
/**
* Start grid with override default configuration via customConfigurator.
* Start grid with overridden default configuration via customConfigurator and with custom logger (if provided).
*/
private Ignite startGrid(Consumer<DataStorageConfiguration> customConfigurator) throws Exception {
private Ignite startGrid(Consumer<DataStorageConfiguration> customConfigurator,
@Nullable IgniteLogger customLog
) throws Exception {
IgniteConfiguration configuration = getConfiguration(getTestIgniteInstanceName());

if (customLog != null)
configuration.setGridLogger(customLog);

DataStorageConfiguration dbCfg = new DataStorageConfiguration();

dbCfg.setWalMode(walMode());
dbCfg.setWalSegmentSize(512 * 1024);
dbCfg.setCheckpointFrequency(60 * 1000);//too high value for turn off frequency checkpoint.
dbCfg.setCheckpointFrequency(60 * 1000); // very high value to disable triggering checkpoint by timeout.
dbCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setMaxSize(100 * 1024 * 1024)
.setPersistenceEnabled(true));
Expand All @@ -87,6 +97,13 @@ private Ignite startGrid(Consumer<DataStorageConfiguration> customConfigurator)
return ignite;
}

/**
 * Starts a grid with the default configuration overridden via {@code customConfigurator}
 * and the default test logger (delegates to the two-argument overload with a {@code null}
 * custom logger).
 *
 * @param customConfigurator Closure applied to the {@link DataStorageConfiguration} before start.
 * @return Started Ignite node.
 * @throws Exception If the node fails to start.
 */
private Ignite startGrid(Consumer<DataStorageConfiguration> customConfigurator) throws Exception {
return startGrid(customConfigurator, null);
}

/** */
private CacheConfiguration<Integer, Object> cacheConfiguration() {
CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
Expand Down Expand Up @@ -125,9 +142,16 @@ private String findSourceMessage(Throwable ex) {
*/
@Test
public void testCorrectDeletedArchivedWalFiles() throws Exception {
ListeningTestLogger listeningLog = new ListeningTestLogger(GridAbstractTest.log);
//"Segments removed after WAL archive cleaning [cleanedSegments=["
LogListener listener = LogListener
.matches(Pattern.compile("Segments removed after WAL archive cleaning \\[cleanedSegments=\\[\\d+.wal"))
.build();
listeningLog.registerListener(listener);

//given: configured grid with a set max WAL archive size
long maxWalArchiveSize = 2 * 1024 * 1024;
Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(maxWalArchiveSize));
Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(maxWalArchiveSize), listeningLog);

GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite);

Expand Down Expand Up @@ -158,6 +182,8 @@ public void testCorrectDeletedArchivedWalFiles() throws Exception {
assertFalse(Stream.of(files).anyMatch(desc -> desc.file().getName().endsWith("00001.wal")));

assertTrue(!hist.checkpoints().isEmpty());

assertTrue(listener.check());
}

/**
Expand Down

0 comments on commit 77268b6

Please sign in to comment.