diff --git a/.github/workflows/github-actions-ci.yml b/.github/workflows/github-actions-ci.yml new file mode 100644 index 0000000..c08ad86 --- /dev/null +++ b/.github/workflows/github-actions-ci.yml @@ -0,0 +1,30 @@ +name: Build and run tests +run-name: Build +on: [push] +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + - run: make install + + - name: golangci-lint + uses: golangci/golangci-lint-action@v6 + with: + version: v1.64.5 + + - run: make test + - run: make enforce + + services: + redis: + image: redis + ports: + - 6379:6379 + diff --git a/.golangci.yml b/.golangci.yml index 2ac7e78..787948e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,9 +1,9 @@ output: - format: tab + formats: + - format: tab linters: disable-all: true enable: - - deadcode - depguard - dupl - goconst @@ -11,24 +11,39 @@ linters: - gocyclo - gofmt - goimports - - golint + - revive - gosec - govet - ineffassign - - maligned - misspell - prealloc - - scopelint - - structcheck + - copyloopvar + - unused - typecheck - unconvert - - varcheck issues: exclude-use-default: false - max-per-linter: 0 + max-issues-per-linter: 0 max-same-issues: 0 exclude-rules: - path: _test\.go linters: + - revive - dupl - scopelint + +# golangci-lint config +linters-settings: + depguard: + rules: + prevent_unmaintained_packages: + list-mode: lax # allow unless explicitely denied + files: + - $all + - "!$test" + allow: + - $gostd + deny: + - pkg: io/ioutil + desc: "replaced by io and os packages since Go 1.16: https://tip.golang.org/doc/go1.16#ioutil" + diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 875ee01..0000000 --- a/.travis.yml +++ /dev/null @@ -1,18 +0,0 @@ -dist: trusty -sudo: false -language: go -addons: - apt: - packages: - - redis-server -go: "1.14.2" -env: - - GO111MODULE=on -install: - - make setup - - make install -script: - - make lint - - make test - - make enforce - - make coveralls diff --git a/consumer.go b/consumer.go index 73af3db..b5e9b76 100644 --- a/consumer.go +++ b/consumer.go @@ -1,13 +1,15 @@ package redisqueue import ( + "context" + "log/slog" "net" "os" "sync" "time" - "github.com/go-redis/redis/v7" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" ) // ConsumerFunc is a type alias for the functions that will be used to handle @@ -52,13 +54,19 @@ type ConsumerOptions struct { BufferSize int // Concurrency dictates how many goroutines to spawn to handle the messages. Concurrency int + + // MaxDeliveryCount is the maximum number of times a message can be delivered + // before it is considered failed. If this is set to 0, the message will be + // retried indefinitely + MaxDeliveryCount int64 + // RedisClient supersedes the RedisOptions field, and allows you to inject // an already-made Redis Client for use in the consumer. This may be either // the standard client or a cluster client. RedisClient redis.UniversalClient // RedisOptions allows you to configure the underlying Redis connection. // More info here: - // https://pkg.go.dev/github.com/go-redis/redis/v7?tab=doc#Options. + // https://pkg.go.dev/github.com/redis/go-redis/v9#Options. // // This field is used if RedisClient field is nil. RedisOptions *RedisOptions @@ -99,14 +107,14 @@ var defaultConsumerOptions = &ConsumerOptions{ // BufferSize to 100, and Concurrency to 10. In most production environments, // you'll want to use NewConsumerWithOptions. func NewConsumer() (*Consumer, error) { - return NewConsumerWithOptions(defaultConsumerOptions) + return NewConsumerWithOptions(context.Background(), defaultConsumerOptions) } // NewConsumerWithOptions creates a Consumer with custom ConsumerOptions. If // Name is left empty, it defaults to the hostname; if GroupName is left empty, // it defaults to "redisqueue"; if BlockingTimeout is 0, it defaults to 5 // seconds; if ReclaimInterval is 0, it defaults to 1 second. -func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) { +func NewConsumerWithOptions(ctx context.Context, options *ConsumerOptions) (*Consumer, error) { hostname, _ := os.Hostname() if options.Name == "" { @@ -130,7 +138,7 @@ func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) { r = newRedisClient(options.RedisOptions) } - if err := redisPreflightChecks(r); err != nil { + if err := redisPreflightChecks(ctx, r); err != nil { return nil, err } @@ -182,7 +190,7 @@ func (c *Consumer) Register(stream string, fn ConsumerFunc) { // Run will terminate early. The same will happen if an error occurs when // creating the consumer group in Redis. Run will block until Shutdown is called // and all of the in-flight messages have been processed. -func (c *Consumer) Run() { +func (c *Consumer) Run(ctx context.Context) { if len(c.consumers) == 0 { c.Errors <- errors.New("at least one consumer function needs to be registered") return @@ -190,7 +198,7 @@ func (c *Consumer) Run() { for stream, consumer := range c.consumers { c.streams = append(c.streams, stream) - err := c.redis.XGroupCreateMkStream(stream, c.options.GroupName, consumer.id).Err() + err := c.redis.XGroupCreateMkStream(ctx, stream, c.options.GroupName, consumer.id).Err() // ignoring the BUSYGROUP error makes this a noop if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { c.Errors <- errors.Wrap(err, "error creating consumer group") @@ -202,8 +210,8 @@ func (c *Consumer) Run() { c.streams = append(c.streams, ">") } - go c.reclaim() - go c.poll() + go c.reclaim(ctx) + go c.poll(ctx) stop := newSignalHandler() go func() { @@ -214,7 +222,7 @@ func (c *Consumer) Run() { c.wg.Add(c.options.Concurrency) for i := 0; i < c.options.Concurrency; i++ { - go c.work() + go c.work(ctx) } c.wg.Wait() @@ -237,7 +245,7 @@ func (c *Consumer) Shutdown() { // If VisibilityTimeout is 0, this function returns early and no messages are // reclaimed. It checks the list of pending messages according to // ReclaimInterval. -func (c *Consumer) reclaim() { +func (c *Consumer) reclaim(ctx context.Context) { if c.options.VisibilityTimeout == 0 { return } @@ -256,7 +264,7 @@ func (c *Consumer) reclaim() { end := "+" for { - res, err := c.redis.XPendingExt(&redis.XPendingExtArgs{ + res, err := c.redis.XPendingExt(ctx, &redis.XPendingExtArgs{ Stream: stream, Group: c.options.GroupName, Start: start, @@ -275,8 +283,17 @@ func (c *Consumer) reclaim() { msgs := make([]string, 0) for _, r := range res { + slog.Info("pending message", "id", r.ID, "count", r.RetryCount, "max", c.options.MaxDeliveryCount) + if c.options.MaxDeliveryCount > 0 && r.RetryCount >= c.options.MaxDeliveryCount { + slog.Info("message exceeded delivery count limit", "id", r.ID, "count", r.RetryCount, "max", c.options.MaxDeliveryCount) + err = c.redis.XAck(ctx, stream, c.options.GroupName, r.ID).Err() + if err != nil { + c.Errors <- errors.Wrapf(err, "error acknowledging after retry count exceeded for %q stream and %q message, ", stream, r.ID) + continue + } + } if r.Idle >= c.options.VisibilityTimeout { - claimres, err := c.redis.XClaim(&redis.XClaimArgs{ + claimres, err := c.redis.XClaim(ctx, &redis.XClaimArgs{ Stream: stream, Group: c.options.GroupName, Consumer: c.options.Name, @@ -297,7 +314,7 @@ func (c *Consumer) reclaim() { // exists, the only way we can get it out of the // pending state is to acknowledge it. if err == redis.Nil { - err = c.redis.XAck(stream, c.options.GroupName, r.ID).Err() + err = c.redis.XAck(ctx, stream, c.options.GroupName, r.ID).Err() if err != nil { c.Errors <- errors.Wrapf(err, "error acknowledging after failed claim for %q stream and %q message", stream, r.ID) continue @@ -324,18 +341,18 @@ func (c *Consumer) reclaim() { // messages for this consumer to process. It blocks for up to 5 seconds instead // of blocking indefinitely so that it can periodically check to see if Shutdown // was called. -func (c *Consumer) poll() { +func (c *Consumer) poll(ctx context.Context) { for { select { case <-c.stopPoll: // once the polling has stopped (i.e. there will be no more messages // put onto c.queue), stop all of the workers - for i := 0; i < c.options.Concurrency; i++ { + for range c.options.Concurrency { c.stopWorkers <- struct{}{} } return default: - res, err := c.redis.XReadGroup(&redis.XReadGroupArgs{ + res, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: c.options.GroupName, Consumer: c.options.Name, Streams: c.streams, @@ -378,7 +395,7 @@ func (c *Consumer) enqueue(stream string, msgs []redis.XMessage) { // channel, it calls the corrensponding ConsumerFunc depending on the stream it // came from. If no error is returned from the ConsumerFunc, the message is // acknowledged in Redis. -func (c *Consumer) work() { +func (c *Consumer) work(ctx context.Context) { defer c.wg.Done() for { @@ -389,7 +406,7 @@ func (c *Consumer) work() { c.Errors <- errors.Wrapf(err, "error calling ConsumerFunc for %q stream and %q message", msg.Stream, msg.ID) continue } - err = c.redis.XAck(msg.Stream, c.options.GroupName, msg.ID).Err() + err = c.redis.XAck(ctx, msg.Stream, c.options.GroupName, msg.ID).Err() if err != nil { c.Errors <- errors.Wrapf(err, "error acknowledging after success for %q stream and %q message", msg.Stream, msg.ID) continue diff --git a/consumer_test.go b/consumer_test.go index 844ae6f..956549e 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -1,12 +1,15 @@ package redisqueue import ( + "context" + "fmt" + "log/slog" "os" "testing" "time" - "github.com/go-redis/redis/v7" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -22,14 +25,14 @@ func TestNewConsumer(t *testing.T) { func TestNewConsumerWithOptions(t *testing.T) { t.Run("creates a new consumer", func(tt *testing.T) { - c, err := NewConsumerWithOptions(&ConsumerOptions{}) + c, err := NewConsumerWithOptions(context.Background(), &ConsumerOptions{}) require.NoError(tt, err) assert.NotNil(tt, c) }) t.Run("sets defaults for Name, GroupName, BlockingTimeout, and ReclaimTimeout", func(tt *testing.T) { - c, err := NewConsumerWithOptions(&ConsumerOptions{}) + c, err := NewConsumerWithOptions(context.Background(), &ConsumerOptions{}) require.NoError(tt, err) hostname, err := os.Hostname() @@ -44,7 +47,7 @@ func TestNewConsumerWithOptions(t *testing.T) { t.Run("allows override of Name, GroupName, BlockingTimeout, ReclaimTimeout, and RedisClient", func(tt *testing.T) { rc := newRedisClient(nil) - c, err := NewConsumerWithOptions(&ConsumerOptions{ + c, err := NewConsumerWithOptions(context.Background(), &ConsumerOptions{ Name: "test_name", GroupName: "test_group_name", BlockingTimeout: 10 * time.Second, @@ -61,7 +64,7 @@ func TestNewConsumerWithOptions(t *testing.T) { }) t.Run("bubbles up errors", func(tt *testing.T) { - _, err := NewConsumerWithOptions(&ConsumerOptions{ + _, err := NewConsumerWithOptions(context.Background(), &ConsumerOptions{ RedisOptions: &RedisOptions{Addr: "localhost:0"}, }) require.Error(tt, err) @@ -138,12 +141,14 @@ func TestRun(t *testing.T) { assert.Equal(tt, "at least one consumer function needs to be registered", err.Error()) }() - c.Run() + c.Run(context.Background()) }) t.Run("calls the ConsumerFunc on for a message", func(tt *testing.T) { + ctx := context.Background() + // create a consumer - c, err := NewConsumerWithOptions(&ConsumerOptions{ + c, err := NewConsumerWithOptions(ctx, &ConsumerOptions{ VisibilityTimeout: 60 * time.Second, BlockingTimeout: 10 * time.Millisecond, BufferSize: 100, @@ -156,11 +161,11 @@ func TestRun(t *testing.T) { require.NoError(tt, err) // create consumer group - c.redis.XGroupDestroy(tt.Name(), c.options.GroupName) - c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$") + c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName) + c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$") // enqueue a message - err = p.Enqueue(&Message{ + err = p.Enqueue(ctx, &Message{ Stream: tt.Name(), Values: map[string]interface{}{"test": "value"}, }) @@ -181,12 +186,13 @@ func TestRun(t *testing.T) { }() // run the consumer - c.Run() + c.Run(ctx) }) t.Run("reclaims pending messages according to ReclaimInterval", func(tt *testing.T) { + ctx := context.Background() // create a consumer - c, err := NewConsumerWithOptions(&ConsumerOptions{ + c, err := NewConsumerWithOptions(ctx, &ConsumerOptions{ VisibilityTimeout: 5 * time.Millisecond, BlockingTimeout: 10 * time.Millisecond, ReclaimInterval: 1 * time.Millisecond, @@ -200,15 +206,15 @@ func TestRun(t *testing.T) { require.NoError(tt, err) // create consumer group - c.redis.XGroupDestroy(tt.Name(), c.options.GroupName) - c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$") + c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName) + c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$") // enqueue a message msg := &Message{ Stream: tt.Name(), Values: map[string]interface{}{"test": "value"}, } - err = p.Enqueue(msg) + err = p.Enqueue(ctx, msg) require.NoError(tt, err) // register a handler that will assert the message and then shut down @@ -220,7 +226,7 @@ func TestRun(t *testing.T) { }) // read the message but don't acknowledge it - res, err := c.redis.XReadGroup(&redis.XReadGroupArgs{ + res, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: c.options.GroupName, Consumer: "failed_consumer", Streams: []string{tt.Name(), ">"}, @@ -242,12 +248,75 @@ func TestRun(t *testing.T) { }() // run the consumer - c.Run() + c.Run(ctx) + }) + + t.Run("reclaims pending messages maximum MaxDeliveryCount", func(tt *testing.T) { + ctx := context.Background() + var maxDeliveryCount int64 = 3 + var visibilityTimeout time.Duration = 5 * time.Millisecond + var reclaimInterval time.Duration = 1 * time.Millisecond + + // create a consumer + c, err := NewConsumerWithOptions(ctx, &ConsumerOptions{ + VisibilityTimeout: visibilityTimeout, + BlockingTimeout: 10 * time.Millisecond, + ReclaimInterval: 1 * time.Millisecond, + BufferSize: 100, + Concurrency: 10, + MaxDeliveryCount: maxDeliveryCount, + }) + require.NoError(tt, err) + + // create a producer + p, err := NewProducer() + require.NoError(tt, err) + + // create consumer group + c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName) + c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$") + + // enqueue a message + msg := &Message{ + Stream: tt.Name(), + Values: map[string]interface{}{"test": "value"}, + } + err = p.Enqueue(ctx, msg) + require.NoError(tt, err) + + var deliveryCount int64 = 0 + + // register a handler that will assert the message and then shut down + // the consumer + c.Register(tt.Name(), func(m *Message) error { + slog.Info("message received", "id", m.ID, "delivery count", deliveryCount) + + deliveryCount++ + assert.Equal(tt, msg.ID, m.ID) + return fmt.Errorf("dummy error") + }) + + // // watch for consumer errors + // go func() { + // <-c.Errors + // }() + + // run the consumer + go c.Run(ctx) + + // wait for more than VisibilityTimeout + (ReclaimInterval*number_higher_than_max_dlivery_count) to ensure that + // the message was reclaimed more than MaxDeliveryCount + + time.Sleep(visibilityTimeout + (reclaimInterval * 10) + 1*time.Millisecond) + c.Shutdown() + assert.Equal(tt, maxDeliveryCount, deliveryCount) }) t.Run("doesn't reclaim if there is no VisibilityTimeout set", func(tt *testing.T) { + ctx := context.Background() + // create a consumer - c, err := NewConsumerWithOptions(&ConsumerOptions{ + c, err := NewConsumerWithOptions(ctx, &ConsumerOptions{ BlockingTimeout: 10 * time.Millisecond, ReclaimInterval: 1 * time.Millisecond, BufferSize: 100, @@ -256,15 +325,15 @@ func TestRun(t *testing.T) { require.NoError(tt, err) // create a producer - p, err := NewProducerWithOptions(&ProducerOptions{ - StreamMaxLength: 2, - ApproximateMaxLength: false, + p, err := NewProducerWithOptions(ctx, &ProducerOptions{ + StreamMaxLength: 2, + UseApproximate: false, }) require.NoError(tt, err) // create consumer group - c.redis.XGroupDestroy(tt.Name(), c.options.GroupName) - c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$") + c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName) + c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$") // enqueue a message msg1 := &Message{ @@ -275,7 +344,7 @@ func TestRun(t *testing.T) { Stream: tt.Name(), Values: map[string]interface{}{"test": "value2"}, } - err = p.Enqueue(msg1) + err = p.Enqueue(ctx, msg1) require.NoError(tt, err) // register a handler that will assert the message and then shut down @@ -287,7 +356,7 @@ func TestRun(t *testing.T) { }) // read the message but don't acknowledge it - res, err := c.redis.XReadGroup(&redis.XReadGroupArgs{ + res, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: c.options.GroupName, Consumer: "failed_consumer", Streams: []string{tt.Name(), ">"}, @@ -299,7 +368,7 @@ func TestRun(t *testing.T) { require.Equal(tt, msg1.ID, res[0].Messages[0].ID) // add another message to the stream to let the consumer consume it - err = p.Enqueue(msg2) + err = p.Enqueue(ctx, msg2) require.NoError(tt, err) // watch for consumer errors @@ -309,10 +378,10 @@ func TestRun(t *testing.T) { }() // run the consumer - c.Run() + c.Run(ctx) // check if the pending message is still there - pendingRes, err := c.redis.XPendingExt(&redis.XPendingExtArgs{ + pendingRes, err := c.redis.XPendingExt(ctx, &redis.XPendingExtArgs{ Stream: tt.Name(), Group: c.options.GroupName, Start: "-", @@ -325,8 +394,10 @@ func TestRun(t *testing.T) { }) t.Run("acknowledges pending messages that have already been deleted", func(tt *testing.T) { + ctx := context.Background() + // create a consumer - c, err := NewConsumerWithOptions(&ConsumerOptions{ + c, err := NewConsumerWithOptions(ctx, &ConsumerOptions{ VisibilityTimeout: 5 * time.Millisecond, BlockingTimeout: 10 * time.Millisecond, ReclaimInterval: 1 * time.Millisecond, @@ -336,22 +407,22 @@ func TestRun(t *testing.T) { require.NoError(tt, err) // create a producer - p, err := NewProducerWithOptions(&ProducerOptions{ - StreamMaxLength: 1, - ApproximateMaxLength: false, + p, err := NewProducerWithOptions(ctx, &ProducerOptions{ + StreamMaxLength: 1, + UseApproximate: false, }) require.NoError(tt, err) // create consumer group - c.redis.XGroupDestroy(tt.Name(), c.options.GroupName) - c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$") + c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName) + c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$") // enqueue a message msg := &Message{ Stream: tt.Name(), Values: map[string]interface{}{"test": "value"}, } - err = p.Enqueue(msg) + err = p.Enqueue(ctx, msg) require.NoError(tt, err) // register a noop handler that should never be called @@ -361,7 +432,7 @@ func TestRun(t *testing.T) { }) // read the message but don't acknowledge it - res, err := c.redis.XReadGroup(&redis.XReadGroupArgs{ + res, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: c.options.GroupName, Consumer: "failed_consumer", Streams: []string{tt.Name(), ">"}, @@ -373,7 +444,7 @@ func TestRun(t *testing.T) { require.Equal(tt, msg.ID, res[0].Messages[0].ID) // delete the message - err = c.redis.XDel(tt.Name(), msg.ID).Err() + err = c.redis.XDel(ctx, tt.Name(), msg.ID).Err() require.NoError(tt, err) // watch for consumer errors @@ -389,10 +460,10 @@ func TestRun(t *testing.T) { }() // run the consumer - c.Run() + c.Run(ctx) // check that there are no pending messages - pendingRes, err := c.redis.XPendingExt(&redis.XPendingExtArgs{ + pendingRes, err := c.redis.XPendingExt(ctx, &redis.XPendingExtArgs{ Stream: tt.Name(), Group: c.options.GroupName, Start: "-", @@ -404,8 +475,9 @@ func TestRun(t *testing.T) { }) t.Run("returns an error on a string panic", func(tt *testing.T) { + ctx := context.Background() // create a consumer - c, err := NewConsumerWithOptions(&ConsumerOptions{ + c, err := NewConsumerWithOptions(ctx, &ConsumerOptions{ VisibilityTimeout: 60 * time.Second, BlockingTimeout: 10 * time.Millisecond, BufferSize: 100, @@ -418,11 +490,11 @@ func TestRun(t *testing.T) { require.NoError(tt, err) // create consumer group - c.redis.XGroupDestroy(tt.Name(), c.options.GroupName) - c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$") + c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName) + c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$") // enqueue a message - err = p.Enqueue(&Message{ + err = p.Enqueue(ctx, &Message{ Stream: tt.Name(), Values: map[string]interface{}{"test": "value"}, }) @@ -444,12 +516,13 @@ func TestRun(t *testing.T) { }() // run the consumer - c.Run() + c.Run(ctx) }) t.Run("returns an error on an error panic", func(tt *testing.T) { + ctx := context.Background() // create a consumer - c, err := NewConsumerWithOptions(&ConsumerOptions{ + c, err := NewConsumerWithOptions(ctx, &ConsumerOptions{ VisibilityTimeout: 60 * time.Second, BlockingTimeout: 10 * time.Millisecond, BufferSize: 100, @@ -462,11 +535,11 @@ func TestRun(t *testing.T) { require.NoError(tt, err) // create consumer group - c.redis.XGroupDestroy(tt.Name(), c.options.GroupName) - c.redis.XGroupCreateMkStream(tt.Name(), c.options.GroupName, "$") + c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName) + c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$") // enqueue a message - err = p.Enqueue(&Message{ + err = p.Enqueue(ctx, &Message{ Stream: tt.Name(), Values: map[string]interface{}{"test": "value"}, }) @@ -488,6 +561,6 @@ func TestRun(t *testing.T) { }() // run the consumer - c.Run() + c.Run(ctx) }) } diff --git a/doc.go b/doc.go index 8dfe806..b5bdd84 100644 --- a/doc.go +++ b/doc.go @@ -2,106 +2,106 @@ Package redisqueue provides a producer and consumer of a queue that uses Redis streams (https://redis.io/topics/streams-intro). -Features +# Features The features of this package include: - - A `Producer` struct to make enqueuing messages easy. - - A `Consumer` struct to make processing messages concurrenly. - - Claiming and acknowledging messages if there's no error, so that if a consumer - dies while processing, the message it was working on isn't lost. This - guarantees at least once delivery. - - A "visibility timeout" so that if a message isn't processed in a designated - time frame, it will be be processed by another consumer. - - A max length on the stream so that it doesn't store the messages indefinitely - and run out of memory. - - Graceful handling of Unix signals (`SIGINT` and `SIGTERM`) to let in-flight - messages complete. - - A channel that will surface any errors so you can handle them centrally. - - Graceful handling of panics to avoid crashing the whole process. - - A concurrency setting to control how many goroutines are spawned to process - messages. - - A batch size setting to limit the total messages in flight. - - Support for multiple streams. - -Example + - A `Producer` struct to make enqueuing messages easy. + - A `Consumer` struct to make processing messages concurrenly. + - Claiming and acknowledging messages if there's no error, so that if a consumer + dies while processing, the message it was working on isn't lost. This + guarantees at least once delivery. + - A "visibility timeout" so that if a message isn't processed in a designated + time frame, it will be be processed by another consumer. + - A max length on the stream so that it doesn't store the messages indefinitely + and run out of memory. + - Graceful handling of Unix signals (`SIGINT` and `SIGTERM`) to let in-flight + messages complete. + - A channel that will surface any errors so you can handle them centrally. + - Graceful handling of panics to avoid crashing the whole process. + - A concurrency setting to control how many goroutines are spawned to process + messages. + - A batch size setting to limit the total messages in flight. + - Support for multiple streams. + +# Example Here's an example of a producer that inserts 1000 messages into a queue: - package main - - import ( - "fmt" - - "github.com/robinjoseph08/redisqueue/v2" - ) - - func main() { - p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{ - StreamMaxLength: 10000, - ApproximateMaxLength: true, - }) - if err != nil { - panic(err) - } - - for i := 0; i < 1000; i++ { - err := p.Enqueue(&redisqueue.Message{ - Stream: "redisqueue:test", - Values: map[string]interface{}{ - "index": i, - }, - }) - if err != nil { - panic(err) - } - - if i%100 == 0 { - fmt.Printf("enqueued %d\n", i) - } - } - } + package main + + import ( + "fmt" + + "github.com/robinjoseph08/redisqueue/v3" + ) + + func main() { + p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{ + StreamMaxLength: 10000, + ApproximateMaxLength: true, + }) + if err != nil { + panic(err) + } + + for i := 0; i < 1000; i++ { + err := p.Enqueue(&redisqueue.Message{ + Stream: "redisqueue:test", + Values: map[string]interface{}{ + "index": i, + }, + }) + if err != nil { + panic(err) + } + + if i%100 == 0 { + fmt.Printf("enqueued %d\n", i) + } + } + } And here's an example of a consumer that reads the messages off of that queue: - package main - - import ( - "fmt" - "time" - - "github.com/robinjoseph08/redisqueue/v2" - ) - - func main() { - c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{ - VisibilityTimeout: 60 * time.Second, - BlockingTimeout: 5 * time.Second, - ReclaimInterval: 1 * time.Second, - BufferSize: 100, - Concurrency: 10, - }) - if err != nil { - panic(err) - } - - c.Register("redisqueue:test", process) - - go func() { - for err := range c.Errors { - // handle errors accordingly - fmt.Printf("err: %+v\n", err) - } - }() - - fmt.Println("starting") - c.Run() - fmt.Println("stopped") - } - - func process(msg *redisqueue.Message) error { - fmt.Printf("processing message: %v\n", msg.Values["index"]) - return nil - } + package main + + import ( + "fmt" + "time" + + "github.com/robinjoseph08/redisqueue/v3" + ) + + func main() { + c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{ + VisibilityTimeout: 60 * time.Second, + BlockingTimeout: 5 * time.Second, + ReclaimInterval: 1 * time.Second, + BufferSize: 100, + Concurrency: 10, + }) + if err != nil { + panic(err) + } + + c.Register("redisqueue:test", process) + + go func() { + for err := range c.Errors { + // handle errors accordingly + fmt.Printf("err: %+v\n", err) + } + }() + + fmt.Println("starting") + c.Run() + fmt.Println("stopped") + } + + func process(msg *redisqueue.Message) error { + fmt.Printf("processing message: %v\n", msg.Values["index"]) + return nil + } */ package redisqueue diff --git a/go.mod b/go.mod index c798d27..c148ca2 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,30 @@ module github.com/robinjoseph08/redisqueue/v2 -go 1.12 +go 1.23 require ( + github.com/git-chglog/git-chglog v0.0.0-20190611050339-63a4e637021f + github.com/mattn/goveralls v0.0.2 + github.com/pkg/errors v0.9.1 + github.com/redis/go-redis/v9 v9.7.0 + github.com/stretchr/testify v1.5.1 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/color v1.7.0 // indirect - github.com/git-chglog/git-chglog v0.0.0-20190611050339-63a4e637021f - github.com/go-redis/redis/v7 v7.3.0 - github.com/golang/protobuf v1.3.3 // indirect github.com/imdario/mergo v0.3.7 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/mattn/go-colorable v0.1.2 // indirect - github.com/mattn/goveralls v0.0.2 + github.com/mattn/go-isatty v0.0.8 // indirect + github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect github.com/pborman/uuid v1.2.0 // indirect - github.com/pkg/errors v0.9.1 - github.com/stretchr/testify v1.5.1 + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/tsuyoshiwada/go-gitcmd v0.0.0-20180205145712-5f1f5f9475df // indirect github.com/urfave/cli v1.20.0 // indirect + golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db // indirect gopkg.in/AlecAivazis/survey.v1 v1.8.5 // indirect gopkg.in/kyokomi/emoji.v1 v1.5.1 // indirect diff --git a/go.sum b/go.sum index 2208032..ce12fd1 100644 --- a/go.sum +++ b/go.sum @@ -1,38 +1,30 @@ github.com/Netflix/go-expect v0.0.0-20180615182759-c93bf25de8e8 h1:xzYJEypr/85nBpB11F9br+3HUrpgb+fcm5iADzXXYEw= github.com/Netflix/go-expect v0.0.0-20180615182759-c93bf25de8e8/go.mod h1:oX5x61PbNXchhh0oikYAH+4Pcfw5LKv21+Jnpr6r6Pc= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= -github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/git-chglog/git-chglog v0.0.0-20190611050339-63a4e637021f h1:8l4Aw3Jmx0pLKYMkY+1b6yBPgE+rzRtA5T3vqFyI2Z8= github.com/git-chglog/git-chglog v0.0.0-20190611050339-63a4e637021f/go.mod h1:Dcsy1kii/xFyNad5JqY/d0GO5mu91sungp5xotbm3Yk= -github.com/go-redis/redis/v7 v7.3.0 h1:3oHqd0W7f/VLKBxeYTEpqdMUsmMectngjM9OtoRoIgg= -github.com/go-redis/redis/v7 v7.3.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= -github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= -github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174 h1:WlZsjVhE8Af9IcZDGgJGQpNflI3+MJSBhsgT5PCtzBQ= github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174/go.mod h1:DqJ97dSdRW1W22yXSB90986pcOyQ7r45iio1KN2ez1A= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= -github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI= github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= @@ -43,18 +35,14 @@ github.com/mattn/goveralls v0.0.2 h1:7eJB6EqsPhRVxvwEXGnqdO2sJI0PTsrWoTMXEk9/OQc github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= -github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= -github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= -github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= @@ -64,45 +52,22 @@ github.com/tsuyoshiwada/go-gitcmd v0.0.0-20180205145712-5f1f5f9475df/go.mod h1:p github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= -golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180606202747-9527bec2660b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY= -golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db h1:9hRk1xeL9LTT3yX/941DqeBz87XgHAQuj+TbimYJuiw= golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= gopkg.in/AlecAivazis/survey.v1 v1.8.5 h1:QoEEmn/d5BbuPIL2qvXwzJdttFFhRQFkaq+tEKb7SMI= gopkg.in/AlecAivazis/survey.v1 v1.8.5/go.mod h1:iBNOmqKz/NUbZx3bA+4hAGLRC7fSK7tgtVDT4tB22XA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= -gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/kyokomi/emoji.v1 v1.5.1 h1:beetH5mWDMzFznJ+Qzd5KVHp79YKhVUMcdO8LpRLeGw= gopkg.in/kyokomi/emoji.v1 v1.5.1/go.mod h1:N9AZ6hi1jHOPn34PsbpufQZUcKftSD7WgS2pgpmH4Lg= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/message.go b/message.go index 827b669..ddf2989 100644 --- a/message.go +++ b/message.go @@ -4,7 +4,10 @@ package redisqueue // When enqueuing, it's recommended to leave ID empty and let Redis generate it, // unless you know what you're doing. type Message struct { - ID string - Stream string - Values map[string]interface{} + ID string + Stream string + StreamMaxLength int64 + StreamMinID string + TrimLimit int64 + Values map[string]interface{} } diff --git a/mise.toml b/mise.toml new file mode 100644 index 0000000..ed714fd --- /dev/null +++ b/mise.toml @@ -0,0 +1,2 @@ +[tools] +go = "1.23" diff --git a/producer.go b/producer.go index b52c028..edbe5f4 100644 --- a/producer.go +++ b/producer.go @@ -1,7 +1,10 @@ package redisqueue import ( - "github.com/go-redis/redis/v7" + "cmp" + "context" + + "github.com/redis/go-redis/v9" ) // ProducerOptions provide options to configure the Producer. @@ -15,17 +18,27 @@ type ProducerOptions struct { // So ideally, you'll set this number to be as high as you can makee it. // More info here: https://redis.io/commands/xadd#capped-streams. StreamMaxLength int64 - // ApproximateMaxLength determines whether to use the ~ with the MAXLEN + + // StreamMinID sets the minimum ID that will be used when calling XADD. This + // is useful when you want to ensure that the stream is trimmed to a certain + // point. More info here: https://redis.io/commands/xadd#capped-streams. + StreamMinID string + + // UseApproximate determines whether to use the ~ with the MAXLEN and MINID // option. This allows the stream trimming to done in a more efficient // manner. More info here: https://redis.io/commands/xadd#capped-streams. - ApproximateMaxLength bool + UseApproximate bool + + // TrimLimit sets LIMIT for XADD + TrimLimit int64 + // RedisClient supersedes the RedisOptions field, and allows you to inject // an already-made Redis Client for use in the consumer. This may be either // the standard client or a cluster client. RedisClient redis.UniversalClient // RedisOptions allows you to configure the underlying Redis connection. // More info here: - // https://pkg.go.dev/github.com/go-redis/redis/v7?tab=doc#Options. + // https://pkg.go.dev/github.com/redis/go-redis/v9#Options. // // This field is used if RedisClient field is nil. RedisOptions *RedisOptions @@ -39,19 +52,19 @@ type Producer struct { } var defaultProducerOptions = &ProducerOptions{ - StreamMaxLength: 1000, - ApproximateMaxLength: true, + StreamMaxLength: 1000, + UseApproximate: true, } // NewProducer uses a default set of options to create a Producer. It sets -// StreamMaxLength to 1000 and ApproximateMaxLength to true. In most production +// StreamMaxLength to 1000 and UseApproximate to true. In most production // environments, you'll want to use NewProducerWithOptions. func NewProducer() (*Producer, error) { - return NewProducerWithOptions(defaultProducerOptions) + return NewProducerWithOptions(context.Background(), defaultProducerOptions) } // NewProducerWithOptions creates a Producer using custom ProducerOptions. -func NewProducerWithOptions(options *ProducerOptions) (*Producer, error) { +func NewProducerWithOptions(ctx context.Context, options *ProducerOptions) (*Producer, error) { var r redis.UniversalClient if options.RedisClient != nil { @@ -60,7 +73,7 @@ func NewProducerWithOptions(options *ProducerOptions) (*Producer, error) { r = newRedisClient(options.RedisOptions) } - if err := redisPreflightChecks(r); err != nil { + if err := redisPreflightChecks(ctx, r); err != nil { return nil, err } @@ -74,18 +87,23 @@ func NewProducerWithOptions(options *ProducerOptions) (*Producer, error) { // msg.Stream. While you can set msg.ID, unless you know what you're doing, you // should let Redis auto-generate the ID. If an ID is auto-generated, it will be // set on msg.ID for your reference. msg.Values is also required. -func (p *Producer) Enqueue(msg *Message) error { +func (p *Producer) Enqueue(ctx context.Context, msg *Message) error { + maxLen := cmp.Or(msg.StreamMaxLength, p.options.StreamMaxLength) + minID := cmp.Or(msg.StreamMinID, p.options.StreamMinID) + if maxLen > 0 { + minID = "" + } + args := &redis.XAddArgs{ ID: msg.ID, Stream: msg.Stream, Values: msg.Values, + MaxLen: maxLen, + MinID: minID, + Approx: p.options.UseApproximate, + Limit: cmp.Or(msg.TrimLimit, p.options.TrimLimit), } - if p.options.ApproximateMaxLength { - args.MaxLenApprox = p.options.StreamMaxLength - } else { - args.MaxLen = p.options.StreamMaxLength - } - id, err := p.redis.XAdd(args).Result() + id, err := p.redis.XAdd(ctx, args).Result() if err != nil { return err } diff --git a/producer_test.go b/producer_test.go index 6c00b9a..2e09d02 100644 --- a/producer_test.go +++ b/producer_test.go @@ -1,6 +1,7 @@ package redisqueue import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -18,7 +19,7 @@ func TestNewProducer(t *testing.T) { func TestNewProducerWithOptions(t *testing.T) { t.Run("creates a new producer", func(tt *testing.T) { - p, err := NewProducerWithOptions(&ProducerOptions{}) + p, err := NewProducerWithOptions(context.Background(), &ProducerOptions{}) require.NoError(tt, err) assert.NotNil(tt, p) @@ -27,7 +28,7 @@ func TestNewProducerWithOptions(t *testing.T) { t.Run("allows custom *redis.Client", func(tt *testing.T) { rc := newRedisClient(nil) - p, err := NewProducerWithOptions(&ProducerOptions{ + p, err := NewProducerWithOptions(context.Background(), &ProducerOptions{ RedisClient: rc, }) require.NoError(tt, err) @@ -37,7 +38,7 @@ func TestNewProducerWithOptions(t *testing.T) { }) t.Run("bubbles up errors", func(tt *testing.T) { - _, err := NewProducerWithOptions(&ProducerOptions{ + _, err := NewProducerWithOptions(context.Background(), &ProducerOptions{ RedisOptions: &RedisOptions{Addr: "localhost:0"}, }) require.Error(tt, err) @@ -48,29 +49,31 @@ func TestNewProducerWithOptions(t *testing.T) { func TestEnqueue(t *testing.T) { t.Run("puts the message in the stream", func(tt *testing.T) { - p, err := NewProducerWithOptions(&ProducerOptions{}) + ctx := context.Background() + p, err := NewProducerWithOptions(context.Background(), &ProducerOptions{}) require.NoError(t, err) msg := &Message{ Stream: tt.Name(), Values: map[string]interface{}{"test": "value"}, } - err = p.Enqueue(msg) + err = p.Enqueue(ctx, msg) require.NoError(tt, err) - res, err := p.redis.XRange(msg.Stream, msg.ID, msg.ID).Result() + res, err := p.redis.XRange(ctx, msg.Stream, msg.ID, msg.ID).Result() require.NoError(tt, err) assert.Equal(tt, "value", res[0].Values["test"]) }) t.Run("bubbles up errors", func(tt *testing.T) { - p, err := NewProducerWithOptions(&ProducerOptions{ApproximateMaxLength: true}) + ctx := context.Background() + p, err := NewProducerWithOptions(context.Background(), &ProducerOptions{UseApproximate: true}) require.NoError(t, err) msg := &Message{ Stream: tt.Name(), } - err = p.Enqueue(msg) + err = p.Enqueue(ctx, msg) require.Error(tt, err) assert.Contains(tt, err.Error(), "wrong number of arguments") diff --git a/redis.go b/redis.go index 0a5a68c..10177e1 100644 --- a/redis.go +++ b/redis.go @@ -1,13 +1,14 @@ package redisqueue import ( + "context" "fmt" "regexp" "strconv" "strings" - "github.com/go-redis/redis/v7" "github.com/pkg/errors" + "github.com/redis/go-redis/v9" ) var redisVersionRE = regexp.MustCompile(`redis_version:(.+)`) @@ -29,8 +30,8 @@ func newRedisClient(options *RedisOptions) *redis.Client { // offers the functionality we need. Specifically, it also that it can connect // to the actual instance and that the instance supports Redis streams (i.e. // it's at least v5). -func redisPreflightChecks(client redis.UniversalClient) error { - info, err := client.Info("server").Result() +func redisPreflightChecks(ctx context.Context, client redis.UniversalClient) error { + info, err := client.Info(ctx, "server").Result() if err != nil { return err } diff --git a/redis_test.go b/redis_test.go index 4614ab7..10dc2fa 100644 --- a/redis_test.go +++ b/redis_test.go @@ -1,6 +1,7 @@ package redisqueue import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -12,14 +13,14 @@ func TestNewRedisClient(t *testing.T) { options := &RedisOptions{} r := newRedisClient(options) - err := r.Ping().Err() + err := r.Ping(context.Background()).Err() assert.NoError(tt, err) }) t.Run("defaults options if it's nil", func(tt *testing.T) { r := newRedisClient(nil) - err := r.Ping().Err() + err := r.Ping(context.Background()).Err() assert.NoError(tt, err) }) } @@ -29,7 +30,7 @@ func TestRedisPreflightChecks(t *testing.T) { options := &RedisOptions{Addr: "localhost:0"} r := newRedisClient(options) - err := redisPreflightChecks(r) + err := redisPreflightChecks(context.Background(), r) require.Error(tt, err) assert.Contains(tt, err.Error(), "dial tcp")