Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CacheCompressionManager removed, writeInternal from FilePageStoreMana… #239

Open
wants to merge 1 commit into
base: ignite-defragmentation-feature-branch
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void initializeForCache(CacheGroupDescriptor grpDesc, StoredCacheData cac
* @param grpId Cache group ID.
* @param pageId Page ID.
* @param pageBuf Page buffer to write.
* @param tag Partition tag (growing 1-based partition file version). Used to validate page is not outdated
* @throws IgniteCheckedException If failed to write page.
*/
public void write(int grpId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Store manager. */
private CacheStoreManager storeMgr;

/** Compression manager. */
private CacheCompressionManager compressMgr;

/** Replication manager. */
private GridCacheDrManager drMgr;

Expand Down Expand Up @@ -336,7 +333,6 @@ public GridCacheContext(
* ===========================
*/

CacheCompressionManager compressMgr,
GridCacheEventManager evtMgr,
CacheStoreManager storeMgr,
CacheEvictionManager evictMgr,
Expand All @@ -355,7 +351,6 @@ public GridCacheContext(
assert cacheCfg != null;
assert locStartTopVer != null : cacheCfg.getName();

assert compressMgr != null;
assert grp != null;
assert evtMgr != null;
assert storeMgr != null;
Expand All @@ -382,7 +377,6 @@ public GridCacheContext(
* Managers in starting order!
* ===========================
*/
this.compressMgr = add(compressMgr);
this.evtMgr = add(evtMgr);
this.storeMgr = add(storeMgr);
this.evictMgr = add(evictMgr);
Expand Down Expand Up @@ -1246,13 +1240,6 @@ public GridCacheVersion[] emptyVersion() {
return EMPTY_VERSION;
}

/**
* @return Compression manager.
*/
public CacheCompressionManager compress() {
return compressMgr;
}

/**
* Sets cache object context.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,6 @@ private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) {

boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg);

CacheCompressionManager compressMgr = new CacheCompressionManager();
GridCacheAffinityManager affMgr = new GridCacheAffinityManager();
GridCacheEventManager evtMgr = new GridCacheEventManager();
CacheEvictionManager evictMgr = (nearEnabled || cfg.isOnheapCacheEnabled())
Expand Down Expand Up @@ -1258,7 +1257,6 @@ private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) {
* Managers in starting order!
* ===========================
*/
compressMgr,
evtMgr,
storeMgr,
evictMgr,
Expand Down Expand Up @@ -1397,7 +1395,6 @@ private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) {
* Managers in starting order!
* ===========================
*/
compressMgr,
evtMgr,
storeMgr,
evictMgr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
Expand Down Expand Up @@ -3159,7 +3160,17 @@ private void finalizeCheckpointOnRecovery(
long pageId = fullPageId.pageId();

// Write buf to page store.
PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);
int partId = PageIdUtils.partId(pageId);
PageStore store = storeMgr.getStore(groupId, partId);

try {
store.write(pageId, buf, tag, true);
}
catch (StorageException e) {
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

throw e;
}

// Save store for future fsync.
updStores.add(store);
Expand Down Expand Up @@ -4763,7 +4774,18 @@ private PageStoreWriter createPageStoreWriter(Map<PageMemoryEx, List<FullPageId>

getCheckpointer().currentProgress().updateWrittenPages(1);

PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);
int partId = PageIdUtils.partId(pageId);

PageStore store = storeMgr.getStore(groupId, partId);

try {
store.write(pageId, buf, tag, true);
}
catch (StorageException e) {
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

throw e;
}

updStores.computeIfAbsent(store, k -> new LongAdder()).increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,67 +588,26 @@ public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) th

/** {@inheritDoc} */
@Override public void write(int grpId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
writeInternal(grpId, pageId, pageBuf, tag, true);
}

/** {@inheritDoc} */
@Override public long pageOffset(int grpId, long pageId) throws IgniteCheckedException {
PageStore store = getStore(grpId, PageIdUtils.partId(pageId));

return store.pageOffset(pageId);
}

/**
* @param cacheId Cache ID to write.
* @param pageId Page ID.
* @param pageBuf Page buffer.
* @param tag Partition tag (growing 1-based partition file version). Used to validate page is not outdated
* @param calculateCrc if {@code False} crc calculation will be forcibly skipped.
* @return PageStore to which the page has been written.
* @throws IgniteCheckedException If IO error occurred.
*/
public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc)
throws IgniteCheckedException {
int partId = PageIdUtils.partId(pageId);

PageStore store = getStore(cacheId, partId);
PageStore store = getStore(grpId, partId);

try {
int pageSize = store.getPageSize();
int compressedPageSize = pageSize;

GridCacheContext cctx0 = cctx.cacheContext(cacheId);

if (cctx0 != null) {
assert pageBuf.position() == 0 && pageBuf.limit() == pageSize : pageBuf;

ByteBuffer compressedPageBuf = cctx0.compress().compressPage(pageBuf, store);

if (compressedPageBuf != pageBuf) {
compressedPageSize = PageIO.getCompressedSize(compressedPageBuf);

if (!calculateCrc) {
calculateCrc = true;
PageIO.setCrc(compressedPageBuf, 0); // It will be recalculated over compressed data further.
}

PageIO.setCrc(pageBuf, 0); // It is expected to be reset to 0 after each write.
pageBuf = compressedPageBuf;
}
}

store.write(pageId, pageBuf, tag, calculateCrc);

if (pageSize > compressedPageSize)
store.punchHole(pageId, compressedPageSize); // TODO maybe add async punch mode?
// Do we still need to set calculateCrc as true?
store.write(pageId, pageBuf, tag, true);
}
catch (StorageException e) {
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

throw e;
}
}

return store;
/** {@inheritDoc} */
@Override public long pageOffset(int grpId, long pageId) throws IgniteCheckedException {
PageStore store = getStore(grpId, PageIdUtils.partId(pageId));

return store.pageOffset(pageId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.cache.CacheCompressionManager;
import org.apache.ignite.internal.processors.cache.CacheDiagnosticManager;
import org.apache.ignite.internal.processors.cache.CacheOsConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheType;
Expand Down Expand Up @@ -98,7 +97,6 @@ public GridCacheTestContext(GridTestKernalContext ctx) throws Exception {
true,
false,
false,
new CacheCompressionManager(),
new GridCacheEventManager(),
new CacheOsStoreManager(null, new CacheConfiguration()),
new GridCacheEvictionManager(),
Expand Down