
queue

The queue package provides a generic, concurrent job processing system with a channel-based queue implementation.

Core Components:

  • Processor: Manages the worker pool and coordinates the job processing lifecycle. Implements the Runner interface, so it can be used as an application service.
  • Handler: Interface for defining job processing logic
  • Provider: Interface for custom queue storage implementations
  • ChanQueue: Built-in channel-based queue implementation
  1. To use Processor, first define a job type

    type job struct {
        data int
    }
  2. Then implement a handler. We will use the HandlerFunc helper later, so it is enough to define a function

    func jobHandler(ctx context.Context, job job) {
        log.InfoContext(ctx, "job handled", "data", job.data)
    }
  3. Then create a new instance of ChanQueue

    q := queue.NewChanQueue[job](10, 3*time.Second)

    Here we set the channel buffer size to 10 and the timeout for enqueueing new jobs to 3 seconds. You should tune these values to your needs.

  4. Then create a new Processor

    p := queue.New(queue.HandlerFunc[job](jobHandler), q, 2, time.Second)

    Besides the handler and the queue, we set the number of workers to 2 and the shutdown timeout to 1 second. The Processor uses this time to drain jobs from the queue after context cancellation.

  5. We are ready to start the processor and enqueue some jobs

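    ctx := context.Background()
    go p.Run(ctx)
    // Give the workers a moment to start.
    time.Sleep(time.Millisecond)
    p.Enqueue(ctx, job{data: 1})
    p.Enqueue(ctx, job{data: 2})
    p.Enqueue(ctx, job{data: 3})
    // Give the workers a moment to handle the jobs before the program exits.
    time.Sleep(time.Millisecond)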
  6. Run this code and you will see output similar to this

    time=2025-11-11T22:01:26.630+03:00 level=INFO msg="worker started"
    time=2025-11-11T22:01:26.630+03:00 level=INFO msg="worker started"
    time=2025-11-11T22:01:26.631+03:00 level=INFO msg="job handled" data=1
    time=2025-11-11T22:01:26.631+03:00 level=INFO msg="job handled" data=3
    time=2025-11-11T22:01:26.631+03:00 level=INFO msg="job handled" data=2
queue.go
package main

import (
    "context"
    "time"

    "github.com/platforma-dev/platforma/log"
    "github.com/platforma-dev/platforma/queue"
)

type job struct {
    data int
}

func jobHandler(ctx context.Context, job job) {
    log.InfoContext(ctx, "job handled", "data", job.data)
}

func main() {
    ctx := context.Background()

    q := queue.NewChanQueue[job](10, 3*time.Second)
    p := queue.New(queue.HandlerFunc[job](jobHandler), q, 2, time.Second)

    go p.Run(ctx)
    // Give the workers a moment to start.
    time.Sleep(time.Millisecond)

    p.Enqueue(ctx, job{data: 1})
    p.Enqueue(ctx, job{data: 2})
    p.Enqueue(ctx, job{data: 3})
    // Give the workers a moment to handle the jobs before the program exits.
    time.Sleep(time.Millisecond)
}