c3d7dbc258
Return the SQL schema for all DS queries in request (to provide information to AI / Autocomplete for SQL expressions). All DS queries are treated as if they were inputs to SQL expressions in terms of conversion, regardless if they are selected in a query or not. Requires feature toggle queryService = true Endpoint is apis/query.grafana.app/v0alpha1/namespaces/default/sqlschemas --------- Co-authored-by: Todd Treece <360020+toddtreece@users.noreply.github.com>
171 lines
5.2 KiB
Go
171 lines
5.2 KiB
Go
package query
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"strconv"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana/pkg/expr"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
errorsK8s "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apiserver/pkg/endpoints/request"
|
|
"k8s.io/apiserver/pkg/registry/rest"
|
|
|
|
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
service "github.com/grafana/grafana/pkg/services/query"
|
|
"github.com/grafana/grafana/pkg/web"
|
|
)
|
|
|
|
type sqlSchemaREST struct {
|
|
logger log.Logger
|
|
builder *QueryAPIBuilder
|
|
}
|
|
|
|
var (
|
|
_ rest.Storage = (*sqlSchemaREST)(nil)
|
|
_ rest.SingularNameProvider = (*sqlSchemaREST)(nil)
|
|
_ rest.Connecter = (*sqlSchemaREST)(nil)
|
|
_ rest.Scoper = (*sqlSchemaREST)(nil)
|
|
_ rest.StorageMetadata = (*sqlSchemaREST)(nil)
|
|
)
|
|
|
|
func newSQLSchemasREST(builder *QueryAPIBuilder) *sqlSchemaREST {
|
|
return &sqlSchemaREST{
|
|
logger: log.New("query.sqlschemas"),
|
|
builder: builder,
|
|
}
|
|
}
|
|
|
|
func (r *sqlSchemaREST) New() runtime.Object {
|
|
// This is added as the "ResponseType" regardless what ProducesObject() says :)
|
|
return &query.SQLSchemas{}
|
|
}
|
|
|
|
func (r *sqlSchemaREST) Destroy() {}
|
|
|
|
func (r *sqlSchemaREST) NamespaceScoped() bool {
|
|
return true
|
|
}
|
|
|
|
func (r *sqlSchemaREST) GetSingularName() string {
|
|
return "SQLSchema" // Used for the
|
|
}
|
|
|
|
func (r *sqlSchemaREST) ProducesMIMETypes(verb string) []string {
|
|
return []string{"application/json"} // and parquet!
|
|
}
|
|
|
|
func (r *sqlSchemaREST) ProducesObject(verb string) interface{} {
|
|
return &query.SQLSchemas{}
|
|
}
|
|
|
|
func (r *sqlSchemaREST) ConnectMethods() []string {
|
|
return []string{"POST"}
|
|
}
|
|
|
|
func (r *sqlSchemaREST) NewConnectOptions() (runtime.Object, bool, string) {
|
|
return nil, false, "" // true means you can use the trailing path as a variable
|
|
}
|
|
|
|
// called by mt query service and also when queryServiceFromUI is enabled, can be both mt and st
|
|
func (r *sqlSchemaREST) Connect(connectCtx context.Context, name string, _ runtime.Object, incomingResponder rest.Responder) (http.Handler, error) {
|
|
// See: /pkg/services/apiserver/builder/helper.go#L34
|
|
// The name is set with a rewriter hack
|
|
if name != "name" {
|
|
r.logger.Debug("Connect name is not name")
|
|
return nil, errorsK8s.NewNotFound(schema.GroupResource{}, name)
|
|
}
|
|
b := r.builder
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, httpreq *http.Request) {
|
|
ctx, span := b.tracer.Start(httpreq.Context(), "QueryService.GetSQLSchemas")
|
|
defer span.End()
|
|
ctx = request.WithNamespace(ctx, request.NamespaceValue(connectCtx))
|
|
traceId := span.SpanContext().TraceID()
|
|
connectLogger := b.log.New("traceId", traceId.String(), "rule_uid", httpreq.Header.Get("X-Rule-Uid"))
|
|
responder := newResponderWrapper(incomingResponder,
|
|
func(statusCode *int, obj runtime.Object) {
|
|
if *statusCode/100 == 4 {
|
|
span.SetStatus(codes.Error, strconv.Itoa(*statusCode))
|
|
}
|
|
|
|
if *statusCode >= 500 {
|
|
o, ok := obj.(*query.QueryDataResponse)
|
|
if ok && o.Responses != nil {
|
|
for refId, response := range o.Responses {
|
|
if response.ErrorSource == backend.ErrorSourceDownstream {
|
|
*statusCode = http.StatusBadRequest //force this to be a 400 since it's downstream
|
|
span.SetStatus(codes.Error, strconv.Itoa(*statusCode))
|
|
span.SetAttributes(attribute.String("error.source", "downstream"))
|
|
break
|
|
} else if response.Error != nil {
|
|
connectLogger.Debug("500 error without downstream error source", "error", response.Error, "errorSource", response.ErrorSource, "refId", refId)
|
|
span.SetStatus(codes.Error, "500 error without downstream error source")
|
|
} else {
|
|
span.SetStatus(codes.Error, "500 error without downstream error source and no Error message")
|
|
span.SetAttributes(attribute.String("error.ref_id", refId))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
},
|
|
|
|
func(err error) {
|
|
connectLogger.Error("error caught in handler", "err", err)
|
|
span.SetStatus(codes.Error, "query error")
|
|
|
|
if err == nil {
|
|
return
|
|
}
|
|
|
|
span.RecordError(err)
|
|
})
|
|
|
|
raw := &query.QueryDataRequest{}
|
|
err := web.Bind(httpreq, raw)
|
|
if err != nil {
|
|
connectLogger.Error("Hit unexpected error when reading query", "err", err)
|
|
err = errorsK8s.NewBadRequest("error reading query")
|
|
responder.Error(err)
|
|
return
|
|
}
|
|
|
|
qdr, err := handleSQLSchemaQuery(ctx, *raw, *b, httpreq, *responder, connectLogger)
|
|
if err != nil {
|
|
responder.Error(err)
|
|
return
|
|
}
|
|
|
|
responder.Object(200, &query.SQLSchemas{
|
|
SQLSchemas: qdr,
|
|
})
|
|
}), nil
|
|
}
|
|
|
|
func handlePreparedSQLSchema(ctx context.Context, pq *preparedQuery) (expr.SQLSchemas, error) {
|
|
resp, err := service.GetSQLSchemas(ctx, pq.logger, pq.cache, pq.exprSvc, pq.mReq, pq.builder, pq.headers)
|
|
pq.reportMetrics()
|
|
return resp, err
|
|
}
|
|
|
|
func handleSQLSchemaQuery(
|
|
ctx context.Context,
|
|
raw query.QueryDataRequest,
|
|
b QueryAPIBuilder,
|
|
httpreq *http.Request,
|
|
responder responderWrapper,
|
|
connectLogger log.Logger,
|
|
) (expr.SQLSchemas, error) {
|
|
pq, err := prepareQuery(ctx, raw, b, httpreq, connectLogger)
|
|
if err != nil {
|
|
responder.Error(err)
|
|
return nil, err
|
|
}
|
|
return handlePreparedSQLSchema(ctx, pq)
|
|
}
|