
Create the Group and Sub Scans

We have now helped Calcite resolve a table name. Next we begin to define what we will do with the table, which is to scan (read) it. Some systems are fancier than others: some read a single, contiguous data source, some read "splits" of a data source (such as file blocks in HDFS), some arbitrarily break up a scan into chunks, others "push" the scan to an external system, and so forth. Calcite provides a great deal of flexibility to define these cases. For our needs, we assume we scan a single data source from start to end.

Drill uses a three-stage process to convert a table into a scan operator definition:

  • Resolve the table to create a scan specification.
  • Specify the columns for the scan to create a group scan specification.
  • Determine the information needed to perform the physical scan and create a sub (specific) scan specification.

Scan Specification

We start by defining a "scan specification": a description of the overall logical scan, before we've started worrying about the details. Our class just needs the table name:

@JsonTypeName("example-scan-spec")
public class ExampleScanSpec {

  private final String tableName;

  @JsonCreator
  public ExampleScanSpec(
      @JsonProperty("tableName") String tableName) {
    this.tableName = tableName;
  }

  public String getTableName() { return tableName; }
}

We've defined this class as Jackson-serializable so that we can pass it along to the execution engine. (We could have chosen a different format, but this approach is simplest.)
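
To make this concrete, here is a minimal round trip through Jackson; Drill does the equivalent internally when it ships the spec around. (A sketch for illustration only.)

ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(new ExampleScanSpec("myTable"));
// json is {"tableName":"myTable"}; the "example-scan-spec" type name
// appears only when the spec is embedded in a polymorphic parent
ExampleScanSpec spec = mapper.readValue(json, ExampleScanSpec.class);
assert "myTable".equals(spec.getTableName());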

Test the Scan Spec

We can now repeat our test, stepping through the code to verify that the scan spec and dynamic table are created correctly.
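
If you are not following along from the earlier pages, the test looks roughly like this. (A sketch, assuming Drill's ClusterFixture test framework; dirTestWatcher is the BaseDirTestWatcher JUnit rule provided by Drill's test base classes.)

  @Test
  public void testScanSpec() throws Exception {
    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
    try (ClusterFixture cluster = builder.build();
         ClientFixture client = cluster.clientFixture()) {
      // Set a breakpoint in the schema factory, then watch the planner
      // resolve the table name and build the scan spec
      client.queryBuilder().sql("SELECT * FROM example.myTable").printCsv();
    }
  }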

Group Scan Definition

Drill is a member of the Hadoop ecosystem, and so supports the idea that a table may be represented by a directory of files, where each file consists of blocks distributed across storage nodes. A "group scan" is the term for the logical scan of the pieces that make up a table, and represents one additional layer of refinement from the scan specification.

As we have noted, Drill uses Calcite for query planning, and so the scan objects fit into the Calcite structure. As a result, the group (and later, "sub") scans are a bit complex.

In our case, the table consists of a single chunk of data, so the "group" consists of a single physical scan.

The group scan brings together three layers of information:

  • The configuration of the storage plugin (the "storage engine"),
  • The (optional) schema and table,
  • The set of columns.

That is, the group scan extends the scan spec by providing a list of columns from our query:

SELECT * FROM example.myTable

SELECT a, b, c FROM example.myTable

Drill uses schema-on-read, so we will assume that we can figure out the column names and types at runtime. However, if we know the available columns and types at plan time, we can tell Calcite to use that information. See the existing storage plugins for examples of how that is done.
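
For illustration, here is roughly how the projected columns arrive for the two queries above. (A sketch: SchemaPath.getSimplePath() is Drill's helper for building a single-name column path.)

// SELECT a, b, c arrives as three simple column paths:
List<SchemaPath> columns = Arrays.asList(
    SchemaPath.getSimplePath("a"),
    SchemaPath.getSimplePath("b"),
    SchemaPath.getSimplePath("c"));

// SELECT * arrives with no explicit column list; the constructor
// below normalizes that case to GroupScan.ALL_COLUMNS (the `**` wildcard).

With that in mind, here is the group scan class: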

public class ExampleGroupScan extends AbstractGroupScan {

  private final ExampleStoragePluginConfig config;
  private final ExampleScanSpec scanSpec;
  private final List<SchemaPath> columns;

  public ExampleGroupScan(ExampleGroupScan that) {
    super(that);
    this.config = that.config;
    this.scanSpec = that.scanSpec;
    this.columns = that.columns;
  }

  public ExampleGroupScan(
      String userName,
      ExampleStoragePluginConfig config,
      ExampleScanSpec scanSpec,
      List<SchemaPath> columns) {
    super(userName);
    this.config = config;
    this.scanSpec = scanSpec;
    this.columns = columns == null || columns.isEmpty() ? ALL_COLUMNS : columns;
  }

This class is not serialized, so no Jackson serialization is needed. (Some of the existing Drill plugins do make the class serializable, but it appears this is actually unnecessary.)

We need both a copy constructor and a "normal" constructor. The normal constructor takes the query user name, which Drill supplies when it creates the group scan, along with the storage plugin config, though this implementation does not really need the config.

This class needs a number of methods to help with planning. First, we have to tell Drill how to assign the scan to nodes. By default, we do no assignments and let Drill decide:

  @Override
  public void applyAssignments(List<DrillbitEndpoint> endpoints) { }

Next we tell Drill how much it can parallelize the scan. We assume we are calling an external service, so we allow only one thread of execution per query:

  @Override
  public int getMaxParallelizationWidth() { return 1; }

If you use EVF to implement your reader (and you probably should), then your reader can handle projection:

  @Override
  public boolean canPushdownProjects(List<SchemaPath> columns) {
    return true;
  }
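
Returning true here tells the planner that it may push the projected columns into the scan, which it does by asking the group scan for a copy that carries those columns. AbstractGroupScan's default clone(columns) implementation throws UnsupportedOperationException, so we override it. (A minimal sketch, reusing the constructor shown above.)

  @Override
  public GroupScan clone(List<SchemaPath> columns) {
    // Copy this group scan, substituting the projected columns
    return new ExampleGroupScan(getUserName(), config, scanSpec, columns);
  }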

This class also implements the next step in the planning process, creating the specific scans, but we'll stub that part out for now:

  @Override
  public SubScan getSpecificScan(int minorFragmentId) {
    // TODO Auto-generated method stub
    return null;
  }

Finally, we need some boilerplate required by Drill:

  @Override
  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
    Preconditions.checkArgument(children.isEmpty());
    return new ExampleGroupScan(this);
  }

  @Override
  public String getDigest() {
    return toString();
  }

  @Override
  public String toString() {
    return "Example scan of " + scanSpec.getSchemaName() + "." + scanSpec.getTableName();
  }
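
One more planning hook is worth knowing about: the planner costs the scan through getScanStats(). Overriding it is optional, but a plugin that knows anything about its data should do so. (A hedged sketch, assuming Drill's ScanStats class; the row count is an arbitrary placeholder.)

  @Override
  public ScanStats getScanStats() {
    // With no real statistics, claim an inexact, non-trivial row count
    // so the planner does not assume the table is tiny
    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 10_000, 1, 0);
  }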

Create the Group Scan Instance

Add the following method to the plugin class:

  @Override
  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
    ExampleScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<ExampleScanSpec>() { });
    return new ExampleGroupScan(userName, config, scanSpec, null);
  }

This code deserializes a JSON representation of your scan spec. (It is not clear why the scan spec is in JSON at this point.)

Test the Group Scan

We can now run our test again. Set a breakpoint in getSpecificScan() and run the test to verify that things work up to this point.

Specific Scan

The group scan represents the general idea of "scan table X." The specific scan (also called a "sub scan") contains the information needed to implement the physical scan, including Hadoop splits, database shards, or whatever else the storage engine requires. In our case, we assume a single scan operator that just needs the information gathered above.

The specific scan is serialized to JSON and sent to each Drillbit, where it is deserialized and passed to the scan operator.

@JsonTypeName("example-sub-scan")
public class ExampleSubScan extends AbstractSubScan {

  private final ExampleStoragePluginConfig pluginConfig;
  private final ExampleScanSpec tableSpec;
  private final List<SchemaPath> columns;

  @JsonCreator
  public ExampleSubScan(
      @JsonProperty("config") ExampleStoragePluginConfig config,
      @JsonProperty("tableSpec") ExampleScanSpec tableSpec,
      @JsonProperty("columns") List<SchemaPath> columns) {
    super("user-if-needed");
    this.pluginConfig = config;
    this.tableSpec = tableSpec;
    this.columns = columns;
  }

  @Override
  @JsonIgnore
  public int getOperatorType() {
    return CoreOperatorType.EXAMPLE_SUB_SCAN_VALUE;
  }

  public ExampleStoragePluginConfig getConfig() {
    return pluginConfig;
  }

  public ExampleScanSpec getTableSpec() {
    return tableSpec;
  }

  public List<SchemaPath> getColumns() {
    return columns;
  }
}

Notice that we have included the storage plugin config, not the storage plugin itself. The config is Jackson-serializable; the plugin is not. We now see why we made the scan spec serializable: it is included in our sub scan.
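
As an aside: if the execution-time reader needs the plugin instance itself (say, to share a connection), the usual Drill pattern is to have Jackson inject the StoragePluginRegistry during deserialization and look the plugin up from its config. A sketch of how the constructor would change (this replaces the constructor above, since Jackson allows only one creator; the plugin field is illustrative):

  private final ExampleStoragePlugin plugin;

  @JsonCreator
  public ExampleSubScan(
      @JacksonInject StoragePluginRegistry registry,
      @JsonProperty("config") ExampleStoragePluginConfig config,
      @JsonProperty("tableSpec") ExampleScanSpec tableSpec,
      @JsonProperty("columns") List<SchemaPath> columns)
      throws ExecutionSetupException {
    super("user-if-needed");
    // Recover the live plugin instance from its serializable config
    this.plugin = (ExampleStoragePlugin) registry.getPlugin(config);
    this.pluginConfig = config;
    this.tableSpec = tableSpec;
    this.columns = columns;
  }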

Operator Type

Notice above that we referenced CoreOperatorType.EXAMPLE_SUB_SCAN_VALUE. This is a unique numeric identifier for each operator in Drill. Unfortunately, all operator types must be defined in a single global enum, which makes it hard to add a true plugin not known to Drill itself. Until this is resolved, you must modify UserBitShared.proto to add your operator type:

enum CoreOperatorType {
  SINGLE_SENDER = 0;
  ...
  SHP_SUB_SCAN = 65;
  EXAMPLE_SUB_SCAN = 66;
}

Choose the next available number for your ID.

Then, rebuild the Protobuf files as described in protocol/readme.txt.
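
After the rebuild, the generated Java code exposes both the enum constant and a matching int constant; the int constant is what getOperatorType() returned above:

// Protobuf generates an int constant alongside each enum value:
int opType = CoreOperatorType.EXAMPLE_SUB_SCAN_VALUE; // == 66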

Create the Sub Scan Instance

We now connect up our sub scan with the group scan:

public class ExampleGroupScan extends AbstractGroupScan {
  ...

  @Override
  public SubScan getSpecificScan(int minorFragmentId) {
    return new ExampleSubScan(config, scanSpec, columns);
  }

Test the Sub Scan

Set a breakpoint in the above method and run the test case. Execution should stop there and you can verify that your sub scan is created as you expect.

With this, we are done with the planner side of the project and are ready to move on to the execution side.
