[release-12.1.3] Chore: Update Redis library to v9 (#112367)
This commit is contained in:
committed by
GitHub
parent
dc5eaa55a2
commit
c62cfd57b4
@@ -68,7 +68,7 @@ require (
|
||||
github.com/go-openapi/loads v0.22.0 // @grafana/alerting-backend
|
||||
github.com/go-openapi/runtime v0.28.0 // @grafana/alerting-backend
|
||||
github.com/go-openapi/strfmt v0.23.0 // @grafana/alerting-backend
|
||||
github.com/go-redis/redis/v8 v8.11.5 // @grafana/grafana-backend-group
|
||||
github.com/go-redis/redis/v8 v8.11.5 // indirect; @grafana/grafana-backend-group
|
||||
github.com/go-sourcemap/sourcemap v2.1.4+incompatible // @grafana/grafana-backend-group
|
||||
github.com/go-sql-driver/mysql v1.9.3 // @grafana/grafana-search-and-storage
|
||||
github.com/go-stack/stack v1.8.1 // @grafana/grafana-backend-group
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
@@ -6,8 +6,8 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func TestIntegrationRedisCacheStorage(t *testing.T) {
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
|
||||
@@ -14,9 +14,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/centrifugal/centrifuge"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gobwas/glob"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
|
||||
@@ -4,11 +4,12 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/live/orgchannel"
|
||||
)
|
||||
@@ -73,40 +74,34 @@ func (c *RedisFrameCache) Update(ctx context.Context, orgID int64, channel strin
|
||||
|
||||
key := c.getCacheKey(orgchannel.PrependOrgID(orgID, channel))
|
||||
|
||||
pipe := c.redisClient.TxPipeline()
|
||||
defer func() { _ = pipe.Close() }()
|
||||
|
||||
pipe.HGetAll(ctx, key)
|
||||
pipe.HMSet(ctx, key, map[string]string{
|
||||
"schema": stringSchema,
|
||||
"frame": string(jsonFrame.Bytes(data.IncludeAll)),
|
||||
var mapReply *redis.MapStringStringCmd
|
||||
replies, err := c.redisClient.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
|
||||
mapReply = pipe.HGetAll(ctx, key)
|
||||
pipe.HMSet(ctx, key, map[string]string{
|
||||
"schema": stringSchema,
|
||||
"frame": string(jsonFrame.Bytes(data.IncludeAll)),
|
||||
})
|
||||
pipe.Expire(ctx, key, frameCacheTTL)
|
||||
return nil
|
||||
})
|
||||
pipe.Expire(ctx, key, frameCacheTTL)
|
||||
|
||||
replies, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(replies) == 0 {
|
||||
return false, errors.New("no replies in response")
|
||||
}
|
||||
reply := replies[0]
|
||||
if mapReply.Err() != nil {
|
||||
return false, fmt.Errorf("error getting existing frame from redis: %w", mapReply.Err())
|
||||
}
|
||||
|
||||
if reply.Err() != nil {
|
||||
result, err := mapReply.Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if mapReply, ok := reply.(*redis.StringStringMapCmd); ok {
|
||||
result, err := mapReply.Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(result) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
return result["schema"] != stringSchema, nil
|
||||
if len(result) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
return true, nil
|
||||
return result["schema"] != stringSchema, nil
|
||||
}
|
||||
|
||||
func (c *RedisFrameCache) getCacheKey(channelID string) string {
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
package managedstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/google/uuid"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -40,7 +41,7 @@ func TestIntegrationRedisCacheStorage(t *testing.T) {
|
||||
require.NotNil(t, c)
|
||||
testFrameCache(t, c)
|
||||
|
||||
keys, err := redisClient.Keys(redisClient.Context(), "*").Result()
|
||||
keys, err := redisClient.Keys(t.Context(), "*").Result()
|
||||
if err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
@@ -54,13 +55,18 @@ func TestIntegrationRedisCacheStorage(t *testing.T) {
|
||||
|
||||
func redisCleanup(t *testing.T, redisClient *redis.Client, prefix string) func() {
|
||||
return func() {
|
||||
keys, err := redisClient.Keys(redisClient.Context(), prefix+"*").Result()
|
||||
ctx := t.Context()
|
||||
ctx = context.WithoutCancel(ctx)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
keys, err := redisClient.Keys(ctx, prefix+"*").Result()
|
||||
if err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
_, err := redisClient.Del(redisClient.Context(), key).Result()
|
||||
_, err := redisClient.Del(ctx, key).Result()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user