From df0d1a2134cb5909847d93751556d9c7518434f3 Mon Sep 17 00:00:00 2001 From: Victoria Nadasdi Date: Mon, 20 May 2024 14:10:54 +0000 Subject: [PATCH] feat: parse prefix from redis URI for queues (#3836) For security reasons, scoping access to a redis server via ACL rules is a good practice. Some parts of the codebase handles prefix like cache[^1] and session[^2], but the queue module doesn't. This patch adds this missing functionality to the queue module. Note about relevant test: I tried to keep the PR as small as possible (and reasonable), and not change how the test runs. Updated the existing test to use the same redis address and basically duplicated the test with the extra flag. It does NOT test if the keys are correct, it ensures only it works as expected. To make assertions about the keys, the whole test has to be updated as the general wrapper doesn't allow the main test to check anything provider (redis) specific property. That's not something I wanted to take on now. [^1]: https://codeberg.org/forgejo/forgejo/src/commit/e4c3c039bee28bc10f5e6fe3a3a98812d01ce4be/modules/cache/cache_redis.go#L139-L150 [^2]: https://codeberg.org/forgejo/forgejo/src/commit/e4c3c039bee28bc10f5e6fe3a3a98812d01ce4be/modules/session/redis.go#L122-L129 Signed-off-by: Victoria Nadasdi Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/3836 Reviewed-by: Earl Warren Co-authored-by: Victoria Nadasdi Co-committed-by: Victoria Nadasdi --- .forgejo/workflows/testing.yml | 3 ++ modules/queue/base_redis.go | 41 ++++++++++++++++----- modules/queue/base_redis_test.go | 63 +++++++++++++++++++++++++++++--- release-notes/8.0.0/feat/3836.md | 1 + 4 files changed, 93 insertions(+), 15 deletions(-) create mode 100644 release-notes/8.0.0/feat/3836.md diff --git a/.forgejo/workflows/testing.yml b/.forgejo/workflows/testing.yml index cd955c01af..79c695e277 100644 --- a/.forgejo/workflows/testing.yml +++ b/.forgejo/workflows/testing.yml @@ -53,6 +53,8 @@ jobs: MINIO_DOMAIN: minio MINIO_ROOT_USER: 123456 MINIO_ROOT_PASSWORD: 12345678 + redis: + image: redis:7.2.4 steps: - uses: https://code.forgejo.org/actions/checkout@v3 - uses: https://code.forgejo.org/actions/setup-go@v4 @@ -82,6 +84,7 @@ jobs: env: RACE_ENABLED: 'true' TAGS: bindata + TEST_REDIS_SERVER: redis:6379 test-mysql: if: ${{ !startsWith(vars.ROLE, 'forgejo-') }} runs-on: docker diff --git a/modules/queue/base_redis.go b/modules/queue/base_redis.go index a1e234943d..abab0f5c5e 100644 --- a/modules/queue/base_redis.go +++ b/modules/queue/base_redis.go @@ -19,6 +19,7 @@ type baseRedis struct { client redis.UniversalClient isUnique bool cfg *BaseConfig + prefix string mu sync.Mutex // the old implementation is not thread-safe, the queue operation and set operation should be protected together } @@ -27,6 +28,22 @@ var _ baseQueue = (*baseRedis)(nil) func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) { client := nosql.GetManager().GetRedisClient(cfg.ConnStr) + prefix := "" + uri := nosql.ToRedisURI(cfg.ConnStr) + + for key, value := range uri.Query() { + switch key { + case "prefix": + if len(value) > 0 { + prefix = value[0] + + // As we are not checking any other values, if we found this one, we can + // exit from the loop. + // If a new key check is required, remove this break. + break + } + } + } var err error for i := 0; i < 10; i++ { @@ -41,7 +58,7 @@ func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) { return nil, err } - return &baseRedis{cfg: cfg, client: client, isUnique: unique}, nil + return &baseRedis{cfg: cfg, client: client, isUnique: unique, prefix: prefix}, nil } func newBaseRedisSimple(cfg *BaseConfig) (baseQueue, error) { @@ -52,12 +69,16 @@ func newBaseRedisUnique(cfg *BaseConfig) (baseQueue, error) { return newBaseRedisGeneric(cfg, true) } +func (q *baseRedis) prefixedName(name string) string { + return q.prefix + name +} + func (q *baseRedis) PushItem(ctx context.Context, data []byte) error { return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) { q.mu.Lock() defer q.mu.Unlock() - cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result() + cnt, err := q.client.LLen(ctx, q.prefixedName(q.cfg.QueueFullName)).Result() if err != nil { return false, err } @@ -66,7 +87,7 @@ func (q *baseRedis) PushItem(ctx context.Context, data []byte) error { } if q.isUnique { - added, err := q.client.SAdd(ctx, q.cfg.SetFullName, data).Result() + added, err := q.client.SAdd(ctx, q.prefixedName(q.cfg.SetFullName), data).Result() if err != nil { return false, err } @@ -74,7 +95,7 @@ func (q *baseRedis) PushItem(ctx context.Context, data []byte) error { return false, ErrAlreadyInQueue } } - return false, q.client.RPush(ctx, q.cfg.QueueFullName, data).Err() + return false, q.client.RPush(ctx, q.prefixedName(q.cfg.QueueFullName), data).Err() }) } @@ -83,7 +104,7 @@ func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) { q.mu.Lock() defer q.mu.Unlock() - data, err = q.client.LPop(ctx, q.cfg.QueueFullName).Bytes() + data, err = q.client.LPop(ctx, q.prefixedName(q.cfg.QueueFullName)).Bytes() if err == redis.Nil { return true, nil, nil } @@ -92,7 +113,7 @@ func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) { } if q.isUnique { // the data has been popped, even if there is any error we can't do anything - _ = q.client.SRem(ctx, q.cfg.SetFullName, data).Err() + _ = q.client.SRem(ctx, q.prefixedName(q.cfg.SetFullName), data).Err() } return false, data, err }) @@ -104,13 +125,13 @@ func (q *baseRedis) HasItem(ctx context.Context, data []byte) (bool, error) { if !q.isUnique { return false, nil } - return q.client.SIsMember(ctx, q.cfg.SetFullName, data).Result() + return q.client.SIsMember(ctx, q.prefixedName(q.cfg.SetFullName), data).Result() } func (q *baseRedis) Len(ctx context.Context) (int, error) { q.mu.Lock() defer q.mu.Unlock() - cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result() + cnt, err := q.client.LLen(ctx, q.prefixedName(q.cfg.QueueFullName)).Result() return int(cnt), err } @@ -124,10 +145,10 @@ func (q *baseRedis) RemoveAll(ctx context.Context) error { q.mu.Lock() defer q.mu.Unlock() - c1 := q.client.Del(ctx, q.cfg.QueueFullName) + c1 := q.client.Del(ctx, q.prefixedName(q.cfg.QueueFullName)) // the "set" must be cleared after the "list" because there is no transaction. // it's better to have duplicate items than losing items. - c2 := q.client.Del(ctx, q.cfg.SetFullName) + c2 := q.client.Del(ctx, q.prefixedName(q.cfg.SetFullName)) if c1.Err() != nil { return c1.Err() } diff --git a/modules/queue/base_redis_test.go b/modules/queue/base_redis_test.go index be8bfbfe37..ed01a990ba 100644 --- a/modules/queue/base_redis_test.go +++ b/modules/queue/base_redis_test.go @@ -16,6 +16,17 @@ import ( "github.com/stretchr/testify/assert" ) +const defaultTestRedisServer = "127.0.0.1:6379" + +func testRedisHost() string { + value := os.Getenv("TEST_REDIS_SERVER") + if value != "" { + return value + } + + return defaultTestRedisServer +} + func waitRedisReady(conn string, dur time.Duration) (ready bool) { ctxTimed, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -47,25 +58,67 @@ func redisServerCmd(t *testing.T) *exec.Cmd { } func TestBaseRedis(t *testing.T) { + redisAddress := "redis://" + testRedisHost() + "/0" + queueSettings := setting.QueueSettings{ + Length: 10, + ConnStr: redisAddress, + } + var redisServer *exec.Cmd + if !waitRedisReady(redisAddress, 0) { + redisServer = redisServerCmd(t) + + if redisServer == nil { + t.Skip("redis-server not found in Forgejo test yet") + return + } + + assert.NoError(t, redisServer.Start()) + if !assert.True(t, waitRedisReady(redisAddress, 5*time.Second), "start redis-server") { + return + } + } + defer func() { if redisServer != nil { _ = redisServer.Process.Signal(os.Interrupt) _ = redisServer.Wait() } }() - if !waitRedisReady("redis://127.0.0.1:6379/0", 0) { + + testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false) + testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true) +} + +func TestBaseRedisWithPrefix(t *testing.T) { + redisAddress := "redis://" + testRedisHost() + "/0?prefix=forgejo:queue:" + queueSettings := setting.QueueSettings{ + Length: 10, + ConnStr: redisAddress, + } + + var redisServer *exec.Cmd + if !waitRedisReady(redisAddress, 0) { redisServer = redisServerCmd(t) - if true { + + if redisServer == nil { t.Skip("redis-server not found in Forgejo test yet") return } + assert.NoError(t, redisServer.Start()) - if !assert.True(t, waitRedisReady("redis://127.0.0.1:6379/0", 5*time.Second), "start redis-server") { + if !assert.True(t, waitRedisReady(redisAddress, 5*time.Second), "start redis-server") { return } } - testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", setting.QueueSettings{Length: 10}), false) - testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", setting.QueueSettings{Length: 10}), true) + defer func() { + if redisServer != nil { + _ = redisServer.Process.Signal(os.Interrupt) + _ = redisServer.Wait() + } + }() + + testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false) + testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true) } diff --git a/release-notes/8.0.0/feat/3836.md b/release-notes/8.0.0/feat/3836.md new file mode 100644 index 0000000000..1052c6d22a --- /dev/null +++ b/release-notes/8.0.0/feat/3836.md @@ -0,0 +1 @@ +Parse prefix parameter from redis URI for queues and use that as prefix to keys