Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3135: Schema ser deser hooks #2777

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
61 changes: 31 additions & 30 deletions lang/java/avro/src/main/java/org/apache/avro/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,27 +131,27 @@ public String toString() {
try {
StringWriter writer = new StringWriter();
JsonGenerator gen = Schema.FACTORY.createGenerator(writer);
toJson(gen);
toJson(SchemaJsonSerDe.DEFAULT, gen);
gen.flush();
return writer.toString();
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
}

void toJson(JsonGenerator gen) throws IOException {
void toJson(SchemaJsonSerDe customSchemaSerDe, JsonGenerator gen) throws IOException {
gen.writeStartObject();
if (doc != null)
gen.writeStringField("doc", doc);
writeProps(gen); // write out properties
gen.writeFieldName("request");
request.fieldsToJson(types, gen);
request.fieldsToJson(types, customSchemaSerDe, gen);

toJson1(gen);
toJson1(customSchemaSerDe, gen);
gen.writeEndObject();
}

void toJson1(JsonGenerator gen) throws IOException {
void toJson1(SchemaJsonSerDe customSchemaSerDe, JsonGenerator gen) throws IOException {
gen.writeStringField("response", "null");
gen.writeBooleanField("one-way", true);
}
Expand Down Expand Up @@ -226,15 +226,15 @@ public int hashCode() {
}

@Override
void toJson1(JsonGenerator gen) throws IOException {
void toJson1(SchemaJsonSerDe customSchemaSerDe, JsonGenerator gen) throws IOException {
gen.writeFieldName("response");
response.toJson(types, gen);
response.toJson(types, customSchemaSerDe, gen);

List<Schema> errs = errors.getTypes(); // elide system error
if (errs.size() > 1) {
Schema union = Schema.createUnion(errs.subList(1, errs.size()));
gen.writeFieldName("errors");
union.toJson(types, gen);
union.toJson(types, customSchemaSerDe, gen);
}
}

Expand Down Expand Up @@ -416,15 +416,15 @@ public String toString(boolean pretty) {
JsonGenerator gen = Schema.FACTORY.createGenerator(writer);
if (pretty)
gen.useDefaultPrettyPrinter();
toJson(gen);
toJson(SchemaJsonSerDe.DEFAULT, gen);
gen.flush();
return writer.toString();
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
}

void toJson(JsonGenerator gen) throws IOException {
void toJson(SchemaJsonSerDe customSchemaSerDe, JsonGenerator gen) throws IOException {
types.space(namespace);

gen.writeStartObject();
Expand All @@ -440,13 +440,13 @@ void toJson(JsonGenerator gen) throws IOException {
Schema.Names resolved = new Schema.Names(namespace);
for (Schema type : types.values())
if (!resolved.contains(type))
type.toJson(resolved, gen);
type.toJson(resolved, customSchemaSerDe, gen);
gen.writeEndArray();

gen.writeObjectFieldStart("messages");
for (Map.Entry<String, Message> e : messages.entrySet()) {
gen.writeFieldName(e.getKey());
e.getValue().toJson(gen);
e.getValue().toJson(customSchemaSerDe, gen);
}
gen.writeEndObject();
gen.writeEndObject();
Expand All @@ -466,13 +466,13 @@ public byte[] getMD5() {
/** Read a protocol from a Json file. */
public static Protocol parse(File file) throws IOException {
try (JsonParser jsonParser = Schema.FACTORY.createParser(file)) {
return parse(jsonParser);
return parse(jsonParser, SchemaJsonSerDe.DEFAULT);
}
}

/** Read a protocol from a Json stream. */
public static Protocol parse(InputStream stream) throws IOException {
return parse(Schema.FACTORY.createParser(stream));
return parse(Schema.FACTORY.createParser(stream), SchemaJsonSerDe.DEFAULT);
}

/** Read a protocol from one or more json strings */
Expand All @@ -486,26 +486,27 @@ public static Protocol parse(String string, String... more) {
/** Read a protocol from a Json string. */
public static Protocol parse(String string) {
try {
return parse(Schema.FACTORY.createParser(new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8))));
return parse(Schema.FACTORY.createParser(new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8))),
SchemaJsonSerDe.DEFAULT);
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
}

private static Protocol parse(JsonParser parser) {
private static Protocol parse(JsonParser parser, SchemaJsonSerDe customSchemaSerDe) {
try {
Protocol protocol = new Protocol();
protocol.parse((JsonNode) Schema.MAPPER.readTree(parser));
protocol.parse((JsonNode) Schema.MAPPER.readTree(parser), customSchemaSerDe);
return protocol;
} catch (IOException e) {
throw new SchemaParseException(e);
}
}

private void parse(JsonNode json) {
private void parse(JsonNode json, SchemaJsonSerDe customSchemaSerDe) {
parseNameAndNamespace(json);
parseTypes(json);
parseMessages(json);
parseTypes(json, customSchemaSerDe);
parseMessages(json, customSchemaSerDe);
parseDoc(json);
parseProps(json);
}
Expand All @@ -532,7 +533,7 @@ private String parseDocNode(JsonNode json) {
return nameNode.textValue();
}

private void parseTypes(JsonNode json) {
private void parseTypes(JsonNode json, SchemaJsonSerDe customSchemaSerDe) {
JsonNode defs = json.get("types");
if (defs == null)
return; // no types defined
Expand All @@ -542,11 +543,11 @@ private void parseTypes(JsonNode json) {
for (JsonNode type : defs) {
if (!type.isObject())
throw new SchemaParseException("Type not an object: " + type);
Schema.parseNamesDeclared(type, types, types.space());
Schema.parseNamesDeclared(type, types, customSchemaSerDe, types.space());

}
for (JsonNode type : defs) {
Schema.parseCompleteSchema(type, types, types.space());
Schema.parseCompleteSchema(type, types, customSchemaSerDe, types.space());
}
}

Expand All @@ -558,17 +559,17 @@ private void parseProps(JsonNode json) {
}
}

private void parseMessages(JsonNode json) {
private void parseMessages(JsonNode json, SchemaJsonSerDe customSchemaSerDe) {
JsonNode defs = json.get("messages");
if (defs == null)
return; // no messages defined
for (Iterator<String> i = defs.fieldNames(); i.hasNext();) {
String prop = i.next();
this.messages.put(prop, parseMessage(prop, defs.get(prop)));
this.messages.put(prop, parseMessage(prop, defs.get(prop), customSchemaSerDe));
}
}

private Message parseMessage(String messageName, JsonNode json) {
private Message parseMessage(String messageName, JsonNode json, SchemaJsonSerDe customSchemaSerDe) {
String doc = parseDocNode(json);

Map<String, JsonNode> mProps = new LinkedHashMap<>();
Expand All @@ -594,8 +595,8 @@ private Message parseMessage(String messageName, JsonNode json) {
JsonNode fieldDocNode = field.get("doc");
if (fieldDocNode != null)
fieldDoc = fieldDocNode.textValue();
Field newField = new Field(name, Schema.parse(fieldTypeNode, types), fieldDoc, field.get("default"), true,
Order.ASCENDING);
Field newField = new Field(name, Schema.parse(fieldTypeNode, types, customSchemaSerDe), fieldDoc,
field.get("default"), true, Order.ASCENDING);
Set<String> aliases = Schema.parseAliases(field);
if (aliases != null) { // add aliases
for (String alias : aliases)
Expand Down Expand Up @@ -629,12 +630,12 @@ private Message parseMessage(String messageName, JsonNode json) {
if (oneWay) {
if (decls != null)
throw new SchemaParseException("one-way can't have errors: " + json);
if (responseNode != null && Schema.parse(responseNode, types).getType() != Schema.Type.NULL)
if (responseNode != null && Schema.parse(responseNode, types, customSchemaSerDe).getType() != Schema.Type.NULL)
throw new SchemaParseException("One way response must be null: " + json);
return new Message(messageName, doc, mProps, request);
}

Schema response = Schema.parse(responseNode, types);
Schema response = Schema.parse(responseNode, types, customSchemaSerDe);

List<Schema> errs = new ArrayList<>();
errs.add(SYSTEM_ERROR); // every method can throw
Expand Down