main

hedge

A library built on top of spindle and Cloud Spanner that provides rudimentary distributed computing facilities to Kubernetes deployments. Features include a consistent, append-only, Spanner-backed distributed key/value storage, a distributed locking/leader election mechanism through spindle, a simple member-to-leader communication channel, a broadcast (send-to-all) mechanism, and a distributed semaphore (WIP).

Why?

In a nutshell, I wanted something much simpler than using Raft (my progress on that front is quite slow), or worse, Paxos (Raft maybe as the long-term goal). And I wanted an easily-accessible storage that is a bit decoupled from the code (easier to edit, debug, backup, etc). We are already a heavy Spanner user, and spindle has been in our production for quite a while now: these two should be able to do it, preferably on a k8s Deployment; StatefulSets or DaemonSets shouldn’t be a requirement. Since then, additional features have been added, such as the Send() API.

What does it do?

Leader election is handled by spindle. Two APIs are provided for storage: Put() and Get(). All pods can serve the Get() calls, while only the leader handles the Put() APIs. If a non-leader pod calls Put(), that call is forwarded to the leader, who will do the actual write. All Put()‘s are append-only.

spindle‘s HasLock() function is also available for distributed locking due to struct embedding, although you can use spindle separately for that, if you prefer.

A Send() API is also provided for members to be able to send simple request/reply-type messages to the current leader at any time.

A Broadcast() API is also available for all pods. Note that due to the nature of k8s deployments (pods come and go) and the internal heartbeat delays, some pods might not receive the broadcast message at call time, although all pods will have the complete broadcast target list eventually.

Finally, a distributed semaphore is currently in the works and will be available shortly.

Prerequisites

  • All pods within the group should be able to contact each other via TCP (address:port).
  • Each hedge‘s instance id should be set using the pod’s cluster IP address:port.
  • For now, spindle‘s lock table and hedge‘s log table are within the same database.
  • Tables for spindle and hedge need to be created beforehand. See here for spindle‘s DDL. For hedge, see below:

-- 'logtable' name is just an example
CREATE TABLE logtable (
    id STRING(MAX),
    key STRING(MAX),
    value STRING(MAX),
    leader STRING(MAX),
    timestamp TIMESTAMP OPTIONS (allow_commit_timestamp=true),
) PRIMARY KEY (key, id)

How to use

Something like:

client, _ := spanner.NewClient(context.Background(), "your/spanner/database")
defer client.Close()

xdata := "some arbitrary data"
op := hedge.New(
    client,
    "1.2.3.4:8080", // you can use k8s downward API
    "locktable",
    "myspindlelock",
    "logtable",
    hedge.WithLeaderHandler( // if leader only, handles Send()
        xdata,
        func(data interface{}, msg []byte) ([]byte, error) {
            log.Println("[send] xdata:", data.(string))
            log.Println("[send] received:", string(msg))
            return []byte("hello " + string(msg)), nil
        },
    ),
    hedge.WithBroadcastHandler( // handles Broadcast()
        xdata,
        func(data interface{}, msg []byte) ([]byte, error) {
            log.Println("[broadcast] xdata:", data.(string))
            log.Println("[broadcast] received:", string(msg))
            return []byte("broadcast " + string(msg)), nil
        },
    ),
})

ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1) // optional wait
go op.Run(ctx, done)

// For storage, any pod should be able to call op.Put(...) or op.Get(...) here.
// For distributed locking, any pod can call op.HasLock() here.
// Calling op.Send(...) will be handled by the leader through the WithLeaderHandler callback.
// For broadcast, any pod can call op.Broadcast(...) here which will be handled by each
//   pod's WithBroadcastHandler callback.

cancel()
<-done

A sample deployment file for GKE is provided, although it needs a fair bit of editing (for auth) to be usable. It uses Workload Identity for authentication although you can update it to use other authentication methods as well. The service account needs to have Spanner and PubSub permissions.

Once deployed, you can test by sending PubSub messages to the created topic while checking the logs.

# Test the Put() API, key=hello, value=world
# Try running multiple times to see leader and non-leader pods handling the messages.
$ gcloud pubsub topics publish hedge-demo-pubctrl --message='put hello world'

# Test the Get() API, key=hello
# Try running multiple times to see leader and non-leader pods handling the messages.
$ gcloud pubsub topics publish hedge-demo-pubctrl --message='get hello'

# Test the Send() API
$ gcloud pubsub topics publish hedge-demo-pubctrl --message='send world'

# Test the Broadcast() API
$ gcloud pubsub topics publish hedge-demo-pubctrl --message='broadcast hello'

GitHub

https://github.com/flowerinthenight/hedge