Skip to content

Commit

Permalink
GH-16146 Add ZSTD compression format
Browse files Browse the repository at this point in the history
  • Loading branch information
krasinski committed Apr 24, 2024
1 parent c12b264 commit 02d1efd
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 4 deletions.
1 change: 1 addition & 0 deletions h2o-core/build.gradle
Expand Up @@ -17,6 +17,7 @@ dependencies {
api 'org.javassist:javassist:3.28.0-GA'
api 'org.apache.commons:commons-math3:3.6.1'
api "commons-io:commons-io:2.11.0"
api 'com.github.luben:zstd-jni:1.5.6-2'
compileOnly "javax.servlet:javax.servlet-api:3.1.0"
api("com.github.wendykierp:JTransforms:3.1") { exclude module: "junit" }
api project(":h2o-jaas-pam")
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/schemas3/FramesV3.java
Expand Up @@ -47,7 +47,7 @@ public class FramesV3 extends RequestSchemaV3<Frames, FramesV3> {
@API(help="Output file format. Defaults to 'csv'.", values = { "csv", "parquet"} , json=false)
public ExportFileFormat format;

@API(help="Compression method (default none; gzip, bzip2 and snappy available depending on runtime environment)")
@API(help="Compression method (default none; gzip, bzip2, zstd and snappy available depending on runtime environment)")
public String compression;

@API(help="Specifies if checksum should be written next to data files on export (if supported by export format).")
Expand Down
12 changes: 12 additions & 0 deletions h2o-core/src/main/java/water/parser/ParseDataset.java
@@ -1,5 +1,6 @@
package water.parser;

import com.github.luben.zstd.ZstdInputStream;
import jsr166y.CountedCompleter;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveAction;
Expand Down Expand Up @@ -906,6 +907,17 @@ private FVecParseWriter makeDout(ParseSetup localSetup, int chunkOff, int nchunk
chunksAreLocal(vec,chunkStartIdx,key);
break;
}
case ZSTD: {
localSetup = ParserService.INSTANCE.getByInfo(localSetup._parse_type).setupLocal(vec, localSetup);
try (InputStream bvs = vec.openStream(_jobKey);
InputStream dec = decryptionTool.decryptInputStream(bvs);
ZstdInputStream zstdIs = new ZstdInputStream(dec)) {
_dout[_lo] = streamParse(zstdIs, localSetup, makeDout(localSetup, chunkStartIdx, vec.nChunks()), bvs);
}
_errors = _dout[_lo].removeErrors();
chunksAreLocal(vec, chunkStartIdx, key);
break;
}
}
Log.trace("Finished a map stage of a file parse with start index "+chunkStartIdx+".");
} catch( IOException ioe ) {
Expand Down
13 changes: 10 additions & 3 deletions h2o-core/src/main/java/water/parser/ZipUtil.java
@@ -1,5 +1,6 @@
package water.parser;

import com.github.luben.zstd.ZstdInputStream;
import water.DKV;
import water.Iced;
import water.Key;
Expand All @@ -23,7 +24,8 @@

public abstract class ZipUtil {

public enum Compression { NONE, ZIP, GZIP }
public enum Compression { NONE, ZIP, GZIP, ZSTD }
public static int ZSTD_MAGIC = 0xFD2FB528;

/**
* This method will attempt to read the few bytes off a file which will in turn be used
Expand Down Expand Up @@ -147,6 +149,8 @@ static Compression guessCompressionMethod(byte [] bits) {
return Compression.ZIP;
if( bits.length > 2 && (UnsafeUtils.get2(bits,0)&0xffff) == GZIPInputStream.GZIP_MAGIC )
return Compression.GZIP;
if (bits.length >= 4 && UnsafeUtils.get4(bits, 0) == ZSTD_MAGIC)
return Compression.ZSTD;
return Compression.NONE;
}

Expand Down Expand Up @@ -185,7 +189,7 @@ static byte[] unzipBytes( byte[] bs, Compression cmp, int chkSize ) {
if( cmp == Compression.NONE ) return bs; // No compression
// Wrap the bytes in a stream
ByteArrayInputStream bais = new ByteArrayInputStream(bs);
InflaterInputStream is = null;
InputStream is = null;
try {
if (cmp == Compression.ZIP) {
ZipInputStream zis = new ZipInputStream(bais);
Expand All @@ -194,7 +198,10 @@ static byte[] unzipBytes( byte[] bs, Compression cmp, int chkSize ) {
if (ze == null || ze.isDirectory())
zis.getNextEntry(); // read the next entry which should be a file
is = zis;
} else {
} else if (cmp == Compression.ZSTD) {
is = new ZstdInputStream(bais);
}
else {
assert cmp == Compression.GZIP;
is = new GZIPInputStream(bais);
}
Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/util/CompressionFactory.java
@@ -1,5 +1,6 @@
package water.util;

import com.github.luben.zstd.ZstdOutputStream;
import water.Iced;

import java.io.ByteArrayOutputStream;
Expand All @@ -23,6 +24,8 @@ OutputStream wrapOutputStream(OutputStream os) throws IOException {
return os;
case "gzip":
return new GZIPOutputStream(os);
case "zstd":
return new ZstdOutputStream(os);
case "bzip2":
return wrapDynamic("org.python.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream", os);
case "snappy":
Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/util/DecompressionFactory.java
@@ -1,5 +1,6 @@
package water.util;

import com.github.luben.zstd.ZstdInputStream;
import water.Iced;

import java.io.IOException;
Expand All @@ -26,6 +27,8 @@ InputStream wrapInputStream(InputStream is) throws IOException {
return wrapDynamic("org.python.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream", is);
case "snappy":
return wrapDynamic("org.xerial.snappy.SnappyInputStream", is);
case "zstd":
return new ZstdInputStream(is);
default:
return wrapDynamic(_name, is);
}
Expand Down
37 changes: 37 additions & 0 deletions h2o-py/tests/testdir_misc/pyunit_export_zstd.py
@@ -0,0 +1,37 @@
import sys
sys.path.insert(1,"../../../")
import h2o
from tests import pyunit_utils
from os import path
import struct

'''
Export file with h2o.export_file compressed with 'zstd'
'''


def is_zstd_file(path):
with open(path, 'rb') as f:
magic_bytes = f.read(4)
return struct.unpack('<I', magic_bytes)[0] == 0xFD2FB528


def export_zstd():
prostate = h2o.import_file(pyunit_utils.locate("smalldata/prostate/prostate.csv"))
target = path.join(pyunit_utils.locate("results"), "prostate_export.csv.zst")
h2o.export_file(prostate, target, compression="zstd")

assert is_zstd_file(target)

prostate_zstd = h2o.import_file(target)

assert pyunit_utils.compare_frames(prostate, prostate_zstd, numElements=2)


if __name__ == "__main__":
pyunit_utils.standalone_test(export_zstd)
else:
export_zstd()



0 comments on commit 02d1efd

Please sign in to comment.