TL;DR A new version of the Bulk Writer library has been released. One of the major enhancements is the addition of a new helper class for creating ETL pipelines. With some fluent configuration, you can put fast, memory-efficient processing pipelines ahead of SqlBulkCopy for a customized ETL process.
In a previous post, we took an introductory look at how the Bulk Writer library makes it easy to implement a fast, memory-efficient bulk load of data into your database. If you’re thinking in terms of ETL, the BulkWriter class implements the “L” (i.e., “load”) portion of the acronym. We’ve recently released v3.0 of the BulkWriter library (Download Here), which includes a new feature to help with the T – “transform”.
Bulk Writer Pipelines
Pipelines can be thought of as a set of stages that transform your input data into the form that will ultimately be written to the database. Each stage runs inside its own Task, so pipelines are suited to heavier per-stage work than simple in-memory LINQ processing. The BulkWriter library handles the internals of funneling data through the stages of your pipeline, so all you need to do is define your transforms.
Configuring a Pipeline
You configure your pipeline via a fluent interface on the EtlPipeline class. Pipeline execution always culminates in writing to a BulkWriter, and the whole process is asynchronous. In its simplest form, a pipeline writes to the BulkWriter with no transforms at all. Such a pipeline has two steps: feeding an IEnumerable into the pipeline (the StartWith step) and then writing that enumerable into the BulkWriter object (the WriteTo step).
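As a sketch, the simplest pipeline looks something like the following. The entity type, connection string, and source collection here are hypothetical, and the EtlPipeline namespace is an assumption — check the library docs for your install.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using BulkWriter;            // BulkWriter<TEntity>
using BulkWriter.Pipeline;   // EtlPipeline (namespace assumed)

// Hypothetical entity mapped to the destination table.
public class MyEntity
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public static class PipelineExample
{
    public static async Task RunAsync(string connectionString, IEnumerable<MyEntity> items)
    {
        using (var bulkWriter = new BulkWriter<MyEntity>(connectionString))
        {
            // StartWith feeds the source enumerable into the pipeline;
            // WriteTo terminates the pipeline at the BulkWriter.
            var pipeline = EtlPipeline
                .StartWith(items)
                .WriteTo(bulkWriter);

            await pipeline.ExecuteAsync();
        }
    }
}
```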
Note that the above is just a simple introduction to the pipeline feature. If you have no transforms to apply, skip the pipeline and write your IEnumerable directly into the BulkWriter.
There are four ways to transform your data on its way through the pipeline: aggregate, pivot, project, and transform. Each of these transform types can be configured in the pipeline either by providing a Func / Action or by passing an implementation of the interface corresponding to each transform type: IAggregator, IPivot, IProjector, and ITransformer.
Aggregate can take multiple records and output a single record.
Pivot can turn one record into many.
Project can translate your current type into a new type.
Transform can apply changes to a record in place.
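A pipeline can chain several of these stages together. Here is a rough sketch of a transform followed by a projection — the entity types are made up, and the exact method names and overloads (e.g. an in-place transform step) should be checked against the library docs:

```csharp
// Sketch: chaining transform stages. RawEntity and CleanEntity are
// hypothetical types; bulkWriter is a BulkWriter<CleanEntity>.
var pipeline = EtlPipeline
    .StartWith(rawRecords)                              // IEnumerable<RawEntity>
    .TransformInPlace(r => r.Name = r.Name.Trim())      // transform: mutate in place
    .Project(r => new CleanEntity                       // project: map to a new type
    {
        Id = r.Id,
        Name = r.Name
    })
    .WriteTo(bulkWriter);

await pipeline.ExecuteAsync();
```

Aggregate and pivot stages slot into the chain the same way, either as a Func or as an IAggregator / IPivot implementation.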
The fluent interface also includes a method for enabling logging within your pipeline: LogWith. This method accepts an instance of Microsoft.Extensions.Logging.ILoggerFactory. When provided, the pipeline will log the start, the end, and any exceptions encountered within your pipeline steps. These loggers are not accessible inside your transform code, however. If you would like to log the internals of the logic you provide to the pipeline, do so with your own logger in your Func / Action or implementation of the BulkWriter transform interfaces.
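Wiring up a logger factory might look like the sketch below; console logging assumes the Microsoft.Extensions.Logging.Console package, and the rest of the pipeline setup is elided:

```csharp
using Microsoft.Extensions.Logging;

// Build any ILoggerFactory; AddConsole requires the
// Microsoft.Extensions.Logging.Console package.
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());

var pipeline = EtlPipeline
    .StartWith(items)
    .LogWith(loggerFactory)   // pipeline logs step start/end and exceptions
    .WriteTo(bulkWriter);
```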
Running the Pipeline
The pipeline is started with a call to ExecuteAsync. This method returns an awaitable Task that, in turn, waits on each step in the pipeline to complete.
Any exception encountered during a pipeline run will terminate the pipeline. Partial output may be generated, depending on when and where the exception occurs in the pipeline. Since the pipeline runs under an awaitable Task, you can examine the Exception property of the resulting task for further diagnostic info. This is an AggregateException, and it will contain a list of exceptions if more than one stage threw.
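In practice, catching a failed run and inspecting the task’s AggregateException might look like this sketch (assumes `using System;` and a pipeline built as shown earlier in the post):

```csharp
var pipelineTask = pipeline.ExecuteAsync();
try
{
    await pipelineTask;
}
catch (Exception)
{
    // pipelineTask.Exception is an AggregateException; InnerExceptions
    // holds one entry per pipeline stage that threw.
    if (pipelineTask.Exception != null)
    {
        foreach (var inner in pipelineTask.Exception.InnerExceptions)
        {
            Console.WriteLine($"Pipeline stage failed: {inner.Message}");
        }
    }
}
```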
The BulkWriter library’s new Pipelines feature provides a first-class way to use BulkWriter as an ETL engine within your code. You get the same great performance writing your records to the database as you would with SqlBulkCopy, without having to build the pipeline infrastructure yourself. Give it a shot and let us know what you think.