pgjobs, a dead simple postgres job queueing mechanism

This project aims to be a blueprint for your own job queue solution with Go and PostgreSQL. It is recommended to fork this project and adjust the job queue to your own needs.

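To keep the queue fast, PostgreSQL's SKIP LOCKED feature is used: workers that select jobs with it skip rows another worker has already locked instead of waiting on them. The technique is described here.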
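As a rough illustration of the SKIP LOCKED technique, the sketch below claims a single job inside a transaction. It is not the query pgjobs itself runs: the `jobs` table, its `id`, `queue` and `payload` columns, and the `claimOne` helper are all hypothetical names chosen for this example.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// dequeueSQL locks at most one row of a hypothetical "jobs" table.
// Rows already locked by another transaction are skipped, not waited on.
const dequeueSQL = `
SELECT id, payload
FROM jobs
WHERE queue = $1
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED`

// claimOne (hypothetical helper) claims one job, runs handle on its payload,
// and deletes the row on success. It reports whether a job was processed.
func claimOne(ctx context.Context, db *sql.DB, queue string, handle func(payload string) error) (bool, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return false, err
	}
	// Rollback releases the row lock; after a successful Commit it does nothing useful.
	defer tx.Rollback()

	var id int64
	var payload string
	err = tx.QueryRowContext(ctx, dequeueSQL, queue).Scan(&id, &payload)
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil // nothing to do, or every candidate row is locked
	}
	if err != nil {
		return false, err
	}

	if err := handle(payload); err != nil {
		return false, err // the deferred rollback makes the job visible again
	}
	if _, err := tx.ExecContext(ctx, `DELETE FROM jobs WHERE id = $1`, id); err != nil {
		return false, err
	}
	if err := tx.Commit(); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	// Without a database at hand, just show the statement that does the work.
	fmt.Println(dequeueSQL)
}
```

Because the row lock lives as long as the transaction, a worker that crashes mid-job releases its claim automatically and another worker can pick the job up.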

Example usage

A complete, runnable example can be found under ./example/.

First define a pgjobs.Job, for example in ./jobs/emailUser.go:

package jobs

import (
	"encoding/json"
	"log"

	"github.com/rverton/pgjobs"
)

type EmailUser struct {
	Email string
}

func NewEmailUser(email string) *EmailUser {
	return &EmailUser{
		Email: email,
	}
}

// the action which should be executed
func (e EmailUser) Perform(attempt int32) error {
	log.Printf("emailing %v, attempt=%v", e.Email, attempt)
	return nil
}

// this is boilerplate code and does not need to be modified
func (e EmailUser) Load(data string) (pgjobs.Job, error) {
	var n EmailUser
	err := json.Unmarshal([]byte(data), &n)
	return n, err
}
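To see what the Load boilerplate does, here is a small standalone sketch that serializes a job to JSON and rebuilds it through Load. It assumes the queue stores jobs as JSON (suggested by the json.Unmarshal call above, but not confirmed here). The local Job interface only mirrors the two methods shown in this README and stands in for pgjobs.Job, and roundTrip is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Job is a local stand-in for pgjobs.Job, reduced to the methods used in this README.
type Job interface {
	Perform(attempt int32) error
	Load(data string) (Job, error)
}

type EmailUser struct {
	Email string
}

// Perform is the action executed for the job.
func (e EmailUser) Perform(attempt int32) error {
	log.Printf("emailing %v, attempt=%v", e.Email, attempt)
	return nil
}

// Load ignores the receiver's fields and builds a fresh EmailUser from data.
func (e EmailUser) Load(data string) (Job, error) {
	var n EmailUser
	err := json.Unmarshal([]byte(data), &n)
	return n, err
}

// roundTrip (hypothetical helper) encodes job as JSON and decodes it again
// via the prototype's Load, as a queue worker plausibly would.
func roundTrip(prototype Job, job Job) (Job, error) {
	data, err := json.Marshal(job)
	if err != nil {
		return nil, err
	}
	return prototype.Load(string(data))
}

func main() {
	restored, err := roundTrip(EmailUser{}, EmailUser{Email: "user@example.com"})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", restored)
	_ = restored.Perform(1)
}
```

The empty EmailUser{} acts purely as a prototype: its Load method knows which concrete type to decode into, which is why an empty job value is enough when registering a job type with a worker.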

You can then set up a queue, (optionally) enforce the jobs table schema, and work on queued jobs.

package main

import (
	"context"
	"database/sql"
	"example/jobs"
	"log"
	"os"

	_ "github.com/jackc/pgx/v5/stdlib"
	"github.com/rverton/pgjobs"
)

func main() {
	// "pgx" is the driver name registered by the pgx/v5/stdlib import above
	db, err := sql.Open("pgx", os.Getenv("DB_URL"))
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// initiate queue with postgres connection
	queue := pgjobs.NewQueue(db)
	if err := queue.SetupSchema(ctx); err != nil {
		panic(err)
	}

	// enqueue an example job
	job := jobs.NewEmailUser("user@example.com")
	if err = queue.Enqueue(ctx, job, "default"); err != nil {
		log.Printf("error enqueueing: %v", err)
	}

	// start worker and pass all processable jobs
	queues := []string{"default"}
	if err := queue.Worker(ctx, queues, &jobs.EmailUser{}); err != nil {
		log.Println(err)
	}
}

ToDo

  • Remove github.com/lib/pq dependency
  • Make job processing more robust by using a transaction
  • Implement attempt handling
  • Add error handling and retries?
