Storage Execution
We are now ready to actually read data for our storage plugin. Drill will pass our sub scan from the planner to the execution Drillbit(s) in the form of serialized JSON. Our job is to:
- Tell Drill how to associate the sub scan with a factory that will create our scan operator.
- Create the scan operator (which Drill calls a "scan batch").
- Implement the reader for our data source.
This example uses the enhanced vector framework (EVF) to implement the scan operator. Most existing plugins use the older ScanBatch implementation.
Drill uses an operator factory (which Drill calls a "batch creator") to create the actual execution operator. Drill looks for all classes that implement BatchCreator. Then, among those classes, Drill picks the one whose getBatch() method takes the class of our sub scan as its operator argument (the second parameter in the code below).
public class ExampleScanBatchCreator implements BatchCreator<ExampleSubScan> {

  @Override
  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
      ExampleSubScan subScan, List<RecordBatch> children)
      throws ExecutionSetupException {
    // A scan is a leaf operator, so it has no child operators.
    Preconditions.checkArgument(children == null || children.isEmpty());
    ExampleStoragePlugin plugin = (ExampleStoragePlugin) context.getStorageRegistry()
        .getPlugin(subScan.getConfig());
    return plugin.createScan(context, subScan);
  }
}
We passed the storage plugin config as part of the scan definition. Drill has given us a context that is a gateway into the Drill engine. We use that to get the storage plugin registry, then use the plugin config to locate our plugin. From there, we'll ask the plugin to create the scan operator.
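The matching step described above, in which the batch creator is chosen by the class of the sub scan, can be modeled without any Drill dependencies. The following is a minimal sketch, not Drill's actual registry code: every type here (SketchOperator, SketchBatchCreator, CreatorLookupSketch, and so on) is a hypothetical stand-in, and the sketch assumes the lookup inspects the generic type argument of the creator interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Drill's operator and batch creator interfaces.
interface SketchOperator { }

interface SketchBatchCreator<T extends SketchOperator> {
  String getBatch(T config);
}

final class SketchSubScan implements SketchOperator { }

final class OtherSubScan implements SketchOperator { }

final class SketchScanCreator implements SketchBatchCreator<SketchSubScan> {
  @Override
  public String getBatch(SketchSubScan config) {
    return "example scan operator";
  }
}

final class OtherScanCreator implements SketchBatchCreator<OtherSubScan> {
  @Override
  public String getBatch(OtherSubScan config) {
    return "other scan operator";
  }
}

final class CreatorLookupSketch {

  // Returns the creator whose generic type argument is exactly opClass,
  // or null if no creator was declared for that operator class.
  static SketchBatchCreator<?> find(List<SketchBatchCreator<?>> creators,
      Class<? extends SketchOperator> opClass) {
    for (SketchBatchCreator<?> creator : creators) {
      for (Type iface : creator.getClass().getGenericInterfaces()) {
        if (!(iface instanceof ParameterizedType)) {
          continue;
        }
        ParameterizedType pt = (ParameterizedType) iface;
        if (pt.getRawType().equals(SketchBatchCreator.class)
            && pt.getActualTypeArguments()[0].equals(opClass)) {
          return creator;
        }
      }
    }
    return null;
  }

  public static void main(String[] args) {
    List<SketchBatchCreator<?>> creators =
        Arrays.asList(new OtherScanCreator(), new SketchScanCreator());
    SketchBatchCreator<?> match = find(creators, SketchSubScan.class);
    System.out.println(match.getClass().getSimpleName());
  }
}
```

The point of the sketch is that the plugin never registers its creator explicitly: declaring the sub scan class as the type parameter is enough for a lookup of this kind to find it.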
public CloseableRecordBatch createScan(ExecutorFragmentContext context,
    ExampleSubScan scanDef) throws ExecutionSetupException {
  try {
    final ScanFrameworkBuilder builder = frameworkBuilder(context.getOptions(), scanDef);
    return builder.buildScanOperator(context, scanDef);
  } catch (final UserException e) {
    // Rethrow user exceptions directly
    throw e;
  } catch (final Throwable e) {
    // Wrap all others
    throw new ExecutionSetupException(e);
  }
}

private ScanFrameworkBuilder frameworkBuilder(OptionManager options,
    ExampleSubScan scanDef) {
  return null; // TODO
}
We can now verify that our sub scan is properly serialized, sent over the wire, deserialized, and used to match our operator factory. Just set a breakpoint in the getBatch() method and run the test case. We'll get only as far as the frameworkBuilder() stub above: it returns null, so the call on the builder fails with a NullPointerException (NPE).
Storage plugins do not provide their own scan operator. Instead, they use an existing operator and simply provide a record reader (old-style ScanBatch) or batch reader (newer EVF).
Here we must get into the details of EVF setup. You use a "builder" to specify the options for your scan, then let EVF build the scan operator itself.
private ScanFrameworkBuilder frameworkBuilder(OptionManager options,
    ExampleSubScan scanDef) {
  ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
  builder.setProjection(scanDef.getColumns());
  builder.setUserName(scanDef.getUserName());
  builder.setReaderFactory(new ExampleReaderFactory(scanDef));
  builder.setNullType(Types.optional(MinorType.VARCHAR));
  builder.setContext(
      new ChildErrorContext(builder.errorContext()) {
        @Override
        public void addContext(UserException.Builder builder) {
          builder.addContext("Storage plugin:", ExampleStoragePluginConfig.NAME);
          // Qualify with the outer class: a bare getClass() here would
          // refer to this anonymous class, which has no simple name.
          builder.addContext("Plugin class:",
              ExampleStoragePlugin.this.getClass().getSimpleName());
          builder.addContext("Plugin config name:", getName());
          builder.addContext("Table:", scanDef.getTableSpec().getTableName());
        }
      });
  return builder;
}
The above provides:
- The projection as specified in the sub scan.
- The name of the user running the query (which we don't use here).
- The factory to create our reader.
- The type to use for missing columns: VARCHAR.
- Some useful context to show the user in error messages if anything goes wrong.
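The last item, layering scan-specific context on top of whatever context already exists, is a small chaining pattern that can be shown in isolation. The following is a minimal sketch under stated assumptions: SketchErrorContext, SketchChildContext, and ErrorContextSketch are hypothetical stand-ins rather than Drill classes, a plain list of strings stands in for the error message builder, and the sketch assumes a child should first delegate to its parent so that the parent's context is preserved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an error context: appends lines to a message.
interface SketchErrorContext {
  void addContext(List<String> out);
}

// A child context that, by default, just forwards to its parent.
class SketchChildContext implements SketchErrorContext {
  private final SketchErrorContext parent;

  SketchChildContext(SketchErrorContext parent) {
    this.parent = parent;
  }

  @Override
  public void addContext(List<String> out) {
    parent.addContext(out);
  }
}

final class ErrorContextSketch {

  // Builds the context lines a user would see for a failure in a scan
  // of the given table, using the given plugin name.
  static List<String> describe(String pluginName, String tableName) {
    // The root context stands in for whatever the engine already provides.
    SketchErrorContext root = out -> out.add("Query: sketch");
    SketchErrorContext scan = new SketchChildContext(root) {
      @Override
      public void addContext(List<String> out) {
        super.addContext(out);  // keep the parent's lines first
        out.add("Storage plugin: " + pluginName);
        out.add("Table: " + tableName);
      }
    };
    List<String> lines = new ArrayList<>();
    scan.addContext(lines);
    return lines;
  }

  public static void main(String[] args) {
    for (String line : describe("example", "myTable")) {
      System.out.println(line);
    }
  }
}
```

In this sketch, omitting the call to super.addContext() would silently drop the parent's lines, which is worth checking for when overriding a child context.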
A reader factory normally creates multiple readers for each scan. A file scan, for example, normally reads all the files in a directory, with a reader for each. In our case, we create only one reader:
private static class ExampleReaderFactory implements ReaderFactory {

  private final ExampleSubScan scanSpec;
  private int readerCount;

  public ExampleReaderFactory(ExampleSubScan scanSpec) {
    this.scanSpec = scanSpec;
  }

  @Override
  public void bind(ManagedScanFramework framework) { }

  @Override
  public ManagedReader<? extends SchemaNegotiator> next() {
    // Return the single reader on the first call, then signal
    // end of scan by returning null.
    if (readerCount++ == 0) {
      return new ExampleBatchReader(scanSpec);
    } else {
      return null;
    }
  }
}
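To see why returning null from next() ends the scan, it helps to look at the factory from the caller's side. The following is a minimal sketch of that contract, not EVF's actual scan loop: SketchReader, SketchReaderFactory, SingleReaderFactory, and ReaderLoopSketch are hypothetical names, and the sketch assumes the framework simply asks for readers until the factory returns null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a batch reader.
interface SketchReader {
  String name();
}

// Hypothetical stand-in for a reader factory: null means "no more readers".
interface SketchReaderFactory {
  SketchReader next();
}

// Mirrors the single-reader pattern: one reader, then null.
final class SingleReaderFactory implements SketchReaderFactory {
  private int readerCount;

  @Override
  public SketchReader next() {
    if (readerCount++ == 0) {
      return () -> "only-reader";
    }
    return null;
  }
}

final class ReaderLoopSketch {

  // Asks the factory for readers until it returns null, recording
  // the name of each reader that would have been run.
  static List<String> drain(SketchReaderFactory factory) {
    List<String> names = new ArrayList<>();
    SketchReader reader;
    while ((reader = factory.next()) != null) {
      names.add(reader.name());
    }
    return names;
  }

  public static void main(String[] args) {
    System.out.println(drain(new SingleReaderFactory()));
  }
}
```

A factory for a source with several partitions or files could follow the same contract by returning one reader per partition before returning null.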
Construction of an EVF-based batch reader is described elsewhere; the work is the same whether the reader is for a format plugin or a storage plugin. The key difference is that here we use the generic ManagedScanFramework, not the FileScanFramework used for reading files.
We have designed the above steps so that the information we need is available in the storage plugin config, the storage plugin, or in the sub scan. For example, if we are calling a REST service, then the endpoint and any credentials should appear in the plugin config, while the table name and columns should tell us what to query.
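The REST case makes this division of information concrete. The following is a minimal sketch under stated assumptions: RestConfigSketch, RestScanSketch, and RestRequestSketch are hypothetical classes, the URL layout and the "fields" query parameter are invented for illustration, and a real reader would also need to URL-encode the values and issue the request.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plugin config: where the service lives and how to authenticate.
final class RestConfigSketch {
  final String endpoint;
  final String apiToken;

  RestConfigSketch(String endpoint, String apiToken) {
    this.endpoint = endpoint;
    this.apiToken = apiToken;
  }
}

// Hypothetical sub scan: what to query.
final class RestScanSketch {
  final String tableName;
  final List<String> columns;

  RestScanSketch(String tableName, List<String> columns) {
    this.tableName = tableName;
    this.columns = columns;
  }
}

final class RestRequestSketch {

  // Combines the endpoint from the config with the table and projected
  // columns from the sub scan. Values are not URL-encoded in this sketch.
  static String buildUrl(RestConfigSketch config, RestScanSketch scan) {
    return config.endpoint + "/" + scan.tableName
        + "?fields=" + String.join(",", scan.columns);
  }

  // Credentials come from the config only, never from the sub scan.
  static String authHeader(RestConfigSketch config) {
    return "Bearer " + config.apiToken;
  }

  public static void main(String[] args) {
    RestConfigSketch config =
        new RestConfigSketch("https://example.invalid/api", "secret-token");
    RestScanSketch scan = new RestScanSketch("logs", Arrays.asList("a", "b"));
    System.out.println(buildUrl(config, scan));
  }
}
```

The reader would receive both objects, as ExampleBatchReader receives the sub scan above, and would need nothing else to form its request.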
You can now run a full test: