queue

The queue package provides a generic, concurrent job processing system with worker pools and graceful shutdown.

Core Components:

  • Processor[T]: Manages a pool of workers to process jobs from a queue. Implements the Runner interface, so it can be used as an application service.
  • Handler[T]: Interface for processing jobs with a Handle(ctx context.Context, job T) method.
  • HandlerFunc[T]: Function type that implements Handler for inline handler definitions.
  • Provider[T]: Interface for queue implementations, allowing custom backends.
  • ChanQueue[T]: Built-in thread-safe channel-based queue implementation.
  • ErrTimeout: Error returned when an enqueue operation times out.
  • ErrClosedQueue: Error returned when attempting to operate on a closed queue.

Full package docs at pkg.go.dev

  1. Define your job type

    type job struct {
        data int
    }

    Jobs can be any type. Use a struct for complex payloads.

  2. Create a handler function

    func jobHandler(ctx context.Context, job job) {
        log.InfoContext(ctx, "job handled", "data", job.data)
    }

    The handler receives the job and a context with worker ID for logging.

  3. Create a queue

    q := queue.NewChanQueue[job](10, 3*time.Second)

    The first argument is the buffer size; the second is the enqueue timeout. When the buffer is full, Enqueue blocks until the timeout elapses and then returns ErrTimeout.

  4. Create a processor

    p := queue.New(queue.HandlerFunc[job](jobHandler), q, 2, time.Second)

    Arguments: handler, queue provider, number of workers, and shutdown timeout. The shutdown timeout gives workers time to drain remaining jobs when the processor stops.

  5. Start the processor

    go p.Run(ctx)

    Run blocks until context is cancelled and all workers complete. Use a goroutine for non-blocking operation.

    Expected output:

    time=2025-01-01T12:00:00.000+00:00 level=INFO msg="worker started" workerID=abc-123
    time=2025-01-01T12:00:00.000+00:00 level=INFO msg="worker started" workerID=def-456

  6. Enqueue jobs

    p.Enqueue(ctx, job{data: 1})
    p.Enqueue(ctx, job{data: 2})

    Jobs are distributed to available workers. Enqueue returns ErrTimeout if the buffer stays full past the enqueue timeout, or ErrClosedQueue if the queue has been closed.

    Expected output:

    time=2025-01-01T12:00:00.001+00:00 level=INFO msg="job handled" workerID=abc-123 data=1
    time=2025-01-01T12:00:00.001+00:00 level=INFO msg="job handled" workerID=def-456 data=2

Since Processor implements the Runner interface, it can be registered as a service in an Application:

app := application.New()
q := queue.NewChanQueue[job](100, 5*time.Second)
p := queue.New(queue.HandlerFunc[job](jobHandler), q, 4, 10*time.Second)
app.RegisterService("queue", p)
app.Run(ctx)

The processor starts when the application runs and gracefully shuts down with it.

Implement the Provider interface to use custom queue backends like Redis, RabbitMQ, or databases:

type Provider[T any] interface {
    Open(ctx context.Context) error
    Close(ctx context.Context) error
    EnqueueJob(ctx context.Context, job T) error
    GetJobChan(ctx context.Context) (chan T, error)
}

The Open and Close methods handle connection lifecycle. GetJobChan returns the channel that workers read from.

The package provides two sentinel errors for queue operations:

    Error           Condition
    ErrTimeout      Enqueue operation timed out (buffer full)
    ErrClosedQueue  Attempted operation on a closed queue

Workers recover from panics automatically and log the error without crashing the processor.

queue.go

package main

import (
    "context"
    "time"

    "github.com/platforma-dev/platforma/log"
    "github.com/platforma-dev/platforma/queue"
)

type job struct {
    data int
}

func jobHandler(ctx context.Context, job job) {
    log.InfoContext(ctx, "job handled", "data", job.data)
}

func main() {
    ctx := context.Background()

    q := queue.NewChanQueue[job](10, 3*time.Second)
    p := queue.New(queue.HandlerFunc[job](jobHandler), q, 2, time.Second)
    go p.Run(ctx)

    time.Sleep(time.Millisecond) // give the workers a moment to start
    p.Enqueue(ctx, job{data: 1})
    p.Enqueue(ctx, job{data: 2})
    p.Enqueue(ctx, job{data: 3})
    time.Sleep(time.Millisecond) // let the handlers finish before main exits
}