Tasqueue
Tasqueue is a simple, lightweight distributed job/worker implementation in Go
Concepts
tasqueue.Broker
is a generic interface that provides methods to enqueue and consume messages from a queue. Currently supported brokers are redis and nats-jetstream.tasqueue.Results
is a generic interface that provides methods to store the state/results of task messages. Currently supported result stores are redis and nats-jetstream.tasqueue.Handler
is a function type that accepts[]byte
payloads. Users need to register such handlers with the server. It is upto the handler to decode (if required) the[]byte
messages and process them in any manner.tasqueue.Task
holds the data for a basic unit of work. This data includes the handler name which will process the task, a[]byte
payload (encoded in any manner, if required). More options described below.
Basic example
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"github.com/kalbhor/tasqueue"
redis_broker "github.com/kalbhor/tasqueue/brokers/redis"
redis_results "github.com/kalbhor/tasqueue/results/redis"
)
type SumPayload struct {
Arg1 int `json:"arg1"`
Arg2 int `json:"arg2"`
}
// SumProcessor prints the sum of two integer arguements.
func SumProcessor(b []byte) error {
var pl SumPayload
if err := json.Unmarshal(b, &pl); err != nil {
return err
}
fmt.Println(pl.Arg1 + pl.Arg2)
return nil
}
func main() {
// Create a new tasqueue server with redis results & broker.
srv := tasqueue.NewServer(redis_broker.New(redis_broker.Options{
Addrs: []string{"127.0.0.1:6379"},
Password: "",
DB: 0,
}), redis_results.New(redis_results.Options{
Addrs: []string{"127.0.0.1:6379"},
Password: "",
DB: 0,
}))
// Register a handler called "add"
srv.RegisterHandler("add", SumProcessor)
// Encode the payload passed to the handler and create a task.
b, _ := json.Marshal(SumPayload{Arg1: 5, Arg2: 4})
t, err := tasqueue.NewTask("add", b)
if err != nil {
log.Fatal(err)
}
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
// Place the task
srv.AddTask(ctx, t)
// Start the tasqueue workers. (blocking function)
srv.Start(ctx)
fmt.Println("exit..")
}
Features and Options
Task
Features
- Task chains : A new “chain” of tasks can be formed by using
tasqueue.NewChain(tasks ...*Task)
. Each subsequent task will be placed after the successful execution of current task.
Options
- Cron based schedule :
tasqueue.Schedule("* * * * *")
- Custom Queue (important to run server on custom queue as well) :
tasqueue.CustomQueue("custom-q")
- Custom value for maximum retries :
tasqueue.MaxRetry(5)
Options can be passed while creating a new task : func NewTask(handler string, payload []byte, opts ...Opts)
Server/Worker
Options
- Custom Queue (important to set on task as well) :
tasqueue.CustomQueue("custom-q")
- Custom concurrency :
tasqueue.Concurrency(5)
Options can be passed while starting the server worker.
func (s *Server) Start(ctx context.Context, opts ...Opts)