Machine is a library for creating data workflows. These workflows can be very concise or quite complex, and can even include cycles for flows that need retry or self-healing mechanisms.

It supports OpenTelemetry spans and metrics out of the box.

It also supports building dynamic pipelines using

Components is a repository of different vertex and plugin implementations


Add the primary library to your project

  go get -u

Data is a library for getting and setting values in a map[string]interface{}
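
To illustrate what getting and setting values in a `map[string]interface{}` can look like, here is a minimal standard-library sketch. The helper names `setPath` and `getPath` and the dot-separated path convention are assumptions made for this example; they are not taken from the Data library's API.

```go
package main

import (
	"fmt"
	"strings"
)

// Data mirrors the map type named above. The helpers below are
// hypothetical and are not the Data library's API.
type Data map[string]interface{}

// setPath stores value at a dot-separated path, creating intermediate
// maps as needed. A non-map value found along the path is replaced.
func setPath(d Data, path string, value interface{}) {
	keys := strings.Split(path, ".")
	cur := map[string]interface{}(d)
	for _, k := range keys[:len(keys)-1] {
		next, ok := cur[k].(map[string]interface{})
		if !ok {
			next = map[string]interface{}{}
			cur[k] = next
		}
		cur = next
	}
	cur[keys[len(keys)-1]] = value
}

// getPath returns the value at a dot-separated path and whether it exists.
func getPath(d Data, path string) (interface{}, bool) {
	var cur interface{} = map[string]interface{}(d)
	for _, k := range strings.Split(path, ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[k]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

func main() {
	d := Data{}
	setPath(d, "user.name", "ada")

	name, ok := getPath(d, "user.name")
	fmt.Println(name, ok) // ada true

	_, ok = getPath(d, "user.age")
	fmt.Println(ok) // false
}
```

Returning a second boolean from the getter follows Go's usual comma-ok idiom, so callers can tell a missing key apart from a stored nil value.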


Basic receive -> process -> send Flow

  stream := NewStream("unique_id1",
    func(c context.Context) chan []Data {
      channel := make(chan []Data)
      // setup channel to collect data as long as
      // the context has not completed

      return channel
    },
    &Option{FIFO: boolP(false)},
    &Option{Metrics: boolP(true)},
    &Option{Span: boolP(false)},
  )

  // attach a processing vertex to the stream ("unique_id2" is a placeholder ID)
  stream.Builder().Map("unique_id2",
      func(m Data) error {
        var err error

        // some processing

        return err
      },
    ).Publish("publish_left_id", publishFN(func(d []data.Data) error {
      // send the data somewhere

      return nil
    }),
  )

  if err := stream.Run(context.Background()); err != nil {
    // Run will return an error in the case that
    // one of the paths is not terminated (i.e. missing a Publish)
  }

👤 Jonathan Whitaker