queue
The queue package provides a generic, concurrent job processing system with a channel-based queue implementation.
Core Components:
Processor: Manages worker pool and coordinates job processing lifecycle. ImplementsRunnerinterface so it can be used as anapplicationservice.Handler: Interface for defining job processing logicProvider: Interface for custom queue storage implementationsChanQueue: Built-in channel-based queue implementation
Step-by-step guide
Section titled “Step-by-step guide”-
To use
Processorfirst you need to define job typetype job struct {data int} -
Then we need to implement handler. We will use
HandlerFunchelper later so it enough for us to define a functionsfunc jobHandler(ctx context.Context, job job) {log.InfoContext(ctx, "job handled", "data", job.data)} -
Then create new instance of
ChanQueueq := queue.NewChanQueue[job](10, 3 * time.Second)There we set channel buffer size to 10 and timeout to enqueue new jobs to 3 seconds. You should probably tweak those values to your needs.
-
Then let’s create new
Processorp := queue.New(queue.HandlerFunc[job](jobHandler), q, 2, time.Second)Besides handler and queue we set amount of workers and shutdown timeout to 1 second.
Processorwould use this time to drain jobs from queue after context cancellation. -
We are ready to start our processor and enqueue some jobs in it
ctx := context.Background()go p.Run(ctx)time.Sleep(time.Millisecond)p.Enqueue(ctx, job{data: 1})p.Enqueue(ctx, job{data: 2})p.Enqueue(ctx, job{data: 3})time.Sleep(time.Millisecond) -
Run this code and you will see output like this
time=2025-11-11T22:01:26.630+03:00 level=INFO msg="worker started"time=2025-11-11T22:01:26.630+03:00 level=INFO msg="worker started"time=2025-11-11T22:01:26.631+03:00 level=INFO msg="job handled" data=1time=2025-11-11T22:01:26.631+03:00 level=INFO msg="job handled" data=3time=2025-11-11T22:01:26.631+03:00 level=INFO msg="job handled" data=2
Full code
Section titled “Full code”package main
import ( "context" "time"
"github.com/platforma-dev/platforma/log" "github.com/platforma-dev/platforma/queue")
type job struct { data int}
func jobHandler(ctx context.Context, job job) { log.InfoContext(ctx, "job handled", "data", job.data)}
func main() { ctx := context.Background()
q := queue.NewChanQueue[job](10, 3*time.Second) p := queue.New(queue.HandlerFunc[job](jobHandler), q, 2, time.Second)
go p.Run(ctx) time.Sleep(time.Millisecond)
p.Enqueue(ctx, job{data: 1}) p.Enqueue(ctx, job{data: 2}) p.Enqueue(ctx, job{data: 3})
time.Sleep(time.Millisecond)}