feat(unified-storage): enhance gRPC client with dskit (#101035)
This commit is contained in:
committed by
GitHub
parent
b888f03d3f
commit
5897024dfe
@@ -5,7 +5,10 @@ import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"gocloud.dev/blob/fileblob"
|
||||
"google.golang.org/grpc"
|
||||
@@ -13,6 +16,9 @@ import (
|
||||
|
||||
authnlib "github.com/grafana/authlib/authn"
|
||||
"github.com/grafana/authlib/types"
|
||||
"github.com/grafana/dskit/flagext"
|
||||
"github.com/grafana/dskit/grpcclient"
|
||||
"github.com/grafana/dskit/middleware"
|
||||
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
@@ -39,6 +45,10 @@ type Options struct {
|
||||
Docs resource.DocumentBuilderSupplier
|
||||
}
|
||||
|
||||
type clientMetrics struct {
|
||||
requestDuration *prometheus.HistogramVec
|
||||
}
|
||||
|
||||
// This adds a UnifiedStorage client into the wire dependency tree
|
||||
func ProvideUnifiedStorageClient(opts *Options) (resource.ResourceClient, error) {
|
||||
// See: apiserver.ApplyGrafanaConfig(cfg, features, o)
|
||||
@@ -104,11 +114,8 @@ func newClient(opts options.StorageOptions,
|
||||
return nil, fmt.Errorf("expecting address for storage_type: %s", opts.StorageType)
|
||||
}
|
||||
|
||||
// Create a connection to the gRPC server
|
||||
conn, err := grpc.NewClient(opts.Address,
|
||||
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
// Create a connection to the gRPC server.
|
||||
conn, err := grpcConn(opts.Address, reg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -153,3 +160,53 @@ func newResourceClient(conn *grpc.ClientConn, cfg *setting.Cfg, features feature
|
||||
}
|
||||
return resource.NewRemoteResourceClient(tracer, conn, clientCfgMapping(grpcutils.ReadGrpcClientConfig(cfg)), cfg.Env == setting.Dev)
|
||||
}
|
||||
|
||||
// grpcConn creates a new gRPC connection to the provided address.
|
||||
func grpcConn(address string, reg prometheus.Registerer) (*grpc.ClientConn, error) {
|
||||
// This works for now as the Provide function is only called once during startup.
|
||||
// We might eventually want to tight this factory to a struct for more runtime control.
|
||||
metrics := clientMetrics{
|
||||
requestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "resource_server_client_request_duration_seconds",
|
||||
Help: "Time spent executing requests to the resource server.",
|
||||
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
|
||||
}, []string{"operation", "status_code"}),
|
||||
}
|
||||
|
||||
// Report gRPC status code errors as labels.
|
||||
unary, stream := instrument(metrics.requestDuration, middleware.ReportGRPCStatusOption)
|
||||
|
||||
cfg := grpcclient.Config{}
|
||||
// Set the defaults that are normally set by Config.RegisterFlags.
|
||||
flagext.DefaultValues(&cfg)
|
||||
|
||||
opts, err := cfg.DialOption(unary, stream)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not instrument grpc client: %w", err)
|
||||
}
|
||||
|
||||
opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
|
||||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
|
||||
// Use round_robin to balances requests more evenly over the available Storage server.
|
||||
opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
|
||||
|
||||
// Disable looking up service config from TXT DNS records.
|
||||
// This reduces the number of requests made to the DNS servers.
|
||||
opts = append(opts, grpc.WithDisableServiceConfig())
|
||||
|
||||
// Create a connection to the gRPC server
|
||||
return grpc.NewClient(address, opts...)
|
||||
}
|
||||
|
||||
// instrument is the same as grpcclient.Instrument but without the middleware.ClientUserHeaderInterceptor
|
||||
// and middleware.StreamClientUserHeaderInterceptor as we don't need them.
|
||||
func instrument(requestDuration *prometheus.HistogramVec, instrumentationLabelOptions ...middleware.InstrumentationOption) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
|
||||
return []grpc.UnaryClientInterceptor{
|
||||
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
|
||||
middleware.UnaryClientInstrumentInterceptor(requestDuration, instrumentationLabelOptions...),
|
||||
}, []grpc.StreamClientInterceptor{
|
||||
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
|
||||
middleware.StreamClientInstrumentInterceptor(requestDuration, instrumentationLabelOptions...),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user