Baker

Baker is a high-performance, composable and extensible data-processing pipeline for the big data era. It shines at converting, processing, extracting or storing records (structured data), applying whatever transformation is needed between input and output through easy-to-write filters.
Baker is fully parallel and makes the most of the available resources in both CPU-bound and I/O-bound pipelines.

Pipelines

A pipeline is the configured set of operations that Baker performs during its execution.

It is defined by:

  • One input component, defining where to fetch records from
  • Zero or more filters, which are functions that can process records (reading/writing fields, clearing them or even splitting them into multiple records)
  • One output component, defining where to send the filtered records to (and which columns)
  • One optional upload component, defining where to send files produced by the output component (if any)

Notice that there are two main usage scenarios for Baker:

  1. Baker as a batch processor. In this case, Baker will go through all the records that are fed by the input component, process them as quickly as possible, and exit.
  2. Baker as a daemon. In this case, Baker will never exit; it will keep waiting for incoming records from the input component (e.g.: Kinesis), process them and send them to the output.

Selecting between scenario 1 and scenario 2 is just a matter of configuring the pipeline; in fact, it is the input component that drives the scenario. If the input component exits at some point, Baker will flush the pipeline and exit as well; if instead the input component is endless, Baker will never exit and thus behave like a daemon.
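
The way the input drives the two scenarios can be illustrated with plain Go channels. This is a minimal sketch, not Baker's implementation: the run function, the string records and the process callback are hypothetical stand-ins for the real input, filter chain and output.

```go
package main

import (
	"fmt"
	"strings"
)

// run wires a hypothetical input to a processing step and an in-memory
// output. It returns only when input returns: a finite input gives a batch
// run, while an input that never returns would keep run alive like a daemon.
func run(input func(chan<- string), process func(string) (string, bool)) []string {
	ch := make(chan string)
	go func() {
		defer close(ch) // the input is done: let the pipeline drain and stop
		input(ch)
	}()

	var out []string
	for rec := range ch {
		if r, keep := process(rec); keep {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	// A finite (batch) input: three records, then it returns.
	batchInput := func(ch chan<- string) {
		for _, rec := range []string{"a", "", "b"} {
			ch <- rec
		}
	}
	// Drop empty records and upper-case the others.
	process := func(rec string) (string, bool) {
		return strings.ToUpper(rec), rec != ""
	}
	fmt.Println(run(batchInput, process))
}
```

Replacing batchInput with a function that loops forever (for instance one polling a stream) would turn the same wiring into the daemon scenario, without touching the rest of the pipeline.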

Usage

Baker uses a baker.Config struct to know what to do. The configuration can be created by parsing a TOML file with baker.NewConfigFromToml(). This function requires a baker.Components object listing all the components available to the pipeline.
This is an example of a baker.Components struct:

comp := baker.Components{
	Inputs:  input.AllInputs(),
	Filters: MyCustomFilters(),
	// merge available outputs with user custom outputs
	Outputs: append(output.AllOutputs(), MyCustomOutputs()...),
	Uploads: MyCustomUploads(),
	// optional: custom extra config
	User: MyCustomConfigs(),
	// optional: used if sharding is enabled
	ShardingFuncs: MyRecordShardingFuncs,
	// optional: used if records must be validated. Not used if [general.dont_validate_fields] is used in TOML
	Validate: MyRecordValidationFunc,
	// optional: functions to get fields indexes by name and vice-versa
	FieldByName: MyFieldByNameFunc,
	FieldNames:  []string{"field0", "field1", "field2"},
}

Components (inputs, filters, outputs and uploads) are either generic ones provided with Baker or user-defined components, or a mix of both.

Performance

Read from S3 and write to local disk

On a c5.2xlarge instance (8 vCPUs), Baker read zstandard-compressed records from S3, decompressed them, applied basic filtering logic and compressed them back to local files with zstandard (compression level 3, long-range mode at 27), using ~90% of the capacity of each vCPU and ~3.5 GB of RAM.
It read and wrote a total of 94 million records in 8 min 51 s (~178k r/w records per second).
On a c5.12xlarge instance (48 vCPUs) the same test took 2 min 2 s (~775k r/w records per second).

For this test we used 711 zstd compressed files for a total of 17 GB of compressed size and 374 GB of uncompressed size. The average size of each record was ~4.5 KB.

Read from S3 and write to DynamoDB (in the same region)

On a c5.4xlarge instance, Baker read zstd-compressed files from S3 and wrote to DynamoDB (configured with 20k write capacity units) at an average speed of 60k records/s (the average size of each record was 4.3 KB), using less than 1 GB of memory and ~300% of the total CPU capacity (less than 20% for each core). The bottleneck here was the DynamoDB write capacity, so Baker can easily cope with an increased load simply by increasing the write capacity units in DynamoDB (up to 400k).

Read from Kinesis and write to DynamoDB (in the same region)

On a c5.4xlarge instance, we performed a test reading from a Kinesis stream with 130 shards and writing to a DynamoDB table with 20k write capacity units. Baker was able to read and write more than 10k records per second (the average size of each record was 4.5 KB) using less than 1 GB of RAM and ~400% of the total CPU capacity (less than 25% for each core).

Baker and AWS Kinesis Data Firehose

In many respects Baker can be compared with Firehose, and we made that comparison when we used Baker in one of the NextRoll projects.
As mentioned in the NextRoll Tech Blog, the price of that service, OMFG, had it been served using Amazon Firehose, would have been around $30k/month (not including S3 and data transfer costs). That monthly cost is more than the whole yearly cost of the service using Baker.

How to build a Baker executable

The examples/ folder contains several main() examples:

  • basic: a simple example with minimal support
  • filtering: shows how to code your own filter
  • sharding: shows how to use an output that supports sharding (see below for details about sharding)
  • help: shows components’ help messages
  • metrics: shows how to implement and plug a new metrics client to Baker
  • advanced: an advanced example with most of the features supported by Baker

TOML Configuration files

This is a minimalist Baker pipeline TOML configuration that reads records from disk, runs them through the “ClauseFilter” filter and pushes them to DynamoDB:

[input]
name="List"

    [input.config]
    files=["records.csv.gz"]

[[filter]]
name="ClauseFilter"

[output]
name="DynamoDB"
fields=["source","timestamp","user"]


    [output.config]
    regions=["us-west-2","us-east-1"]
    table="TestTableName"
    columns=["s:Source", "n:Timestamp", "s:User"]

[input] selects the input component, that is, where to read the records from. In this case, the List component is selected, which is a component that fetches CSV files from a list of local or remote paths/URLs. [input.config] is where component-specific configurations can be specified, and in this case we simply provide the files option to List. Notice that, in addition to local paths, List also accepts http:// and s3:// URLs there, among others (run ./baker-bin -help List in the help example for more details).

[[filter]] In TOML syntax, double brackets indicate an array of sections. This is where you declare the list of filters (i.e. the filter chain) to apply sequentially to your records. Like other components, each filter may be followed by a [filter.config] section. This is an example:

[[filter]]
name="filterA"

    [filter.config]
    foo = "bar"

[[filter]]
name="filterB"

[output] selects the output component; the output is where records that made it to the end of the filter chain without being discarded end up. In this case, the DynamoDB output is selected, and its configuration is specified in [output.config].

The fields option in the [output] section selects which fields of the record will be sent to the output. In fact, most pipelines don’t want to send the full records to the output, but they will select a few important columns out of the many available columns. Notice that this is just a selection: it is up to the output component to decide how to physically serialize those columns. For instance, the DynamoDB component requires the user to specify an option called columns that specifies the name and the type of the column where the fields will be written.
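
To make the relationship between fields and columns concrete, here is a minimal sketch of how an output could pair the two lists. It is not the DynamoDB component's code: selectFields and cell are hypothetical names, records are modelled as a plain map, and the sketch assumes that fields and columns are matched by position and that each column entry has the form type:Name, as in the example configuration.

```go
package main

import (
	"fmt"
	"strings"
)

// cell is one value ready to be written: destination column, type and value.
type cell struct{ Column, Type, Value string }

// selectFields picks the configured fields out of rec and pairs the i-th
// field with the i-th "type:Name" column entry (positional matching is an
// assumption of this sketch).
func selectFields(rec map[string]string, fields, columns []string) ([]cell, error) {
	if len(fields) != len(columns) {
		return nil, fmt.Errorf("%d fields but %d columns", len(fields), len(columns))
	}
	cells := make([]cell, len(fields))
	for i, spec := range columns {
		parts := strings.SplitN(spec, ":", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("invalid column %q, want type:Name", spec)
		}
		cells[i] = cell{Column: parts[1], Type: parts[0], Value: rec[fields[i]]}
	}
	return cells, nil
}

func main() {
	rec := map[string]string{
		"source": "web", "timestamp": "1700000000", "user": "ann",
		"referrer": "not selected, so never sent to the output",
	}
	cells, err := selectFields(rec,
		[]string{"source", "timestamp", "user"},
		[]string{"s:Source", "n:Timestamp", "s:User"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, c := range cells {
		fmt.Printf("%s (%s) = %s\n", c.Column, c.Type, c.Value)
	}
}
```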

Baker supports environment variable replacement in the configuration file. Use ${ENV_VAR_NAME} or $ENV_VAR_NAME and the value in the file will be replaced at runtime. Note that if the variable doesn’t exist, then an empty string will be used for replacement.
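
The replacement rules described above match those of os.Expand in the Go standard library. The sketch below uses it to show the expected behaviour on a configuration fragment; whether Baker relies on os.Expand internally is an assumption, and expandConfig, TABLE_NAME and MISSING_REGION are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
)

// expandConfig replaces ${VAR} and $VAR in a configuration string with the
// values returned by lookup. Unknown variables expand to the empty string.
func expandConfig(toml string, lookup func(string) (string, bool)) string {
	return os.Expand(toml, func(name string) string {
		v, _ := lookup(name) // v is "" when the variable is not set
		return v
	})
}

func main() {
	os.Setenv("TABLE_NAME", "TestTableName")
	os.Unsetenv("MISSING_REGION")

	cfg := "table=\"${TABLE_NAME}\"\nregion=\"$MISSING_REGION\""
	fmt.Println(expandConfig(cfg, os.LookupEnv))
}
```

Because a missing variable silently becomes an empty string, a typo in a variable name shows up as an empty option value rather than as a configuration error.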

How to create components

To register a new component within Baker and make it available for your pipelines, you must create and fill a description structure and provide it to baker.Components. The structure to fill is an InputDesc, FilterDesc, OutputDesc, UploadDesc or MetricsDesc, depending on the component type.

At runtime, component configurations (i.e. [input.config], [output.config] and so on) are deserialized from TOML and each of them is forwarded to the constructor function of its component.

Configuration fields may carry struct tags. Let’s see their use with an example:

type MyConfig struct {
    Name    string   `help:"Name is ..." required:"true"`
    Value   int      `help:"Value is ..."`
    Strings []string `help:"Strings ..." default:"[a, b, c]"`
}

Supported struct tags:

  • help: shown on the terminal when requesting this component’s help
  • default: also shown in the component help
  • required: also shown in help. Configuration fails if the field is not set in TOML (or left at its zero value).
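
Struct tags like these are read with Go's reflect package. The following is a minimal sketch of how the three tags could be consumed, assuming the semantics listed above; helpLines and missingRequired are hypothetical helpers and are not part of Baker's API.

```go
package main

import (
	"fmt"
	"reflect"
)

type MyConfig struct {
	Name    string   `help:"Name is ..." required:"true"`
	Value   int      `help:"Value is ..."`
	Strings []string `help:"Strings ..." default:"[a, b, c]"`
}

// helpLines renders one line per field from its help and default tags.
// cfg must be a struct or a pointer to a struct.
func helpLines(cfg interface{}) []string {
	t := reflect.Indirect(reflect.ValueOf(cfg)).Type()
	var lines []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		line := fmt.Sprintf("%s (%s): %s", f.Name, f.Type, f.Tag.Get("help"))
		if d, ok := f.Tag.Lookup("default"); ok {
			line += " (default: " + d + ")"
		}
		lines = append(lines, line)
	}
	return lines
}

// missingRequired returns the names of the fields tagged required:"true"
// that still hold their zero value.
func missingRequired(cfg interface{}) []string {
	v := reflect.Indirect(reflect.ValueOf(cfg))
	var missing []string
	for i := 0; i < v.NumField(); i++ {
		f := v.Type().Field(i)
		if f.Tag.Get("required") == "true" && v.Field(i).IsZero() {
			missing = append(missing, f.Name)
		}
	}
	return missing
}

func main() {
	for _, l := range helpLines(MyConfig{}) {
		fmt.Println(l)
	}
	// Name is required but left at its zero value here.
	fmt.Println("missing:", missingRequired(&MyConfig{Value: 3}))
}
```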

Filters

Example code can be found in ./examples/filtering/filter.go

A filter must implement a baker.Filter interface:

type Filter interface {
    Process(r Record, next func(Record))
    Stats() FilterStats
}

While Stats() returns a struct used to collect metrics (see the Metrics chapter), the Process() function is where the filter logic is actually implemented.
Filters receive a Record and the next() function, which represents the next filtering function in the filter chain.
The filter can do whatever it likes with the Record, like adding or changing a value, dropping it (by not calling the next() function) or even splitting a Record by calling next() multiple times.
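
Dropping and splitting can be sketched with a self-contained example. The Filter interface below is the one shown above, while Record, FilterStats, splitUsers and chain are simplified, hypothetical stand-ins (Baker's own Record and FilterStats types are richer), so this illustrates the calling convention rather than real Baker code.

```go
package main

import (
	"fmt"
	"strings"
)

// Record and FilterStats are simplified stand-ins for Baker's types.
type Record map[string]string

type FilterStats struct{ Processed, Dropped int64 }

type Filter interface {
	Process(r Record, next func(Record))
	Stats() FilterStats
}

// splitUsers drops records without a "users" field and splits the others
// into one record per comma-separated user.
type splitUsers struct{ stats FilterStats }

func (f *splitUsers) Process(r Record, next func(Record)) {
	f.stats.Processed++
	if r["users"] == "" {
		f.stats.Dropped++ // dropping: next is never called
		return
	}
	for _, u := range strings.Split(r["users"], ",") {
		// splitting: next is called once per resulting record
		next(Record{"source": r["source"], "user": u})
	}
}

func (f *splitUsers) Stats() FilterStats { return f.stats }

// chain links the filters so that each one's next is the following filter's
// Process, with sink receiving whatever reaches the end of the chain.
func chain(filters []Filter, sink func(Record)) func(Record) {
	next := sink
	for i := len(filters) - 1; i >= 0; i-- {
		f, n := filters[i], next
		next = func(r Record) { f.Process(r, n) }
	}
	return next
}

func main() {
	f := &splitUsers{}
	var out []Record
	process := chain([]Filter{f}, func(r Record) { out = append(out, r) })

	process(Record{"source": "web", "users": "ann,bob"}) // split in two
	process(Record{"source": "web"})                     // dropped

	fmt.Println(len(out), f.Stats())
}
```

The counters here are plain integers because the sketch runs on a single goroutine; a filter whose Process may be called from several goroutines at once would need atomic counters or a lock.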

baker.FilterDesc

In case you plan to use a TOML configuration to build the Baker topology, the filter should also be described using a baker.FilterDesc struct. In fact a list of baker.FilterDesc will be used to populate baker.Components, which is an argument of baker.NewConfigFromToml.

type FilterDesc struct {
    Name   string
    New    func(FilterParams) (Filter, error)
    Config interface{}
    Help   string
}

Name

The Name of the filter must be unique, as it is matched against the name given in the TOML [[filter]] section:

[[filter]]
name = "FilterName"
    [filter.config]
    filterConfKey1 = "somevalue"
    filterConfKey2 = "someothervalue"

New

This is the constructor; it returns a baker.Filter as well as a possible error.

Config

The filter can have its own configuration (as the [filter.config] fields above). The Config field will be populated with a pointer to the configuration struct provided.

The New function will receive a baker.FilterParams. Its DecodedConfig will host the filter configuration. It requires a type assertion to the filter configuration struct type to be used:

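func NewMyCustomFilter(cfg baker.FilterParams) (baker.Filter, error) {
    if cfg.DecodedConfig == nil {
        cfg.DecodedConfig = &MyCustomFilterConfig{}
    }
    dcfg := cfg.DecodedConfig.(*MyCustomFilterConfig)
    // Build the filter from dcfg. MyCustomFilter is a placeholder for your own type.
    return &MyCustomFilter{cfg: dcfg}, nil
}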

Help

The help string can be used to build a help output (see the help example).
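
Putting Name, New, Config and Help together, the sketch below shows how a description can be looked up by name and its constructor called with a decoded configuration. FilterDesc is copied from above; Filter and FilterParams are reduced stand-ins, and PrefixConfig, NewPrefix, PrefixDesc and build are hypothetical names. The TOML decoding step is skipped: the decoded configuration is passed in directly.

```go
package main

import (
	"errors"
	"fmt"
)

// Filter and FilterParams are reduced stand-ins for Baker's types.
type Filter interface {
	Process(r string, next func(string))
}

type FilterParams struct{ DecodedConfig interface{} }

type FilterDesc struct {
	Name   string
	New    func(FilterParams) (Filter, error)
	Config interface{}
	Help   string
}

type PrefixConfig struct {
	Prefix string `help:"Prefix added to every record" required:"true"`
}

type prefixFilter struct{ prefix string }

func (f *prefixFilter) Process(r string, next func(string)) { next(f.prefix + r) }

// NewPrefix is the constructor: it asserts the decoded configuration to its
// concrete type and validates it before building the filter.
func NewPrefix(cfg FilterParams) (Filter, error) {
	if cfg.DecodedConfig == nil {
		cfg.DecodedConfig = &PrefixConfig{}
	}
	dcfg, ok := cfg.DecodedConfig.(*PrefixConfig)
	if !ok {
		return nil, fmt.Errorf("unexpected config type %T", cfg.DecodedConfig)
	}
	if dcfg.Prefix == "" {
		return nil, errors.New("Prefix: the prefix option is required")
	}
	return &prefixFilter{prefix: dcfg.Prefix}, nil
}

var PrefixDesc = FilterDesc{
	Name:   "Prefix",
	New:    NewPrefix,
	Config: &PrefixConfig{},
	Help:   "Prefix prepends a fixed string to every record.",
}

// build finds the description called name and invokes its constructor.
func build(descs []FilterDesc, name string, decoded interface{}) (Filter, error) {
	for _, d := range descs {
		if d.Name == name {
			return d.New(FilterParams{DecodedConfig: decoded})
		}
	}
	return nil, fmt.Errorf("unknown filter %q", name)
}

func main() {
	f, err := build([]FilterDesc{PrefixDesc}, "Prefix", &PrefixConfig{Prefix: "> "})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	f.Process("hello", func(r string) { fmt.Println(r) })
}
```

Using the two-value form of the type assertion turns a mismatched configuration type into an error instead of a panic, which is usually friendlier in a constructor.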

Inputs

An input is described to Baker by filling in a baker.InputDesc struct.
The New function must return a baker.Input component whose Run function represents the heart of the input.
That function receives a channel where the data produced by the input must be pushed in the form of a baker.Data.
The actual input data is a slice of bytes that will be parsed with Record.Parse() by Baker before sending it to the filter chain. The input can also add metadata to baker.Data. Metadata can be user-defined and filters must know how to read and use metadata defined by the input.

Outputs

An output must implement the Output interface:

type Output interface {
    Run(in <-chan OutputRecord, upch chan<- string)
    Stats() OutputStats
    CanShard() bool
}
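
A minimal implementation of this interface might look like the sketch below. The Output interface is the one shown above; OutputRecord and OutputStats are reduced stand-ins with hypothetical fields, memOutput is a made-up in-memory output, and sending a file name on upch once the work is done is an assumption about how outputs hand files to the upload component.

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// OutputRecord and OutputStats are reduced stand-ins for Baker's types.
type OutputRecord struct{ Fields []string }

type OutputStats struct{ NumProcessedLines int64 }

type Output interface {
	Run(in <-chan OutputRecord, upch chan<- string)
	Stats() OutputStats
	CanShard() bool
}

// memOutput is a hypothetical output that keeps CSV lines in memory.
type memOutput struct {
	n     int64 // updated atomically, so Stats can be called while Run is active
	lines []string
}

// Run consumes records until in is closed, then announces the (fictional)
// file it produced on upch so that an upload component could pick it up.
func (o *memOutput) Run(in <-chan OutputRecord, upch chan<- string) {
	for rec := range in {
		o.lines = append(o.lines, strings.Join(rec.Fields, ","))
		atomic.AddInt64(&o.n, 1)
	}
	upch <- "memory://records.csv" // assumption: upch carries produced file names
}

func (o *memOutput) Stats() OutputStats {
	return OutputStats{NumProcessedLines: atomic.LoadInt64(&o.n)}
}

// CanShard reports that this output does not support sharding.
func (o *memOutput) CanShard() bool { return false }

// Compile-time check that memOutput satisfies the interface.
var _ Output = (*memOutput)(nil)

func main() {
	in := make(chan OutputRecord, 2)
	upch := make(chan string, 1)
	in <- OutputRecord{Fields: []string{"web", "1700000000", "ann"}}
	in <- OutputRecord{Fields: []string{"app", "1700000001", "bob"}}
	close(in)

	out := &memOutput{}
	out.Run(in, upch)
	fmt.Println(out.lines, out.Stats().NumProcessedLines, <-upch)
}
```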