Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support standard brotli compression/decompression (.br format compressible with single-threaded brotli) #351

Merged
merged 4 commits into from Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion C/zstdmt/brotli-mt.h
Expand Up @@ -101,7 +101,7 @@ typedef struct BROTLIMT_CCtx_s BROTLIMT_CCtx;
* @inputsize - if zero, becomes some optimal value for the level
* - if nonzero, the given value is taken
*/
BROTLIMT_CCtx *BROTLIMT_createCCtx(int threads, int level, int inputsize);
BROTLIMT_CCtx *BROTLIMT_createCCtx(int threads, uint64_t unpackSize, int level, int inputsize, int lgwin);

/**
* 2) threaded compression
Expand Down
174 changes: 154 additions & 20 deletions C/zstdmt/brotli-mt_compress.c
Expand Up @@ -15,6 +15,7 @@
#include <string.h>

#include "encode.h"
#include "common/constants.h"

#include "brotli-mt.h"
#include "memmt.h"
Expand Down Expand Up @@ -52,12 +53,17 @@ struct BROTLIMT_CCtx_s {
/* levels: 1..BROTLIMT_LEVEL_MAX */
int level;

/* threads: 1..BROTLIMT_THREAD_MAX */
/* threads: 0..BROTLIMT_THREAD_MAX */
int threads;

/* size of file/stream to compress if known */
uint64_t unpackSize;

/* should be used for read from input */
int inputsize;

int lgwin;

/* statistic */
size_t insize;
size_t outsize;
Expand Down Expand Up @@ -87,7 +93,7 @@ struct BROTLIMT_CCtx_s {
* Compression
****************************************/

BROTLIMT_CCtx *BROTLIMT_createCCtx(int threads, int level, int inputsize)
BROTLIMT_CCtx *BROTLIMT_createCCtx(int threads, uint64_t unpackSize, int level, int inputsize, int lgwin)
{
BROTLIMT_CCtx *ctx;
int t;
Expand All @@ -98,7 +104,7 @@ BROTLIMT_CCtx *BROTLIMT_createCCtx(int threads, int level, int inputsize)
return 0;

/* check threads value */
if (threads < 1 || threads > BROTLIMT_THREAD_MAX)
if (threads < 0 || threads > BROTLIMT_THREAD_MAX)
return 0;

/* check level */
Expand All @@ -111,30 +117,37 @@ BROTLIMT_CCtx *BROTLIMT_createCCtx(int threads, int level, int inputsize)
else
ctx->inputsize = 1024 * 1024 * (level ? level : 1);

ctx->lgwin = lgwin;

/* setup ctx */
ctx->level = level;
ctx->threads = threads;
ctx->unpackSize = unpackSize;
ctx->insize = 0;
ctx->outsize = 0;
ctx->frames = 0;
ctx->curframe = 0;

pthread_mutex_init(&ctx->read_mutex, NULL);
pthread_mutex_init(&ctx->write_mutex, NULL);
if (threads) {
pthread_mutex_init(&ctx->read_mutex, NULL);
pthread_mutex_init(&ctx->write_mutex, NULL);

/* free -> busy -> out -> free -> ... */
INIT_LIST_HEAD(&ctx->writelist_free); /* free, can be used */
INIT_LIST_HEAD(&ctx->writelist_busy); /* busy */
INIT_LIST_HEAD(&ctx->writelist_done); /* can be written */
/* free -> busy -> out -> free -> ... */
INIT_LIST_HEAD(&ctx->writelist_free); /* free, can be used */
INIT_LIST_HEAD(&ctx->writelist_busy); /* busy */
INIT_LIST_HEAD(&ctx->writelist_done); /* can be written */

ctx->cwork = (cwork_t *) malloc(sizeof(cwork_t) * threads);
if (!ctx->cwork)
goto err_cwork;
ctx->cwork = (cwork_t *) malloc(sizeof(cwork_t) * threads);
if (!ctx->cwork)
goto err_cwork;

for (t = 0; t < threads; t++) {
cwork_t *w = &ctx->cwork[t];
w->ctx = ctx;
}
for (t = 0; t < threads; t++) {
cwork_t *w = &ctx->cwork[t];
w->ctx = ctx;
}
} else {
ctx->cwork = NULL;
}

return ctx;

Expand Down Expand Up @@ -269,7 +282,7 @@ static void *pt_compress(void *arg)
uint8_t *obuf = (uint8_t*)wl->out.buf + 16;
wl->out.size -= 16;
rv = BrotliEncoderCompress(ctx->level,
BROTLI_MAX_WINDOW_BITS,
ctx->lgwin,
BROTLI_MODE_GENERIC, in.size,
ibuf, &wl->out.size, obuf);

Expand Down Expand Up @@ -319,6 +332,119 @@ static void *pt_compress(void *arg)
return 0;
}

/* single threaded (standard brotli stream, without header/mt-frames) */
static size_t st_compress(void *arg)
{
BROTLIMT_CCtx *ctx = (BROTLIMT_CCtx *) arg;
BrotliEncoderOperation brop = BROTLI_OPERATION_PROCESS;
BROTLIMT_Buffer Out;
BROTLIMT_Buffer *out = &Out;
BROTLIMT_Buffer In;
BROTLIMT_Buffer *in = &In;
BrotliEncoderState *state;
const uint8_t* next_in;
uint8_t* next_out;
int rv;
size_t retval = 0;

/* allocate space for input buffer (default 1M * level) */
in->allocated = ctx->inputsize;
in->buf = malloc(in->allocated);
if (!in->buf)
return MT_ERROR(memory_allocation);
next_in = in->buf;
in->size = 0;

/* allocate space for output buffer */
out->allocated = out->size = ctx->inputsize / 4;
out->buf = malloc(out->size);
if (!out->buf) {
free(in->buf);
return MT_ERROR(memory_allocation);
}
next_out = out->buf;

state = BrotliEncoderCreateInstance(NULL, NULL, NULL);
if (!state) {
free(in->buf);
free(out->buf);
return MT_ERROR(memory_allocation);
}

BrotliEncoderSetParameter(state, BROTLI_PARAM_QUALITY, (uint32_t)ctx->level);
if (ctx->lgwin > 0) {
/* Specified by user. */
/* Do not enable "large-window" extension, if not required. */
if (ctx->lgwin > BROTLI_MAX_WINDOW_BITS) {
BrotliEncoderSetParameter(state, BROTLI_PARAM_LARGE_WINDOW, 1u);
}
BrotliEncoderSetParameter(state, BROTLI_PARAM_LGWIN, (uint32_t)ctx->lgwin);
} else {
/* 0, or not specified by user; could be chosen by compressor. */
uint32_t lgwin = 24 /* DEFAULT_LGWIN */;
/* Use file size to limit lgwin. */
if (ctx->unpackSize >= 0) {
lgwin = BROTLI_MIN_WINDOW_BITS;
while (BROTLI_MAX_BACKWARD_LIMIT(lgwin) <
(uint64_t)ctx->unpackSize) {
lgwin++;
if (lgwin == BROTLI_MAX_WINDOW_BITS) break;
}
}
BrotliEncoderSetParameter(state, BROTLI_PARAM_LGWIN, lgwin);
}
if (ctx->unpackSize > 0) {
uint32_t size_hint = ctx->unpackSize < (1 << 30) ?
(uint32_t)ctx->unpackSize : (1u << 30);
BrotliEncoderSetParameter(state, BROTLI_PARAM_SIZE_HINT, size_hint);
}

while (1) {
if (in->size == 0 && brop != BROTLI_OPERATION_FINISH) {
in->size = in->allocated;
rv = ctx->fn_read(ctx->arg_read, in);
if (rv != 0) {
retval = mt_error(rv);
goto done;
}
if (in->size == 0) {
brop = BROTLI_OPERATION_FINISH; // eof
}
next_in = in->buf;
}

if (!BrotliEncoderCompressStream(state, brop, &in->size, &next_in, &out->size, &next_out, 0)) {
retval = MT_ERROR(frame_compress);
goto done;
}
if (out->size == 0) {
out->size = next_out - (uint8_t*)out->buf;
rv = ctx->fn_write(ctx->arg_write, out);
if (rv != 0) {
retval = mt_error(rv);
goto done;
}
next_out = out->buf;
out->size = out->allocated;
}
if (BrotliEncoderIsFinished(state)) {
out->size = next_out - (uint8_t*)out->buf;
rv = ctx->fn_write(ctx->arg_write, out);
if (rv != 0) {
retval = mt_error(rv);
goto done;
}
break;
}
}

done:
free(in->buf);
free(out->buf);
BrotliEncoderDestroyInstance(state);
return retval;
}

size_t BROTLIMT_compressCCtx(BROTLIMT_CCtx * ctx, BROTLIMT_RdWr_t * rdwr)
{
int t;
Expand All @@ -333,6 +459,12 @@ size_t BROTLIMT_compressCCtx(BROTLIMT_CCtx * ctx, BROTLIMT_RdWr_t * rdwr)
ctx->arg_read = rdwr->arg_read;
ctx->arg_write = rdwr->arg_write;

/* single threaded brotli (no header, no mt-frames) */
if (ctx->threads == 0) {
/* decompress single threaded */
return st_compress(ctx);
}

/* start all workers */
for (t = 0; t < ctx->threads; t++) {
cwork_t *w = &ctx->cwork[t];
Expand Down Expand Up @@ -394,9 +526,11 @@ void BROTLIMT_freeCCtx(BROTLIMT_CCtx * ctx)
if (!ctx)
return;

pthread_mutex_destroy(&ctx->read_mutex);
pthread_mutex_destroy(&ctx->write_mutex);
free(ctx->cwork);
if (ctx->threads) {
pthread_mutex_destroy(&ctx->read_mutex);
pthread_mutex_destroy(&ctx->write_mutex);
free(ctx->cwork);
}
free(ctx);
ctx = 0;

Expand Down