Watermill OpenTelemetry integration

Bringing distributed tracing support to Watermill with OpenTelemetry.

Usage

For publishers

package example

import (
    "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/garsue/watermillzap"
    wotel "github.com/voi-oss/watermill-opentelemetry/pkg/opentelemetry"
    "go.uber.org/zap"
)

type PublisherConfig struct {
	Name         string
	GCPProjectID string
}

// NewPublisher instantiates a GCP Pub/Sub Publisher with tracing capabilities.
func NewPublisher(logger *zap.Logger, config PublisherConfig) (message.Publisher, error) {
	publisher, err := googlecloud.NewPublisher(
        googlecloud.PublisherConfig{ProjectID: config.GCPProjectID},
        watermillzap.NewLogger(logger),
    )
	if err != nil {
		return nil, err
	}

	if config.Name == "" {
		return wotel.NewPublisherDecorator(publisher), nil
	}

	return wotel.NewNamedPublisherDecorator(config.Name, publisher), nil
}

For subscribers

A tracing middleware can be defined at the router level:

package example

import (
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
    wotel "github.com/voi-oss/watermill-opentelemetry/pkg/opentelemetry"
)

func InitTracedRouter() (*message.Router, error) {
	router, err := message.NewRouter(message.RouterConfig{}, watermill.NopLogger{})
	if err != nil {
		return nil, err
	}

	router.AddMiddleware(wotel.Trace())

	return router, nil
}

Alternatively, individual handlers can be traced:

package example

import (
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
    wotel "github.com/voi-oss/watermill-opentelemetry/pkg/opentelemetry"
)

func InitRouter() (*message.Router, error) {
	router, err := message.NewRouter(message.RouterConfig{}, watermill.NopLogger{})
	if err != nil {
		return nil, err
	}
    
    // subscriber definition omitted for clarity
    subscriber := (message.Subscriber)(nil)

	router.AddNoPublisherHandler(
        "handler_name",
        "subscribeTopic",
        subscriber,
        wotel.TraceNoPublishHandler(func(msg *message.Message) error {
            return nil
        }),
    )

	return router, nil
}

GitHub

https://github.com/voi-oss/watermill-opentelemetry