From 5897024dfe0af9d98813601ee02ea9dea2d357ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Philippe=20Qu=C3=A9m=C3=A9ner?= Date: Thu, 20 Feb 2025 12:34:52 +0100 Subject: [PATCH] feat(unified-storage): enhance gRPC client with dskit (#101035) --- go.mod | 4 +-- pkg/storage/unified/client.go | 67 ++++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 06cd809c8e6..2b11e0fabe6 100644 --- a/go.mod +++ b/go.mod @@ -447,9 +447,9 @@ require ( github.com/oklog/ulid/v2 v2.1.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect - github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e // indirect + github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e // @grafana/grafana-search-and-storage github.com/opentracing-contrib/go-stdlib v1.0.0 // indirect - github.com/opentracing/opentracing-go v1.2.0 // indirect + github.com/opentracing/opentracing-go v1.2.0 // @grafana/grafana-search-and-storage github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect diff --git a/pkg/storage/unified/client.go b/pkg/storage/unified/client.go index 7b34b95ab92..b4200f31dee 100644 --- a/pkg/storage/unified/client.go +++ b/pkg/storage/unified/client.go @@ -5,7 +5,10 @@ import ( "fmt" "path/filepath" + otgrpc "github.com/opentracing-contrib/go-grpc" + "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "gocloud.dev/blob/fileblob" "google.golang.org/grpc" @@ -13,6 +16,9 @@ import ( authnlib "github.com/grafana/authlib/authn" "github.com/grafana/authlib/types" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/middleware" infraDB "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/tracing" @@ -39,6 +45,10 @@ type Options struct { Docs resource.DocumentBuilderSupplier } +type clientMetrics struct { + requestDuration *prometheus.HistogramVec +} + // This adds a UnifiedStorage client into the wire dependency tree func ProvideUnifiedStorageClient(opts *Options) (resource.ResourceClient, error) { // See: apiserver.ApplyGrafanaConfig(cfg, features, o) @@ -104,11 +114,8 @@ func newClient(opts options.StorageOptions, return nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType) } - // Create a connection to the gRPC server - conn, err := grpc.NewClient(opts.Address, - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) + // Create a connection to the gRPC server. + conn, err := grpcConn(opts.Address, reg) if err != nil { return nil, err } @@ -153,3 +160,53 @@ func newResourceClient(conn *grpc.ClientConn, cfg *setting.Cfg, features feature } return resource.NewRemoteResourceClient(tracer, conn, clientCfgMapping(grpcutils.ReadGrpcClientConfig(cfg)), cfg.Env == setting.Dev) } + +// grpcConn creates a new gRPC connection to the provided address. +func grpcConn(address string, reg prometheus.Registerer) (*grpc.ClientConn, error) { + // This works for now as the Provide function is only called once during startup. + // We might eventually want to tight this factory to a struct for more runtime control. + metrics := clientMetrics{ + requestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "resource_server_client_request_duration_seconds", + Help: "Time spent executing requests to the resource server.", + Buckets: prometheus.ExponentialBuckets(0.008, 4, 7), + }, []string{"operation", "status_code"}), + } + + // Report gRPC status code errors as labels. + unary, stream := instrument(metrics.requestDuration, middleware.ReportGRPCStatusOption) + + cfg := grpcclient.Config{} + // Set the defaults that are normally set by Config.RegisterFlags. + flagext.DefaultValues(&cfg) + + opts, err := cfg.DialOption(unary, stream) + if err != nil { + return nil, fmt.Errorf("could not instrument grpc client: %w", err) + } + + opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler())) + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + + // Use round_robin to balances requests more evenly over the available Storage server. + opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) + + // Disable looking up service config from TXT DNS records. + // This reduces the number of requests made to the DNS servers. + opts = append(opts, grpc.WithDisableServiceConfig()) + + // Create a connection to the gRPC server + return grpc.NewClient(address, opts...) +} + +// instrument is the same as grpcclient.Instrument but without the middleware.ClientUserHeaderInterceptor +// and middleware.StreamClientUserHeaderInterceptor as we don't need them. +func instrument(requestDuration *prometheus.HistogramVec, instrumentationLabelOptions ...middleware.InstrumentationOption) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { + return []grpc.UnaryClientInterceptor{ + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.UnaryClientInstrumentInterceptor(requestDuration, instrumentationLabelOptions...), + }, []grpc.StreamClientInterceptor{ + otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), + middleware.StreamClientInstrumentInterceptor(requestDuration, instrumentationLabelOptions...), + } +}