queue
The queue package provides a generic, concurrent job processing system with worker pools and graceful shutdown.
Core Components:
- Processor[T]: Manages a pool of workers that process jobs from a queue. Implements the Runner interface, so it can be used as an application service.
- Handler[T]: Interface for processing jobs, with a Handle(ctx context.Context, job T) method.
- HandlerFunc[T]: Function type that implements Handler, for inline handler definitions.
- Provider[T]: Interface for queue implementations, allowing custom backends.
- ChanQueue[T]: Built-in, thread-safe, channel-based queue implementation.
- ErrTimeout: Error returned when an enqueue operation times out.
- ErrClosedQueue: Error returned when attempting to operate on a closed queue.
Full package docs at pkg.go.dev
Step-by-step guide
-
Define your job type
    type job struct {
        data int
    }

Jobs can be any type. Use a struct for complex payloads.
-
Create a handler function
    func jobHandler(ctx context.Context, job job) {
        log.InfoContext(ctx, "job handled", "data", job.data)
    }

The handler receives the job and a context that carries the worker ID for logging.
-
Create a queue
    q := queue.NewChanQueue[job](10, 3*time.Second)

The first argument is the buffer size; the second is the enqueue timeout. When the buffer is full, enqueue blocks until space frees up or the timeout elapses.
-
Create a processor
    p := queue.New(queue.HandlerFunc[job](jobHandler), q, 2, time.Second)

Arguments: handler, queue provider, number of workers, shutdown timeout. The shutdown timeout gives workers time to drain remaining jobs on shutdown.
-
Start the processor
    go p.Run(ctx)

Run blocks until the context is cancelled and all workers complete. Run it in a goroutine when the caller must not block.
Expected output:
    time=2025-01-01T12:00:00.000+00:00 level=INFO msg="worker started" workerID=abc-123
    time=2025-01-01T12:00:00.000+00:00 level=INFO msg="worker started" workerID=def-456

-
Enqueue jobs
    p.Enqueue(ctx, job{data: 1})
    p.Enqueue(ctx, job{data: 2})

Jobs are distributed to available workers. Enqueue returns an error if it times out because the queue is full, or if the queue is closed.
Expected output:
    time=2025-01-01T12:00:00.001+00:00 level=INFO msg="job handled" workerID=abc-123 data=1
    time=2025-01-01T12:00:00.001+00:00 level=INFO msg="job handled" workerID=def-456 data=2
Using with Application
Since Processor implements the Runner interface, it can be registered as a service in an Application:

    app := application.New()

    q := queue.NewChanQueue[job](100, 5*time.Second)
    p := queue.New(queue.HandlerFunc[job](jobHandler), q, 4, 10*time.Second)

    app.RegisterService("queue", p)

    app.Run(ctx)

The processor starts when the application runs and shuts down gracefully with it.
Custom queue providers
Implement the Provider interface to use custom queue backends such as Redis, RabbitMQ, or a database:

    type Provider[T any] interface {
        Open(ctx context.Context) error
        Close(ctx context.Context) error
        EnqueueJob(ctx context.Context, job T) error
        GetJobChan(ctx context.Context) (chan T, error)
    }

The Open and Close methods handle the connection lifecycle. GetJobChan returns the channel that workers read from.
Error handling
The package defines two errors for queue operations:

| Error | Condition |
|---|---|
| ErrTimeout | Enqueue operation timed out (buffer full) |
| ErrClosedQueue | Attempted operation on a closed queue |
Workers recover from panics automatically and log the error without crashing the processor.
Complete example
    package main

    import (
        "context"
        "time"

        "github.com/platforma-dev/platforma/log"
        "github.com/platforma-dev/platforma/queue"
    )

    type job struct {
        data int
    }

    func jobHandler(ctx context.Context, job job) {
        log.InfoContext(ctx, "job handled", "data", job.data)
    }

    func main() {
        ctx := context.Background()

        q := queue.NewChanQueue[job](10, 3*time.Second)
        p := queue.New(queue.HandlerFunc[job](jobHandler), q, 2, time.Second)

        go p.Run(ctx)
        time.Sleep(time.Millisecond)

        p.Enqueue(ctx, job{data: 1})
        p.Enqueue(ctx, job{data: 2})
        p.Enqueue(ctx, job{data: 3})

        time.Sleep(time.Millisecond)
    }