Skip to content

Commit

Permalink
Use implicit row constructors. Optionally skip nested rows. Nullabili…
Browse files Browse the repository at this point in the history
…ty bug fixes. (#64)
  • Loading branch information
ryannedolan committed Apr 9, 2024
1 parent 8edcd27 commit c3fa16a
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 35 deletions.
Expand Up @@ -27,7 +27,8 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
.filter(x -> !x.getName().startsWith("__")) // don't write out hidden fields
.map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null))
.collect(Collectors.toList());
return Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields);
return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields),
dataType.isNullable());
} else {
switch (dataType.getSqlTypeName()) {
case INTEGER:
Expand All @@ -42,6 +43,15 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable());
case CHAR:
return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
case BOOLEAN:
return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable());
case ARRAY:
return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
dataType.isNullable());
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
case UNKNOWN:
case NULL:
return Schema.createUnion(Schema.create(Schema.Type.NULL));
Expand All @@ -56,14 +66,18 @@ public static Schema avro(String namespace, String name, RelProtoDataType relPro
return avro(namespace, name, relProtoDataType.apply(factory));
}

private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) {
private static Schema createAvroSchemaWithNullability(Schema schema, boolean nullable) {
if (nullable) {
return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(rawType));
return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
} else {
return Schema.create(rawType);
return schema;
}
}

private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) {
return createAvroSchemaWithNullability(Schema.create(rawType), nullable);
}

public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
RelDataType unknown = typeFactory.createUnknownType();
switch (schema.getType()) {
Expand All @@ -74,17 +88,25 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList()));
case INT:
// schema.isNullable() should be false for basic types iiuc
return createRelTypeWithNullability(typeFactory, SqlTypeName.INTEGER, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.INTEGER);
case LONG:
return createRelTypeWithNullability(typeFactory, SqlTypeName.BIGINT, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.BIGINT);
case ENUM:
case FIXED:
case STRING:
return createRelTypeWithNullability(typeFactory, SqlTypeName.VARCHAR, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.VARCHAR);
case FLOAT:
return createRelTypeWithNullability(typeFactory, SqlTypeName.FLOAT, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.FLOAT);
case DOUBLE:
return createRelTypeWithNullability(typeFactory, SqlTypeName.DOUBLE, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.DOUBLE);
case BOOLEAN:
return createRelType(typeFactory, SqlTypeName.BOOLEAN);
case ARRAY:
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1);
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory));
case UNION:
if (schema.isNullable() && schema.getTypes().size() == 2) {
Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
Expand All @@ -102,9 +124,9 @@ public static RelDataType rel(Schema schema) {
return rel(schema, DataType.DEFAULT_TYPE_FACTORY);
}

private static RelDataType createRelTypeWithNullability(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) {
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) {
RelDataType rawType = typeFactory.createSqlType(typeName);
return typeFactory.createTypeWithNullability(rawType, nullable);
return typeFactory.createTypeWithNullability(rawType, false);
}

public static RelProtoDataType proto(Schema schema) {
Expand Down
Expand Up @@ -14,7 +14,7 @@
/** Common data types. Not authoritative or exhaustive. */
public enum DataType {

VARCHAR_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
VARCHAR(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false));

public static final RelDataTypeFactory DEFAULT_TYPE_FACTORY = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
Expand Down Expand Up @@ -56,16 +56,24 @@ public static Struct struct(RelDataType relDataType) {
/** Convenience builder for non-scalar types */
public interface Struct extends RelProtoDataType {

default Struct with(String name, DataType dataType) {
default Struct with(String name, RelDataType dataType) {
return x -> {
RelDataType existing = apply(x);
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(x);
builder.addAll(existing.getFieldList());
builder.add(name, dataType.rel(x));
builder.add(name, dataType);
return builder.build();
};
}

default Struct with(String name, DataType dataType) {
return with(name, dataType.rel());
}

default Struct with(String name, Struct struct) {
return with(name, struct.rel());
}

default RelDataType rel() {
return apply(DEFAULT_TYPE_FACTORY);
}
Expand All @@ -85,6 +93,17 @@ default Struct drop(String name) {
};
}

default Struct dropNestedRows() {
return x -> {
RelDataType dataType = apply(x);
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(x);
builder.addAll(dataType.getFieldList().stream()
.filter(y -> y.getType().getSqlTypeName() != SqlTypeName.ROW)
.collect(Collectors.toList()));
return builder.build();
};
}

default Struct get(String name) {
return x -> {
RelDataTypeField field = apply(x).getField(name, true, false);
Expand Down
@@ -1,23 +1,33 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.sql.SqlWriter;
//import org.apache.calcite.sql.SqlWriterConfig;
// needed in next Calcite version
// import org.apache.calcite.sql.SqlWriterConfig;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCollectionTypeNameSpec;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.fun.SqlRowOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.util.SqlShuttle;

import java.util.Map;
import java.util.List;
Expand Down Expand Up @@ -94,6 +104,7 @@ default String sql() {
/** Render the script as DDL/SQL in the given dialect */
default String sql(SqlDialect dialect) {
SqlWriter w = new SqlPrettyWriter(dialect);
// TODO: fix in next Calcite version
// above is deprecated; replace with:
// SqlWriter w = new SqlPrettyWriter(SqlWriterConfig.of().withDialect(dialect));
implement(w);
Expand Down Expand Up @@ -129,9 +140,31 @@ public QueryImplementor(RelNode relNode) {
public void implement(SqlWriter w) {
RelToSqlConverter converter = new RelToSqlConverter(w.getDialect());
SqlImplementor.Result result = converter.visitRoot(relNode);
w.literal(result.asSelect().toSqlString(w.getDialect()).getSql());
SqlSelect select = result.asSelect();
if (select.getSelectList() != null) {
select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR));
}
w.literal(select.toSqlString(w.getDialect()).getSql());
}
}

  // A ROW operator with an empty name, so that it unparses as just `(...)`
  // instead of `ROW(...)`. Some dialects (e.g. Flink SQL) expect implicit row
  // constructors in SELECT lists.
  private final SqlRowOperator IMPLIED_ROW_OPERATOR = new SqlRowOperator(""); // empty string name

  // A shuttle that rewrites explicit `ROW(...)` calls into implicit `(...)` calls.
  // Applied to the SELECT list before unparsing (see QueryImplementor.implement).
  private final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() {
    @Override
    public SqlNode visit(SqlCall call) {
      // Recurse first so nested row constructors are also rewritten.
      List<SqlNode> operands = call.getOperandList().stream().map(x -> x.accept(this)).collect(Collectors.toList());
      if ((call.getKind() == SqlKind.ROW || call.getKind() == SqlKind.COLUMN_LIST
          || call.getOperator() instanceof SqlRowOperator)
          && operands.size() > 1) {
        // NOTE(review): single-operand rows are deliberately left untouched
        // (operands.size() > 1) — presumably `(x)` would parse as a plain
        // parenthesized expression, not a row; confirm intent.
        return IMPLIED_ROW_OPERATOR.createCall(call.getParserPosition(), operands);
      } else {
        // Rebuild the call with the (possibly rewritten) operands.
        return call.getOperator().createCall(call.getParserPosition(), operands);
      }
    }
  };
}

/**
* Implements a CREATE TABLE...WITH... DDL statement.
Expand Down Expand Up @@ -291,14 +324,18 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) {
.map(x -> toSpec(x))
.collect(Collectors.toList());
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes), SqlParserPos.ZERO));
} if (dataType.getComponentType() != null) {
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(new SqlBasicTypeNameSpec(
dataType.getComponentType().getSqlTypeName(), SqlParserPos.ZERO), dataType.getSqlTypeName(), SqlParserPos.ZERO),
SqlParserPos.ZERO));
} else {
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlBasicTypeNameSpec(dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
}
}

private static SqlDataTypeSpec maybeNullable(RelDataType dataType, SqlDataTypeSpec spec) {
if (!dataType.isNullable()) {
return spec.withNullable(true);
return spec.withNullable(false);
} else {
// we don't want "VARCHAR NULL", only "VARCHAR NOT NULL"
return spec;
Expand Down
Expand Up @@ -2,13 +2,19 @@

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;

import java.util.concurrent.ExecutionException;
import java.util.function.Function;

/** Resolves a table name into a concrete row type. Usually involves a network call. */
public interface TableResolver {
RelDataType resolve(String table) throws InterruptedException, ExecutionException;

static TableResolver from(Function<String, RelDataType> f) {
return x -> f.apply(x);
}

/** Appends an extra column to the resolved type */
default TableResolver with(String name, RelDataType dataType) {
return x -> {
Expand All @@ -19,4 +25,20 @@ default TableResolver with(String name, RelDataType dataType) {
return builder.build();
};
}

  /** Appends an extra column of the given common {@link DataType} to the resolved type. */
  default TableResolver with(String name, DataType dataType) {
    return with(name, dataType.rel());
  }

  /** Appends an extra nested-row column, built from the given struct, to the resolved type. */
  default TableResolver with(String name, DataType.Struct struct) {
    return with(name, struct.rel());
  }

default TableResolver mapStruct(Function<DataType.Struct, DataType.Struct> f) {
return x -> f.apply(DataType.struct(resolve(x))).rel();
}

default TableResolver map(Function<RelDataType, RelDataType> f) {
return x -> f.apply(resolve(x));
}
}
@@ -0,0 +1,42 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.Litmus;
import org.apache.avro.Schema;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

public class AvroConverterTest {

  /**
   * Round-trips a nested, nullable Avro schema through Calcite's RelDataType
   * representation and back, asserting that field counts and nullability survive
   * in both directions.
   */
  @Test
  public void convertsNestedSchemas() {
    // Record E containing a nullable record field "h" (record H), which itself
    // contains a nullable record field "A" (an empty record) — two nesting levels.
    String schemaString = "{\"type\":\"record\",\"name\":\"E\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"h\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"H\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"A\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"A\",\"fields\":[]}]}]}]}]}";

    // Avro -> Rel: outer record.
    Schema avroSchema1 = (new Schema.Parser()).parse(schemaString);
    RelDataType rel1 = AvroConverter.rel(avroSchema1);
    assertEquals(rel1.toString(), rel1.getFieldCount(), avroSchema1.getFields().size());
    assertTrue(rel1.toString(), rel1.getField("h", false, false) != null);
    // Nested field "h" must come through as a nullable row.
    RelDataType rel2 = rel1.getField("h", false, false).getType();
    assertTrue(rel2.toString(), rel2.isNullable());
    // index 1: the non-null branch of the ["null", H] union.
    Schema avroSchema2 = avroSchema1.getField("h").schema().getTypes().get(1);
    assertEquals(rel2.toString(), rel2.getFieldCount(), avroSchema2.getFields().size());
    assertTrue(rel2.toString(), rel2.getField("A", false, false) != null);
    // Innermost nested field "A" must also be a nullable row.
    RelDataType rel3 = rel2.getField("A", false, false).getType();
    assertTrue(rel3.toString(), rel3.isNullable());
    Schema avroSchema3 = avroSchema2.getField("A").schema().getTypes().get(1);
    assertEquals(rel3.toString(), rel3.getFieldCount(), avroSchema3.getFields().size());
    // Rel -> Avro: top-level record is not nullable...
    Schema avroSchema4 = AvroConverter.avro("NS", "R", rel1);
    assertTrue("!avroSchema4.isNullable()", !avroSchema4.isNullable());
    assertEquals(avroSchema4.toString(), avroSchema4.getFields().size(), rel1.getFieldCount());
    // ...but nullable nested rows become ["null", record] unions (index 1 = record branch).
    Schema avroSchema5 = AvroConverter.avro("NS", "R", rel2);
    assertTrue("avroSchema5.isNullable()", avroSchema5.isNullable());
    assertEquals(avroSchema5.toString(), avroSchema5.getTypes().get(1).getFields().size(), rel2.getFieldCount());
    Schema avroSchema6 = AvroConverter.avro("NS", "R", rel3);
    assertEquals(avroSchema6.toString(), avroSchema6.getTypes().get(1).getFields().size(), rel3.getFieldCount());
    // Full round-trip: Avro -> Rel -> Avro -> Rel must yield an equal type.
    RelDataType rel4 = AvroConverter.rel(avroSchema4);
    assertTrue("types match", RelOptUtil.eq("rel4", rel4, "rel1", rel1, Litmus.THROW));
  }
}
@@ -0,0 +1,23 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.rel.type.RelDataType;

import static org.junit.Assert.assertTrue;
import org.junit.Test;

public class DataTypeTest {

  /**
   * Verifies that dropNestedRows() removes ROW-typed (nested struct) fields
   * while leaving scalar fields intact.
   */
  @Test
  public void skipsNestedRows() {
    DataType.Struct struct = DataType.struct().with("one", DataType.VARCHAR)
        .with("two", DataType.struct().with("three", DataType.VARCHAR));
    // Before dropping: both the scalar and the nested row are present.
    RelDataType fullRow = struct.rel();
    assertTrue(fullRow.toString(), fullRow.getFieldCount() == 2);
    assertTrue(fullRow.toString(), fullRow.getField("one", false, false) != null);
    assertTrue(fullRow.toString(), fullRow.getField("two", false, false) != null);
    // After dropping: only the scalar field "one" survives.
    RelDataType flattened = struct.dropNestedRows().rel();
    assertTrue(flattened.toString(), flattened.getFieldCount() == 1);
    assertTrue(flattened.toString(), flattened.getField("one", false, false) != null);
    assertTrue(flattened.toString(), flattened.getField("two", false, false) == null);
  }
}
Expand Up @@ -28,10 +28,10 @@ public void implementsFlinkCreateTableDDL() {
// Output isn't necessarily deterministic, but should be something like:
// CREATE TABLE IF NOT EXISTS "DATABASE"."TABLE1" ("idValue1" VARCHAR) WITH
// ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='topic1')
assertTrue(out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH "));
assertTrue(out.contains("'connector'='kafka'"));
assertTrue(out.contains("'properties.bootstrap.servers'='localhost:9092'"));
assertTrue(out.contains("'topic'='topic1'"));
assertFalse(out.contains("Row"));
assertTrue(out, out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH "));
assertTrue(out, out.contains("'connector'='kafka'"));
assertTrue(out, out.contains("'properties.bootstrap.servers'='localhost:9092'"));
assertTrue(out, out.contains("'topic'='topic1'"));
assertFalse(out, out.contains("Row"));
}
}
Expand Up @@ -27,8 +27,8 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
String principal = (String) operand.getOrDefault("principal", "User:ANONYMOUS");
Map<String, Object> clientConfig = (Map<String, Object>) operand.get("clientConfig");
DataType.Struct rowType = DataType.struct()
.with("PAYLOAD", DataType.VARCHAR_NULL)
.with("KEY", DataType.VARCHAR_NULL);
.with("PAYLOAD", DataType.VARCHAR)
.with("KEY", DataType.VARCHAR);
ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
.withPrefix("properties.")
.with("connector", "upsert-kafka")
Expand Down
Expand Up @@ -148,6 +148,7 @@ public Result reconcile(Request request) {
// Mark the Subscription as failed.
status.setFailed(true);
status.setMessage("Error: " + e.getMessage());
result = new Result(true, operator.failureRetryDuration());
}
} else if (status.getReady() == null && status.getResources() != null) {
// Phase 2
Expand Down

0 comments on commit c3fa16a

Please sign in to comment.