From 756da93363e71b78874676ededfe5ee34a53a135 Mon Sep 17 00:00:00 2001 From: "Unknown W. Brackets" Date: Sat, 1 Nov 2014 18:28:19 -0700 Subject: [PATCH] Fix a bug introduced in the last change. --- src/compress.h | 2 +- src/output.cpp | 30 ++++++++++++++---------------- src/sector.cpp | 9 ++++++++- src/sector.h | 1 + 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/compress.h b/src/compress.h index b15e1a241..ed7ce7058 100644 --- a/src/compress.h +++ b/src/compress.h @@ -7,7 +7,7 @@ namespace maxcso { -static const char *VERSION = "1.4.1"; +static const char *VERSION = "1.4.2"; struct Task; diff --git a/src/output.cpp b/src/output.cpp index c90f694b7..8c65e984d 100644 --- a/src/output.cpp +++ b/src/output.cpp @@ -116,21 +116,19 @@ void Output::Enqueue(int64_t pos, uint8_t *buffer) { HandleReadySector(sector); }); - if (blockSize_ != SECTOR_SIZE) { - const bool lastBlock = pos + SECTOR_SIZE >= srcSize_; - const bool needsPad = pos + blockSize_ > srcSize_; - if (lastBlock && needsPad) { - // Our src is not aligned to the blockSize_, so this sector might not ever wake up. - // So let's send in some padding. - for (int64_t padPos = srcSize_; padPos < pos + blockSize_; padPos += SECTOR_SIZE) { - // Sector takes ownership, so we need a new one each time. - uint8_t *padBuffer = pool.Alloc(); - memset(padBuffer, 0, SECTOR_SIZE); - sector->Process(padPos, padBuffer, [this, sector, block](bool status, const char *reason) { - partialSectors_.erase(block); - HandleReadySector(sector); - }); - } + // Only check for the last block of a larger block size. + if (blockSize_ != SECTOR_SIZE && pos + SECTOR_SIZE >= srcSize_) { + // Our src may not be aligned to the blockSize_, so this sector might never wake up. + // So let's send in some padding if needed. + const int64_t paddedSize = (srcSize_ + blockSize_ - 1) & ~static_cast(blockSize_ - 1); + for (int64_t padPos = srcSize_; padPos < paddedSize; padPos += SECTOR_SIZE) { + // Sector takes ownership, so we need a new one each time. + uint8_t *padBuffer = pool.Alloc(); + memset(padBuffer, 0, SECTOR_SIZE); + sector->Process(padPos, padBuffer, [this, sector, block](bool status, const char *reason) { + partialSectors_.erase(block); + HandleReadySector(sector); + }); } } } @@ -248,7 +246,7 @@ void Output::HandleReadySector(Sector *sector) { progress_(srcPos_, srcSize_, dstPos_); - if (nextPos == srcSize_) { + if (nextPos >= srcSize_) { state_ |= STATE_DATA_WRITTEN; CheckFinish(); } else { diff --git a/src/sector.cpp b/src/sector.cpp index adc984c82..632e89090 100644 --- a/src/sector.cpp +++ b/src/sector.cpp @@ -23,7 +23,7 @@ static void EndZlib(z_stream *&z) { } Sector::Sector(uint32_t flags, uint32_t orig_max_cost, uint32_t lz4_max_cost) - : flags_(flags), origMaxCost_(orig_max_cost), lz4MaxCost_(lz4_max_cost), busy_(false), + : flags_(flags), origMaxCost_(orig_max_cost), lz4MaxCost_(lz4_max_cost), busy_(false), enqueued_(false), compress_(true), readySize_(0), buffer_(nullptr), best_(nullptr) { // Set up the zlib streams, which we will reuse each time we hit this sector. if (!(flags_ & TASKFLAG_NO_ZLIB_DEFAULT)) { @@ -90,7 +90,13 @@ void Sector::Process(int64_t pos, uint8_t *buffer, SectorCallback ready) { return; } + if (enqueued_) { + ready_(false, "Sector already waiting for queued operation"); + return; + } + if (compress_) { + enqueued_ = true; ready_ = ready; uv_.queue_work(loop_, &work_, [this](uv_work_t *req) { Compress(); @@ -274,6 +280,7 @@ void Sector::Release() { } busy_ = false; + enqueued_ = false; compress_ = true; readySize_ = 0; } diff --git a/src/sector.h b/src/sector.h index ec6e1a1c6..911ce80f8 100644 --- a/src/sector.h +++ b/src/sector.h @@ -86,6 +86,7 @@ class Sector { uint32_t origMaxCost_; uint32_t lz4MaxCost_; bool busy_; + bool enqueued_; bool compress_; uint32_t blockSize_;