Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-2773: [C] Add decimal logical schema #2891

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions lang/c/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ source_group(Avro FILES ${AVRO_SRC})
string(REPLACE ":" "." LIBAVRO_DOT_VERSION ${LIBAVRO_VERSION})

add_library(avro-static STATIC ${AVRO_SRC})
target_link_libraries(avro-static ${JANSSON_LIBRARIES} ${CODEC_LIBRARIES} ${THREADS_LIBRARIES})
target_link_libraries(avro-static ${JANSSON_LIBRARIES} ${CODEC_LIBRARIES} ${THREADS_LIBRARIES} m)
set_target_properties(avro-static PROPERTIES OUTPUT_NAME avro)

if (NOT WIN32)
# TODO: Create Windows DLLs. See https://www.cmake.org/Wiki/BuildingWinDLL
add_library(avro-shared SHARED ${AVRO_SRC})
target_link_libraries(avro-shared ${JANSSON_LIBRARIES} ${CODEC_LIBRARIES} ${THREADS_LIBRARIES})
target_link_libraries(avro-shared ${JANSSON_LIBRARIES} ${CODEC_LIBRARIES} ${THREADS_LIBRARIES} m)
set_target_properties(avro-shared PROPERTIES
OUTPUT_NAME avro
VERSION ${LIBAVRO_DOT_VERSION}
Expand Down
2 changes: 2 additions & 0 deletions lang/c/src/avro/basics.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ enum avro_type_t {
AVRO_UNION,
AVRO_LINK,
AVRO_INVALID = EINVAL,
AVRO_DECIMAL,
};
typedef enum avro_type_t avro_type_t;

Expand Down Expand Up @@ -91,6 +92,7 @@ struct avro_obj_t {
#define is_avro_complex_type(obj) (!(is_avro_primitive(obj))
#define is_avro_link(obj) (obj && avro_typeof(obj) == AVRO_LINK)

#define is_avro_decimal(obj) (obj && avro_typeof(obj) == AVRO_DECIMAL)


CLOSE_EXTERN
Expand Down
11 changes: 11 additions & 0 deletions lang/c/src/avro/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,17 @@ struct avro_consumer_t {
avro_consumer_t **branch_consumer,
void **branch_user_data,
void *user_data);

/**
* Called when a decimal value is encountered. The @ref value
* pointer is only guaranteed to be valid for the duration of
* the callback function. If you need to save the data for
* processing later, you must copy it into another buffer.
*/

int (*decimal_value)(avro_consumer_t *consumer,
const void *value, size_t value_len,
void *user_data);
};


Expand Down
7 changes: 7 additions & 0 deletions lang/c/src/avro/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ avro_schema_t avro_schema_union_branch_by_name
avro_schema_t avro_schema_link(avro_schema_t schema);
avro_schema_t avro_schema_link_target(avro_schema_t schema);

avro_schema_t avro_schema_decimal(const avro_schema_t underlying,
size_t scale, size_t precision);
size_t avro_schema_decimal_scale(const avro_schema_t decimal);
size_t avro_schema_decimal_precision(const avro_schema_t decimal);

avro_schema_t avro_schema_logical_underlying(const avro_schema_t logical);

typedef struct avro_schema_error_t_ *avro_schema_error_t;

int avro_schema_from_json(const char *jsontext, int32_t unused1,
Expand Down
23 changes: 23 additions & 0 deletions lang/c/src/avro/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,24 @@ struct avro_value_iface {
int (*set_branch)(const avro_value_iface_t *iface,
void *self, int discriminant,
avro_value_t *branch);

/*
* Returns the unscaled value, along with the values to the left and
* right hand sides of the decimal point. Any of unscaled, lhs, and rhs
* may be NULL.
*/
int (*get_decimal)(const avro_value_iface_t *iface,
const void *self, int64_t *unscaled,
int64_t *lhs, uint64_t *rhs);

/*
* Sets the decimal value from either unscaled or lhs and rhs. If
* unscaled is non-NULL, lhs and rhs are ignored. Otherwise, lhs and
* rhs must be non-NULL.
*/
int (*set_decimal)(const avro_value_iface_t *iface,
void *self, const int64_t *unscaled,
const int64_t *lhs, const uint64_t *rhs);
};


Expand Down Expand Up @@ -494,5 +512,10 @@ avro_value_to_json(const avro_value_t *value,
#define avro_value_set_branch(value, discriminant, branch) \
avro_value_call(value, set_branch, EINVAL, discriminant, branch)

#define avro_value_get_decimal(value, unscaled, lhs, rhs) \
avro_value_call(value, get_decimal, EINVAL, unscaled, lhs, rhs)
#define avro_value_set_decimal(value, unscaled, lhs, rhs) \
avro_value_call(value, set_decimal, EINVAL, unscaled, lhs, rhs)

CLOSE_EXTERN
#endif
92 changes: 72 additions & 20 deletions lang/c/src/consume-binary.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,41 @@ read_record(avro_reader_t reader, const avro_encoding_t * enc,
return 0;
}

static int
read_bytes(avro_reader_t reader, const avro_encoding_t *enc, char **bytes,
int64_t *len)
{
int rval;
check_prefix(rval, enc->read_bytes(reader, bytes, len),
"Cannot read bytes value: ");
return rval;
}

static int
read_fixed(avro_reader_t reader, avro_schema_t fixed_schema, char **out_bytes,
int64_t *out_size)
{
int rval;
char *bytes;
int64_t size = avro_schema_to_fixed(fixed_schema)->size;

bytes = (char *) avro_malloc(size);
if (!bytes) {
avro_prefix_error("Cannot allocate new fixed value");
return ENOMEM;
}
rval = avro_read(reader, bytes, size);
if (rval) {
avro_prefix_error("Cannot read fixed value: ");
avro_free(bytes, size);
return rval;
}

*out_bytes = bytes;
*out_size = size;
return rval;
}

int
avro_consume_binary(avro_reader_t reader, avro_consumer_t *consumer, void *ud)
{
Expand Down Expand Up @@ -267,33 +302,22 @@ avro_consume_binary(avro_reader_t reader, avro_consumer_t *consumer, void *ud)
{
char *bytes;
int64_t len;
check_prefix(rval, enc->read_bytes(reader, &bytes, &len),
"Cannot read bytes value: ");
check(rval, avro_consumer_call(consumer, bytes_value, bytes, len, ud));
check(rval, read_bytes(reader, enc, &bytes, &len));
check(rval, avro_consumer_call(consumer, bytes_value,
bytes, len, ud));
}
break;

case AVRO_FIXED:
{
char *bytes;
int64_t size =
avro_schema_to_fixed(consumer->schema)->size;

bytes = (char *) avro_malloc(size);
if (!bytes) {
avro_prefix_error("Cannot allocate new fixed value");
return ENOMEM;
}
rval = avro_read(reader, bytes, size);
if (rval) {
avro_prefix_error("Cannot read fixed value: ");
avro_free(bytes, size);
return rval;
}

rval = avro_consumer_call(consumer, fixed_value, bytes, size, ud);
int64_t len;
check(rval, read_fixed(reader, consumer->schema,
&bytes, &len));
rval = avro_consumer_call(consumer, fixed_value, bytes,
len, ud);
if (rval) {
avro_free(bytes, size);
avro_free(bytes, len);
return rval;
}
}
Expand Down Expand Up @@ -325,6 +349,34 @@ avro_consume_binary(avro_reader_t reader, avro_consumer_t *consumer, void *ud)
case AVRO_INVALID:
avro_set_error("Consumer can't consume an invalid schema");
return EINVAL;

case AVRO_DECIMAL:
{
char *bytes;
int should_free = 0;
int64_t len;
avro_schema_t underlying =
avro_schema_logical_underlying(consumer->schema);
if (is_avro_fixed(underlying)) {
check(rval, read_fixed(reader, underlying,
&bytes, &len));
should_free = 1;
} else {
check(rval, read_bytes(reader, enc, &bytes,
&len));
should_free = 0;
}

rval = avro_consumer_call(consumer, decimal_value,
bytes, len, ud);
if (rval) {
if (should_free) {
avro_free(bytes, len);
}
return rval;
}
}
break;
}

return 0;
Expand Down
6 changes: 6 additions & 0 deletions lang/c/src/datum.c
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,12 @@ static void avro_datum_free(avro_datum_t datum)
/* TODO */
}
break;
case AVRO_DECIMAL:
/*
* Datums are not created from logical schemas, only
* their underlying schemas.
*/
break;
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion lang/c/src/datum_equal.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,11 @@ int avro_datum_equal(const avro_datum_t a, const avro_datum_t b)
*/
return 0;
case AVRO_INVALID:
case AVRO_DECIMAL:
/*
* Invalid datums should not be compared and returning 0
* matches the other error conditions
* matches the other error conditions. Datums will also not be
* created from logical schemas, only their underlying schemas.
*/
return 0;
}
Expand Down
5 changes: 5 additions & 0 deletions lang/c/src/datum_size.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ static int64_t size_datum(avro_writer_t writer, const avro_encoding_t * enc,

case AVRO_LINK:
case AVRO_INVALID:
case AVRO_DECIMAL:
/*
* Datums will not be created from logical schemas, only their
* underlying schemas.
*/
break;
}

Expand Down
11 changes: 11 additions & 0 deletions lang/c/src/datum_skip.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ int avro_skip_data(avro_reader_t reader, avro_schema_t writers_schema)
case AVRO_INVALID:
rval = EINVAL;
break;

case AVRO_DECIMAL:
if (is_avro_fixed(avro_schema_logical_underlying(
writers_schema))) {
rval = avro_skip(reader, avro_schema_to_fixed(
avro_schema_logical_underlying(
writers_schema))->size);
} else {
rval = enc->skip_bytes(reader);
}
break;
}

return rval;
Expand Down
3 changes: 3 additions & 0 deletions lang/c/src/datum_validate.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ avro_schema_datum_validate(avro_schema_t expected_schema, avro_datum_t datum)
datum);
}
break;
case AVRO_DECIMAL:
return avro_schema_datum_validate(
avro_schema_logical_underlying(expected_schema), datum);
case AVRO_INVALID:
return EINVAL;
}
Expand Down
4 changes: 3 additions & 1 deletion lang/c/src/datum_value.c
Original file line number Diff line number Diff line change
Expand Up @@ -768,5 +768,7 @@ avro_value_iface_t AVRO_DATUM_VALUE_CLASS =
/* compound setters */
avro_datum_value_append,
avro_datum_value_add,
avro_datum_value_set_branch
avro_datum_value_set_branch,
NULL, /* get_decimal */
NULL, /* set_decimal */
};