Compressed output format (jsongz) for rethinkdb export/import #251

Open · wants to merge 5 commits into base: master · Changes from 4 commits
77 changes: 74 additions & 3 deletions rethinkdb/_export.py
100755 → 100644
@@ -33,6 +33,7 @@
import tempfile
import time
import traceback
import zlib
from multiprocessing.queues import SimpleQueue

import six
@@ -48,7 +49,7 @@

usage = """rethinkdb export [-c HOST:PORT] [-p] [--password-file FILENAME] [--tls-cert filename] [-d DIR]
[-e (DB | DB.TABLE)]...
[--format (csv | json | ndjson)] [--fields FIELD,FIELD...] [--delimiter CHARACTER]
[--format (csv | json | ndjson | jsongz)] [--fields FIELD,FIELD...] [--delimiter CHARACTER]
[--clients NUM]"""
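
For illustration, a hypothetical invocation of the new format (host, database, and table are made up; --format jsongz and --compression-level are the options this PR adds):

    rethinkdb export -c localhost:28015 -e test.users --format jsongz --compression-level 6 -d rethinkdb_dump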
help_description = (
    "`rethinkdb export` exports data from a RethinkDB cluster into a directory"
@@ -118,11 +119,11 @@ def parse_options(argv, prog=None):
    parser.add_option(
        "--format",
        dest="format",
        metavar="json|csv|ndjson",
        metavar="json|csv|ndjson|jsongz",
        default="json",
        help="format to write (defaults to json. ndjson is newline delimited json.)",
        type="choice",
        choices=["json", "csv", "ndjson"],
        choices=["json", "csv", "ndjson", "jsongz"],
    )
    parser.add_option(
        "--clients",
@@ -150,6 +151,17 @@ def parse_options(argv, prog=None):
    )
    parser.add_option_group(csvGroup)

    jsongzGroup = optparse.OptionGroup(parser, "jsongz options")
    jsongzGroup.add_option(
        "--compression-level",
        dest="compression_level",
        metavar="NUM",
        default=None,
        help="compression level, an integer from 0 to 9 (defaults to -1, zlib's default compression)",
        type="int",
    )
    parser.add_option_group(jsongzGroup)

    options, args = parser.parse_args(argv)

    # -- Check validity of arguments
@@ -185,6 +197,15 @@ def parse_options(argv, prog=None):
        if options.delimiter:
            parser.error("--delimiter option is only valid for CSV file formats")

    if options.format == "jsongz":
        if options.compression_level is None:
            options.compression_level = -1
        elif options.compression_level < 0 or options.compression_level > 9:
Contributor: What if someone passes -1 as the option? In my opinion it should be changed to: elif options.compression_level < -1 or options.compression_level > 9:

Author: My reasoning was that passing -1 is the same as not specifying the compression level; it's not really setting compression_level. Happy to make the change as suggested though, it might make it easier to switch between setting and not setting the compression level.

Contributor: I understand your reasoning, but then if someone passes -1, "options.compression_level is None" evaluates to False, and "options.compression_level < 0 or options.compression_level > 9" evaluates to True and raises an exception. That is incorrect, since -1 is an acceptable value. If you have another suggestion on how to handle this situation, feel free to add it; mine was just one potential way to handle it. All in all, it's no big deal to support -1, but it could prevent some errors/exceptions, since I assume the default value is an acceptable value.

Author: Updated as suggested.

            parser.error("--compression-level must be an integer between 0 and 9")
    else:
        if options.compression_level is not None:
            parser.error("--compression-level option is only valid for jsongz file formats")

    # -

    return options
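
As a reference for the thread above, a minimal sketch of the validation it converges on; the actual change landed in the fifth commit, which this four-commit view does not show, so the exact bounds and message are an assumption:

    if options.format == "jsongz":
        if options.compression_level is None:
            options.compression_level = -1  # -1 selects zlib's default level
        elif options.compression_level < -1 or options.compression_level > 9:
            parser.error("--compression-level must be an integer between -1 and 9")
    else:
        if options.compression_level is not None:
            parser.error("--compression-level option is only valid for jsongz file formats")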
@@ -226,6 +247,43 @@ def json_writer(filename, fields, task_queue, error_queue, format):
        pass


def jsongz_writer(filename, fields, task_queue, error_queue, format, compression_level):
    try:
        with open(filename, "wb") as out:
            # wbits 31 = MAX_WBITS + gzip header and trailer
            compressor = zlib.compressobj(compression_level, zlib.DEFLATED, 31)

            def compress_and_write(text):
                out.write(compressor.compress(text.encode("utf-8")))

            first = True
            compress_and_write("[")
            item = task_queue.get()
            while not isinstance(item, StopIteration):
                row = item[0]
                if fields is not None:
                    for key in list(row.keys()):
                        if key not in fields:
                            del row[key]
                if first:
Contributor: This implies that the objects in the JSON array are each on a separate line. I'm no compression expert, but since the output will be binary and unreadable from a high-level perspective, why not skip the newlines? Objects would be written as [{...},{...},{...}...{...}], which would save n + 1 newlines (n - 1 between the objects, plus 2 around the first and the last; the trailing newline at EOF is good to keep). On a huge table this could add up to a considerable saving. @gabor-boros, do you have any knowledge about the topic? Or maybe I'm wrong and the \n are used for compression. Let me know, I'm curious now.

Author: The code here replicates what the json export does; \n is not used for compression. One can still unpack the jsongz file and get the json file inside (it's a standard gzip file), in which case the formatting might help. I considered the gains from removing the \n-s marginal, but you might be right: if you have really small documents, the extra newlines might make a difference.

Author: I should add that the import script includes a custom json parser (which I found odd; not sure what the reason for using a custom parser was, performance perhaps?) which might be affected by the lack of newlines (I expect it to be happy, though).

Contributor: "If you have really small documents, the extra end lines might make a difference." Did you mean big documents? Hehe. Regarding the custom json parser, I have no clue why there's a custom one. Probably, when the library was written, there were parsers that did not fit the requirements. Nowadays there are tons of high-performance parsers. In order not to break anything, I would keep the custom one for now.
                    compress_and_write("\n")
                    first = False
                else:
                    compress_and_write(",\n")

                compress_and_write(json.dumps(row))
                item = task_queue.get()

            compress_and_write("\n]\n")
            out.write(compressor.flush())
    except BaseException:
        ex_type, ex_class, tb = sys.exc_info()
        error_queue.put((ex_type, ex_class, traceback.extract_tb(tb)))

        # Read until the exit task so the readers do not hang on pushing onto the queue
        while not isinstance(task_queue.get(), StopIteration):
            pass

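As the author notes in the thread above, the writer emits a standard gzip stream (wbits 31 adds the gzip header and trailer), so an export can be read back with nothing but the stdlib; a minimal sketch, with a hypothetical dump path:

    import gzip
    import json

    # rethinkdb export writes <directory>/<db>/<table>.jsongz; this path is made up
    with gzip.open("rethinkdb_dump/test/users.jsongz", "rt", encoding="utf-8") as f:
        rows = json.load(f)  # the export is a single JSON array
    print("%d documents" % len(rows))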

def csv_writer(filename, fields, delimiter, task_queue, error_queue):
    try:
        with open(filename, "w") as out:
@@ -331,6 +389,19 @@ def export_table(
                    options.format,
                ),
            )
        elif options.format == "jsongz":
            filename = directory + "/%s/%s.jsongz" % (db, table)
            writer = multiprocessing.Process(
                target=jsongz_writer,
                args=(
                    filename,
                    options.fields,
                    task_queue,
                    error_queue,
                    options.format,
                    options.compression_level,
                ),
            )
        elif options.format == "csv":
            filename = directory + "/%s/%s.csv" % (db, table)
            writer = multiprocessing.Process(
72 changes: 57 additions & 15 deletions rethinkdb/_import.py
100755 → 100644
@@ -30,9 +30,11 @@
import optparse
import os
import signal
import struct
import sys
import time
import traceback
import zlib
from multiprocessing.queues import Queue, SimpleQueue

import six
@@ -56,6 +58,9 @@
JSON_MAX_BUFFER_SIZE = 128 * 1024 * 1024
MAX_NESTING_DEPTH = 100

# jsongz parameters
JSON_GZ_READ_CHUNK_SIZE = 16 * 1024

Error = collections.namedtuple("Error", ["message", "traceback", "file"])


@@ -133,7 +138,10 @@ def __init__(
            self._source = source
        else:
            try:
                self._source = codecs.open(source, mode="r", encoding="utf-8")
                if self.format == "jsongz":
                    self._source = open(source, mode="rb")
                else:
                    self._source = codecs.open(source, mode="r", encoding="utf-8")
            except IOError as exc:
                default_logger.exception(exc)
                raise ValueError(
@@ -145,9 +153,16 @@ def __init__(
            and self._source.name
            and os.path.isfile(self._source.name)
        ):
            self._bytes_size.value = os.path.getsize(source)
            self._bytes_size.value = os.path.getsize(self._source.name)
            if self._bytes_size.value == 0:
                raise ValueError("Source is zero-length: %s" % source)
                raise ValueError("Source is zero-length: %s" % self._source.name)

            # get the uncompressed length from the gzip trailer (ISIZE: last 4 bytes,
            # little-endian, value is modulo 2**32)
            if self.format == "jsongz":
                # TODO: check valid gzip
                self._source.seek(-4, 2)
                self._bytes_size.value = struct.unpack("<I", self._source.read(4))[0]
                self._source.seek(0)

        # table info
        self.db = db
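
For context on the trailer trick above: per the gzip spec (RFC 1952), the last four bytes of a member are ISIZE, the uncompressed length modulo 2**32, stored little-endian. A tiny sketch, with a hypothetical file name:

    import struct

    with open("rethinkdb_dump/test/users.jsongz", "rb") as f:
        f.seek(-4, 2)  # 2 = os.SEEK_END
        isize = struct.unpack("<I", f.read(4))[0]
    print(isize)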
@@ -500,6 +515,9 @@ class JsonSourceFile(SourceFile):
    _buffer_pos = None
    _buffer_end = None

    def read_chunk(self, max_length):
        return self._source.read(max_length)

    def fill_buffer(self):
        if self._buffer_str is None:
            self._buffer_str = ""
@@ -520,7 +538,7 @@ def fill_buffer(self):
        if read_target < 1:
            raise AssertionError("Can not set the read target and full the buffer")

        new_chunk = self._source.read(read_target)
        new_chunk = self.read_chunk(read_target)

        if len(new_chunk) == 0:
            raise StopIteration()  # file ended
@@ -634,6 +652,28 @@ def teardown(self):
)


class JsonGzSourceFile(JsonSourceFile):
    format = "jsongz"

    def __init__(self, *args, **kwargs):
        # initialize the zlib decompressor
        # wbits 31 = window size MAX_WBITS & expects gzip header and trailer
        self._decompressor = zlib.decompressobj(31)

        super(JsonGzSourceFile, self).__init__(*args, **kwargs)

    def read_chunk(self, max_length):
        chunk = b""
        while len(chunk) < max_length:
            compressed_buf = self._decompressor.unconsumed_tail + self._source.read(JSON_GZ_READ_CHUNK_SIZE)
            if len(compressed_buf) == 0:
                break
            decompressed_buf = self._decompressor.decompress(compressed_buf, max_length - len(chunk))
            chunk += decompressed_buf
        return chunk.decode("utf-8")

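A minimal round-trip sketch (synthetic data, not from this PR's tests) of the streaming pattern read_chunk uses: bounded-output calls to zlib.decompressobj(31).decompress, carrying leftover compressed input in unconsumed_tail, recover exactly what compressobj(level, zlib.DEFLATED, 31) wrote:

    import zlib

    payload = '[\n{"id": 1},\n{"id": 2}\n]\n'.encode("utf-8")

    # compress the way jsongz_writer does (wbits 31 = gzip framing)
    compressor = zlib.compressobj(-1, zlib.DEFLATED, 31)
    blob = compressor.compress(payload) + compressor.flush()

    # decompress in bounded steps, feeding unconsumed_tail back in
    decompressor = zlib.decompressobj(31)
    recovered = b""
    pos = 0
    while True:
        buf = decompressor.unconsumed_tail + blob[pos:pos + 8]
        pos += 8
        if not buf:
            break
        recovered += decompressor.decompress(buf, 16)  # cap output per call

    assert recovered == payload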

class CsvSourceFile(SourceFile):
    format = "csv"

@@ -855,11 +895,11 @@ def parse_options(argv, prog=None):
    file_import_group.add_option(
        "--format",
        dest="format",
        metavar="json|csv",
        metavar="json|jsongz|csv",
        default=None,
        help="format of the file (default: json, accepts newline delimited json)",
        type="choice",
        choices=["json", "csv"],
        choices=["json", "jsongz", "csv"],
    )
    file_import_group.add_option(
        "--pkey",
@@ -1036,7 +1076,7 @@ def parse_options(argv, prog=None):
        if options.custom_header:
            options.custom_header = options.custom_header.split(",")

    elif options.format == "json":
    elif options.format == "json" or options.format == "jsongz":
        # disallow invalid options
        if options.delimiter is not None:
            parser.error("--delimiter option is not valid for json files")
@@ -1045,13 +1085,6 @@
        if options.custom_header is not None:
            parser.error("--custom-header option is not valid for json files")

        # default options
        options.format = "json"

        if options.max_document_size > 0:
            global JSON_MAX_BUFFER_SIZE
            JSON_MAX_BUFFER_SIZE = options.max_document_size

        options.file = os.path.abspath(options.file)

    else:
@@ -1062,6 +1095,11 @@

    # --

    # max_document_size - json
    if options.max_document_size > 0:
        global JSON_MAX_BUFFER_SIZE
        JSON_MAX_BUFFER_SIZE = options.max_document_size

    # max_nesting_depth
    if options.max_nesting_depth > 0:
        global MAX_NESTING_DEPTH
@@ -1552,6 +1590,8 @@ def parse_info_file(path):
        table_type_options = None
        if ext == ".json":
            table_type = JsonSourceFile
        elif ext == ".jsongz":
            table_type = JsonGzSourceFile
        elif ext == ".csv":
            table_type = CsvSourceFile
            table_type_options = {
@@ -1622,7 +1662,7 @@ def parse_info_file(path):
        table, ext = os.path.splitext(filename)
        table = os.path.basename(table)

        if ext not in [".json", ".csv", ".info"]:
        if ext not in [".json", ".jsongz", ".csv", ".info"]:
            files_ignored.append(os.path.join(root, filename))
        elif ext == ".info":
            pass  # Info files are included based on the data files
@@ -1657,6 +1697,8 @@ def parse_info_file(path):
        table_type = None
        if ext == ".json":
            table_type = JsonSourceFile
        elif ext == ".jsongz":
            table_type = JsonGzSourceFile
        elif ext == ".csv":
            table_type = CsvSourceFile
        else: