Skip to content

Commit f35f7cc

Browse files
nyerglerjpalawaga
authored andcommitted
Pass at least one key to script executions
This allows Redis to route our execution to the correct node when running in a Redis Cluster.
1 parent 271b06e commit f35f7cc

1 file changed

Lines changed: 11 additions & 4 deletions

File tree

redis_queue.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ type RedisQueue interface {
2929
MetricsExporter
3030
}
3131

32+
// scriptKey returns a slice of strings containing at least one of the keys to
33+
// be used by a script. This allows Redis route our script execution to the
34+
// correct node in the event we're using a namespace.
35+
func scriptKey(ns, queueID string) []string {
36+
return []string{strings.Join([]string{ns, "queue", queueID}, ":")}
37+
}
38+
3239
// NewRedisQueue creates a new queue stored in redis.
3340
func NewRedisQueue(client redis.UniversalClient) RedisQueue {
3441
enqueueScript := redis.NewScript(`
@@ -189,7 +196,7 @@ func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error {
189196
args[2+3*i+1] = job.ID
190197
args[2+3*i+2] = jobm
191198
}
192-
return q.enqueueScript.Run(context.Background(), q.client, nil, args...).Err()
199+
return q.enqueueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err()
193200
}
194201

195202
func (q *redisQueue) Dequeue(opt *DequeueOptions) (*Job, error) {
@@ -205,7 +212,7 @@ func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, erro
205212
if err != nil {
206213
return nil, err
207214
}
208-
res, err := q.dequeueScript.Run(context.Background(), q.client, nil,
215+
res, err := q.dequeueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID),
209216
opt.Namespace,
210217
opt.QueueID,
211218
opt.At.Unix(),
@@ -249,7 +256,7 @@ func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error {
249256
for i, job := range jobs {
250257
args[2+i] = job.ID
251258
}
252-
return q.ackScript.Run(context.Background(), q.client, nil, args...).Err()
259+
return q.ackScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err()
253260
}
254261

255262
func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) {
@@ -265,7 +272,7 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error)
265272
for i, jobID := range jobIDs {
266273
args[1+i] = jobID
267274
}
268-
res, err := q.findScript.Run(context.Background(), q.client, nil, args...).Result()
275+
res, err := q.findScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result()
269276
if err != nil {
270277
return nil, err
271278
}

0 commit comments

Comments
 (0)