From 52b70ca9764049ed30afc889a941b9e23ea3548a Mon Sep 17 00:00:00 2001 From: Scott Lepper Date: Tue, 8 Oct 2024 09:43:23 -0400 Subject: [PATCH] Grafana indexing poc - load and watch index (#93843) * list resources and load into index * watch for changes and update index --- go.work.sum | 10 +- pkg/storage/unified/client.go | 5 +- pkg/storage/unified/resource/go.mod | 28 +++ pkg/storage/unified/resource/go.sum | 48 +++++ pkg/storage/unified/resource/index.go | 202 ++++++++++++++++++ pkg/storage/unified/resource/index_server.go | 203 +++++++++++++++++++ pkg/storage/unified/resource/server.go | 41 ++-- pkg/storage/unified/sql/server.go | 14 +- pkg/storage/unified/sql/service.go | 2 +- 9 files changed, 525 insertions(+), 28 deletions(-) create mode 100644 pkg/storage/unified/resource/index.go create mode 100644 pkg/storage/unified/resource/index_server.go diff --git a/go.work.sum b/go.work.sum index 95e844c912a..8fbaee04958 100644 --- a/go.work.sum +++ b/go.work.sum @@ -378,7 +378,6 @@ github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:9 github.com/blevesearch/goleveldb v1.0.1 h1:iAtV2Cu5s0GD1lwUiekkFHe2gTMCCNVj2foPclDLIFI= github.com/blevesearch/goleveldb v1.0.1/go.mod h1:WrU8ltZbIp0wAoig/MHbrPCXSOLpe79nz5lv5nqfYrQ= github.com/blevesearch/mmap-go v1.0.3/go.mod h1:pYvKl/grLQrBxuaRYgoTssa4rVujYYeenDp++2E+yvs= -github.com/blevesearch/segment v0.9.0 h1:5lG7yBCx98or7gK2cHMKPukPZ/31Kag7nONpoBt22Ac= github.com/blevesearch/snowball v0.6.1 h1:cDYjn/NCH+wwt2UdehaLpr2e4BwLIjN4V/TdLsL+B5A= github.com/blevesearch/snowball v0.6.1/go.mod h1:ZF0IBg5vgpeoUhnMza2v0A/z8m1cWPlwhke08LpNusg= github.com/blevesearch/stempel v0.2.0 h1:CYzVPaScODMvgE9o+kf6D4RJ/VRomyi9uHF+PtB+Afc= @@ -491,8 +490,6 @@ github.com/elastic/go-sysinfo v1.11.2/go.mod h1:GKqR8bbMK/1ITnez9NIsIfXQr25aLhRJ github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= github.com/elazarl/goproxy v0.0.0-20230731152917-f99041a5c027/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= -github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8= -github.com/ettle/strcase v0.1.1 h1:htFueZyVeE1XNnMEfbqp5r67qAN/4r6ya1ysq8Q+Zcw= github.com/expr-lang/expr v1.16.2 h1:JvMnzUs3LeVHBvGFcXYmXo+Q6DPDmzrlcSBO6Wy3w4s= github.com/expr-lang/expr v1.16.2/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= @@ -587,8 +584,6 @@ github.com/grafana/alerting v0.0.0-20240830172655-aa466962ea18 h1:3cQ+d+fkNL2Eqp github.com/grafana/alerting v0.0.0-20240830172655-aa466962ea18/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4= github.com/grafana/alerting v0.0.0-20240917171353-6c25eb6eff10 h1:oDbLKM34O+JUF9EQFS+9aYhdYoeNfUpXqNjFCLIxwF4= github.com/grafana/alerting v0.0.0-20240917171353-6c25eb6eff10/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4= -github.com/grafana/alerting v0.0.0-20240926233713-446ddd356f8d h1:HOK6RWTuVldWFtNbWHxPlTa2shZ+WsNJsxoRJhX56Zg= -github.com/grafana/alerting v0.0.0-20240926233713-446ddd356f8d/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 h1:AJKOtDKAOg8XNFnIZSmqqqutoTSxVlRs6vekL2p2KEY= github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45/go.mod h1:01sXtHoRwI8W324IPAzuxDFOmALqYLCOhvSC2fUHWXc= @@ -852,6 +847,7 @@ github.com/stoewer/parquet-cli v0.0.7/go.mod h1:bskxHdj8q3H1EmfuCqjViFoeO3NEvs5l github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/handy v0.0.0-20200128134331-0f66f006fb2e h1:mOtuXaRAbVZsxAHVdPR3IjfmN8T1h2iczJLynhLybf8= github.com/substrait-io/substrait-go v0.4.2 h1:buDnjsb3qAqTaNbOR7VKmNgXf4lYQxWEcnSGUWBtmN8= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/tdewolff/minify/v2 v2.12.9 h1:dvn5MtmuQ/DFMwqf5j8QhEVpPX6fi3WGImhv8RUB4zA= github.com/tdewolff/minify/v2 v2.12.9/go.mod h1:qOqdlDfL+7v0/fyymB+OP497nIxJYSvX4MQWA8OoiXU= github.com/tdewolff/parse/v2 v2.6.8 h1:mhNZXYCx//xG7Yq2e/kVLNZw4YfYmeHbhx+Zc0OvFMA= @@ -1037,9 +1033,11 @@ golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1051,6 +1049,7 @@ golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457 h1:zf5N6UOrA487eEFacMePxjXAJctxKmyjKUsjA11Uzuk= golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= @@ -1070,6 +1069,7 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240730163845-b1a4ccb954bf/go. google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo= google.golang.org/genproto/googleapis/bytestream v0.0.0-20240730163845-b1a4ccb954bf h1:T4tsZBlZYXK3j40sQNP5MBO32I+rn6ypV1PpklsiV8k= google.golang.org/genproto/googleapis/bytestream v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:5/MT647Cn/GGhwTpXC7QqcaR5Cnee4v4MKCU1/nwnIQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= diff --git a/pkg/storage/unified/client.go b/pkg/storage/unified/client.go index a48a08ee292..177af71a2f0 100644 --- a/pkg/storage/unified/client.go +++ b/pkg/storage/unified/client.go @@ -32,6 +32,7 @@ func ProvideUnifiedStorageClient( DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(cfg.DataPath, "grafana-apiserver")), Address: apiserverCfg.Key("address").MustString(""), } + ctx := context.Background() switch opts.StorageType { case options.StorageTypeFile: @@ -45,7 +46,7 @@ func ProvideUnifiedStorageClient( if err != nil { return nil, err } - backend, err := resource.NewCDKBackend(context.Background(), resource.CDKBackendOptions{ + backend, err := resource.NewCDKBackend(ctx, resource.CDKBackendOptions{ Bucket: bucket, }) if err != nil { @@ -76,7 +77,7 @@ func ProvideUnifiedStorageClient( // Use the local SQL default: - server, err := sql.NewResourceServer(db, cfg, features, tracer) + server, err := sql.NewResourceServer(ctx, db, cfg, features, tracer) if err != nil { return nil, err } diff --git a/pkg/storage/unified/resource/go.mod b/pkg/storage/unified/resource/go.mod index ac411a5fcd9..3be85811afb 100644 --- a/pkg/storage/unified/resource/go.mod +++ b/pkg/storage/unified/resource/go.mod @@ -17,8 +17,35 @@ require ( k8s.io/apimachinery v0.31.1 ) +require ( + github.com/RoaringBitmap/roaring v1.9.3 // indirect + github.com/bits-and-blooms/bitset v1.12.0 // indirect + github.com/blevesearch/bleve_index_api v1.1.10 // indirect + github.com/blevesearch/geo v0.1.20 // indirect + github.com/blevesearch/go-faiss v1.0.20 // indirect + github.com/blevesearch/go-porterstemmer v1.0.3 // indirect + github.com/blevesearch/gtreap v0.1.1 // indirect + github.com/blevesearch/mmap-go v1.0.4 // indirect + github.com/blevesearch/scorch_segment_api/v2 v2.2.15 // indirect + github.com/blevesearch/segment v0.9.1 // indirect + github.com/blevesearch/snowballstem v0.9.0 // indirect + github.com/blevesearch/upsidedown_store_api v1.0.2 // indirect + github.com/blevesearch/vellum v1.0.10 // indirect + github.com/blevesearch/zapx/v11 v11.3.10 // indirect + github.com/blevesearch/zapx/v12 v12.3.10 // indirect + github.com/blevesearch/zapx/v13 v13.3.10 // indirect + github.com/blevesearch/zapx/v14 v14.3.10 // indirect + github.com/blevesearch/zapx/v15 v15.3.13 // indirect + github.com/blevesearch/zapx/v16 v16.1.5 // indirect + github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/mschoch/smat v0.2.0 // indirect + go.etcd.io/bbolt v1.3.9 // indirect +) + require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/blevesearch/bleve/v2 v2.4.2 github.com/bufbuild/protocompile v0.4.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -29,6 +56,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/uuid v1.6.0 github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/jhump/protoreflect v1.15.1 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/pkg/storage/unified/resource/go.sum b/pkg/storage/unified/resource/go.sum index e9b7986db13..391c6c08d49 100644 --- a/pkg/storage/unified/resource/go.sum +++ b/pkg/storage/unified/resource/go.sum @@ -13,6 +13,8 @@ cloud.google.com/go/iam v1.1.13/go.mod h1:K8mY0uSXwEXS30KrnVb+j54LB/ntfZu1dr+4zF cloud.google.com/go/storage v1.43.0 h1:CcxnSohZwizt4LCzQHWvBf1/kvtHUn7gk9QERXPyXFs= cloud.google.com/go/storage v1.43.0/go.mod h1:ajvxEa7WmZS1PxvKRq4bq0tFT3vMd502JwstCcYv0Q0= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/RoaringBitmap/roaring v1.9.3 h1:t4EbC5qQwnisr5PrP9nt0IRhRTb9gMUgQF4t4S2OByM= +github.com/RoaringBitmap/roaring v1.9.3/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY= @@ -55,6 +57,44 @@ github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/blevesearch/bleve/v2 v2.4.2 h1:NooYP1mb3c0StkiY9/xviiq2LGSaE8BQBCc/pirMx0U= +github.com/blevesearch/bleve/v2 v2.4.2/go.mod h1:ATNKj7Yl2oJv/lGuF4kx39bST2dveX6w0th2FFYLkc8= +github.com/blevesearch/bleve_index_api v1.1.10 h1:PDLFhVjrjQWr6jCuU7TwlmByQVCSEURADHdCqVS9+g0= +github.com/blevesearch/bleve_index_api v1.1.10/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8= +github.com/blevesearch/geo v0.1.20 h1:paaSpu2Ewh/tn5DKn/FB5SzvH0EWupxHEIwbCk/QPqM= +github.com/blevesearch/geo v0.1.20/go.mod h1:DVG2QjwHNMFmjo+ZgzrIq2sfCh6rIHzy9d9d0B59I6w= +github.com/blevesearch/go-faiss v1.0.20 h1:AIkdTQFWuZ5LQmKQSebgMR4RynGNw8ZseJXaan5kvtI= +github.com/blevesearch/go-faiss v1.0.20/go.mod h1:jrxHrbl42X/RnDPI+wBoZU8joxxuRwedrxqswQ3xfU8= +github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo= +github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M= +github.com/blevesearch/gtreap v0.1.1 h1:2JWigFrzDMR+42WGIN/V2p0cUvn4UP3C4Q5nmaZGW8Y= +github.com/blevesearch/gtreap v0.1.1/go.mod h1:QaQyDRAT51sotthUWAH4Sj08awFSSWzgYICSZ3w0tYk= +github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc= +github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs= +github.com/blevesearch/scorch_segment_api/v2 v2.2.15 h1:prV17iU/o+A8FiZi9MXmqbagd8I0bCqM7OKUYPbnb5Y= +github.com/blevesearch/scorch_segment_api/v2 v2.2.15/go.mod h1:db0cmP03bPNadXrCDuVkKLV6ywFSiRgPFT1YVrestBc= +github.com/blevesearch/segment v0.9.1 h1:+dThDy+Lvgj5JMxhmOVlgFfkUtZV2kw49xax4+jTfSU= +github.com/blevesearch/segment v0.9.1/go.mod h1:zN21iLm7+GnBHWTao9I+Au/7MBiL8pPFtJBJTsk6kQw= +github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s= +github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs= +github.com/blevesearch/upsidedown_store_api v1.0.2 h1:U53Q6YoWEARVLd1OYNc9kvhBMGZzVrdmaozG2MfoB+A= +github.com/blevesearch/upsidedown_store_api v1.0.2/go.mod h1:M01mh3Gpfy56Ps/UXHjEO/knbqyQ1Oamg8If49gRwrQ= +github.com/blevesearch/vellum v1.0.10 h1:HGPJDT2bTva12hrHepVT3rOyIKFFF4t7Gf6yMxyMIPI= +github.com/blevesearch/vellum v1.0.10/go.mod h1:ul1oT0FhSMDIExNjIxHqJoGpVrBpKCdgDQNxfqgJt7k= +github.com/blevesearch/zapx/v11 v11.3.10 h1:hvjgj9tZ9DeIqBCxKhi70TtSZYMdcFn7gDb71Xo/fvk= +github.com/blevesearch/zapx/v11 v11.3.10/go.mod h1:0+gW+FaE48fNxoVtMY5ugtNHHof/PxCqh7CnhYdnMzQ= +github.com/blevesearch/zapx/v12 v12.3.10 h1:yHfj3vXLSYmmsBleJFROXuO08mS3L1qDCdDK81jDl8s= +github.com/blevesearch/zapx/v12 v12.3.10/go.mod h1:0yeZg6JhaGxITlsS5co73aqPtM04+ycnI6D1v0mhbCs= +github.com/blevesearch/zapx/v13 v13.3.10 h1:0KY9tuxg06rXxOZHg3DwPJBjniSlqEgVpxIqMGahDE8= +github.com/blevesearch/zapx/v13 v13.3.10/go.mod h1:w2wjSDQ/WBVeEIvP0fvMJZAzDwqwIEzVPnCPrz93yAk= +github.com/blevesearch/zapx/v14 v14.3.10 h1:SG6xlsL+W6YjhX5N3aEiL/2tcWh3DO75Bnz77pSwwKU= +github.com/blevesearch/zapx/v14 v14.3.10/go.mod h1:qqyuR0u230jN1yMmE4FIAuCxmahRQEOehF78m6oTgns= +github.com/blevesearch/zapx/v15 v15.3.13 h1:6EkfaZiPlAxqXz0neniq35my6S48QI94W/wyhnpDHHQ= +github.com/blevesearch/zapx/v15 v15.3.13/go.mod h1:Turk/TNRKj9es7ZpKK95PS7f6D44Y7fAFy8F4LXQtGg= +github.com/blevesearch/zapx/v16 v16.1.5 h1:b0sMcarqNFxuXvjoXsF8WtwVahnxyhEvBSRJi/AUHjU= +github.com/blevesearch/zapx/v16 v16.1.5/go.mod h1:J4mSF39w1QELc11EWRSBFkPeZuO7r/NPKkHzDCoiaI8= github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -86,6 +126,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 h1:gtexQ/VGyN+VVFRXSFiguSNcXmS6rkKT+X7FdIrTtfo= +github.com/golang/geo v0.0.0-20210211234256-740aa86cb551/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= @@ -104,6 +146,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -162,6 +206,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= @@ -198,6 +244,8 @@ github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcY github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= +go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 h1:9G6E0TXzGFVfTnawRzrPl83iHOAV7L8NJiR8RSGYV1g= diff --git a/pkg/storage/unified/resource/index.go b/pkg/storage/unified/resource/index.go new file mode 100644 index 00000000000..39886ad875c --- /dev/null +++ b/pkg/storage/unified/resource/index.go @@ -0,0 +1,202 @@ +package resource + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/mapping" + "github.com/google/uuid" +) + +type Shard struct { + index bleve.Index + path string + batch *bleve.Batch +} + +type Index struct { + shards map[string]Shard + opts Opts + s *server +} + +func NewIndex(s *server, opts Opts) *Index { + idx := &Index{ + s: s, + opts: opts, + shards: make(map[string]Shard), + } + return idx +} + +func (i *Index) Init(ctx context.Context) error { + resourceTypes := fetchResourceTypes() + for _, rt := range resourceTypes { + r := &ListRequest{Options: rt} + list, err := i.s.List(ctx, r) + if err != nil { + return err + } + + for _, obj := range list.Items { + res, err := getResource(obj.Value) + if err != nil { + return err + } + + shard, err := i.getShard(tenant(res)) + if err != nil { + return err + } + err = shard.batch.Index(res.Metadata.Uid, obj) + if err != nil { + return err + } + } + + for _, shard := range i.shards { + err := shard.index.Batch(shard.batch) + if err != nil { + return err + } + shard.batch.Reset() + } + } + + return nil +} + +func (i *Index) Index(ctx context.Context, data *Data) error { + res, err := getResource(data.Value.Value) + if err != nil { + return err + } + tenant := tenant(res) + shard, err := i.getShard(tenant) + if err != nil { + return err + } + err = shard.index.Index(res.Metadata.Uid, data.Value.Value) + if err != nil { + return err + } + return nil +} + +func (i *Index) Delete(ctx context.Context, uid string, key *ResourceKey) error { + shard, err := i.getShard(key.Namespace) + if err != nil { + return err + } + err = shard.index.Delete(uid) + if err != nil { + return err + } + return nil +} + +func tenant(res *Resource) string { + return res.Metadata.Namespace +} + +type Metadata struct { + Name string + Namespace string + Uid string + CreationTimestamp string + Labels map[string]string + Annotations map[string]string +} + +type Resource struct { + Kind string + ApiVersion string + Metadata Metadata +} + +type Opts struct { + Workers int // This controls how many goroutines are used to index objects + BatchSize int // This is the batch size for how many objects to add to the index at once + Concurrent bool +} + +func createFileIndex() (bleve.Index, string, error) { + indexPath := fmt.Sprintf("%s%s.bleve", os.TempDir(), uuid.New().String()) + index, err := bleve.New(indexPath, createIndexMappings()) + if err != nil { + log.Fatalf("Failed to create index: %v", err) + } + return index, indexPath, err +} + +// TODO: clean this up. it was copied from owens performance test +func createIndexMappings() *mapping.IndexMappingImpl { + //Create mapping for the name and creationTimestamp fields in the metadata + nameFieldMapping := bleve.NewTextFieldMapping() + creationTimestampFieldMapping := bleve.NewDateTimeFieldMapping() + metaMapping := bleve.NewDocumentMapping() + metaMapping.AddFieldMappingsAt("name", nameFieldMapping) + metaMapping.AddFieldMappingsAt("creationTimestamp", creationTimestampFieldMapping) + metaMapping.Dynamic = false + + //Create a sub-document mapping for the metadata field + objectMapping := bleve.NewDocumentMapping() + objectMapping.AddSubDocumentMapping("metadata", metaMapping) + + // Map top level fields - just kind for now + kindFieldMapping := bleve.NewTextFieldMapping() + objectMapping.AddFieldMappingsAt("kind", kindFieldMapping) + objectMapping.Dynamic = false + + // Create the index mapping + indexMapping := bleve.NewIndexMapping() + indexMapping.DefaultMapping = objectMapping + indexMapping.DefaultMapping.Dynamic = false + + return indexMapping +} + +func getResource(data []byte) (*Resource, error) { + res := &Resource{} + err := json.Unmarshal(data, res) + if err != nil { + return nil, err + } + return res, nil +} + +func (i *Index) getShard(tenant string) (Shard, error) { + shard, ok := i.shards[tenant] + if ok { + return shard, nil + } + index, path, err := createFileIndex() + if err != nil { + return Shard{}, err + } + + shard = Shard{ + index: index, + path: path, + batch: index.NewBatch(), + } + // TODO: do we need to lock this? + i.shards[tenant] = shard + return shard, nil +} + +// TODO - fetch from api +func fetchResourceTypes() []*ListOptions { + items := []*ListOptions{} + items = append(items, &ListOptions{ + Key: &ResourceKey{ + Group: "playlist.grafana.app", + Resource: "playlists", + }, + }) + return items +} diff --git a/pkg/storage/unified/resource/index_server.go b/pkg/storage/unified/resource/index_server.go new file mode 100644 index 00000000000..ea45e335db8 --- /dev/null +++ b/pkg/storage/unified/resource/index_server.go @@ -0,0 +1,203 @@ +package resource + +import ( + "context" + "errors" + "log" + "strings" + + "google.golang.org/grpc" +) + +type IndexServer struct { + ResourceServer + s *server + index *Index + ws *indexWatchServer +} + +func (is IndexServer) Search(ctx context.Context, req *SearchRequest) (*SearchResponse, error) { + res := &SearchResponse{} + return res, nil +} + +func (is IndexServer) History(ctx context.Context, req *HistoryRequest) (*HistoryResponse, error) { + return nil, nil +} + +func (is IndexServer) Origin(ctx context.Context, req *OriginRequest) (*OriginResponse, error) { + return nil, nil +} + +// Load the index +func (is *IndexServer) Load(ctx context.Context) error { + is.index = NewIndex(is.s, Opts{}) + err := is.index.Init(ctx) + if err != nil { + return err + } + return nil +} + +// Watch resources for changes and update the index +func (is *IndexServer) Watch(ctx context.Context) error { + rtList := fetchResourceTypes() + for _, rt := range rtList { + wr := &WatchRequest{ + Options: rt, + } + + go func() { + // TODO: handle error + err := is.s.Watch(wr, is.ws) + if err != nil { + log.Printf("Error watching resource %v", err) + } + }() + } + return nil +} + +// Init sets the resource server on the index server +// so we can call the resource server from the index server +// TODO: a chicken and egg problem - index server needs the resource server but the resource server is created with the index server +func (is *IndexServer) Init(ctx context.Context, rs *server) error { + is.s = rs + is.ws = &indexWatchServer{ + is: is, + context: ctx, + } + return nil +} + +func NewResourceIndexServer() ResourceIndexServer { + return &IndexServer{} +} + +type ResourceIndexer interface { + Index(ctx context.Context) (*Index, error) +} + +type indexWatchServer struct { + grpc.ServerStream + context context.Context + is *IndexServer +} + +func (f *indexWatchServer) Send(we *WatchEvent) error { + if we.Type == WatchEvent_ADDED { + return f.Add(we) + } + + if we.Type == WatchEvent_DELETED { + return f.Delete(we) + } + + if we.Type == WatchEvent_MODIFIED { + return f.Update(we) + } + + return nil +} + +func (f *indexWatchServer) RecvMsg(m interface{}) error { + return nil +} + +func (f *indexWatchServer) SendMsg(m interface{}) error { + return errors.New("not implemented") +} + +func (f *indexWatchServer) Context() context.Context { + if f.context == nil { + f.context = context.Background() + } + return f.context +} + +func (f *indexWatchServer) Index() *Index { + return f.is.index +} + +func (f *indexWatchServer) Add(we *WatchEvent) error { + data, err := getData(we.Resource) + if err != nil { + return err + } + err = f.Index().Index(f.context, data) + if err != nil { + return err + } + return nil +} + +func (f *indexWatchServer) Delete(we *WatchEvent) error { + // TODO: this seems flakey. Does a delete have a Resource or Previous? + // both cases have happened ( maybe because Georges pr was reverted ) + rs := we.Resource + if rs == nil { + rs = we.Previous + } + if rs == nil { + return errors.New("resource not found") + } + data, err := getData(rs) + if err != nil { + return err + } + err = f.Index().Delete(f.context, data.Uid, data.Key) + if err != nil { + return err + } + return nil +} + +func (f *indexWatchServer) Update(we *WatchEvent) error { + data, err := getData(we.Resource) + if err != nil { + return err + } + err = f.Index().Delete(f.context, data.Uid, data.Key) + if err != nil { + return err + } + err = f.Index().Index(f.context, data) + if err != nil { + return err + } + return nil +} + +type Data struct { + Key *ResourceKey + Value *ResourceWrapper + Uid string +} + +func getGroup(r *Resource) string { + v := strings.Split(r.ApiVersion, "/") + if len(v) > 0 { + return v[0] + } + return "" +} + +func getData(wr *WatchEvent_Resource) (*Data, error) { + r, err := getResource(wr.Value) + if err != nil { + return nil, err + } + + key := &ResourceKey{ + Group: getGroup(r), + Resource: r.Kind, + Namespace: r.Metadata.Namespace, + Name: r.Metadata.Name, + } + + value := &ResourceWrapper{ + ResourceVersion: wr.Version, + Value: wr.Value, + } + return &Data{Key: key, Value: value, Uid: r.Metadata.Uid}, nil +} diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index c44c1ac1ebc..0a8da09d322 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -100,25 +100,6 @@ type ResourceServerOptions struct { Now func() int64 } -type indexServer struct{} - -func (s indexServer) Search(ctx context.Context, req *SearchRequest) (*SearchResponse, error) { - res := &SearchResponse{} - return res, nil -} - -func (s indexServer) History(ctx context.Context, req *HistoryRequest) (*HistoryResponse, error) { - return nil, nil -} - -func (s indexServer) Origin(ctx context.Context, req *OriginRequest) (*OriginResponse, error) { - return nil, nil -} - -func NewResourceIndexServer() ResourceIndexServer { - return indexServer{} -} - func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { if opts.Tracer == nil { opts.Tracer = noop.NewTracerProvider().Tracer("resource-server") @@ -707,6 +688,28 @@ func (s *server) Origin(ctx context.Context, req *OriginRequest) (*OriginRespons return s.index.Origin(ctx, req) } +// Index returns the search index. If the index is not initialized, it will be initialized. +func (s *server) Index(ctx context.Context) (*Index, error) { + index := s.index.(*IndexServer) + if index.index == nil { + err := index.Init(ctx, s) + if err != nil { + return nil, err + } + + err = index.Load(ctx) + if err != nil { + return nil, err + } + + err = index.Watch(ctx) + if err != nil { + return nil, err + } + } + return index.index, nil +} + // IsHealthy implements ResourceServer. func (s *server) IsHealthy(ctx context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) { if err := s.Init(ctx); err != nil { diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index f2cd8f27c5b..bdb108fd47e 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -2,6 +2,7 @@ package sql import ( "context" + "errors" "github.com/grafana/authlib/claims" infraDB "github.com/grafana/grafana/pkg/infra/db" @@ -13,7 +14,7 @@ import ( ) // Creates a new ResourceServer -func NewResourceServer(db infraDB.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (resource.ResourceServer, error) { +func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (resource.ResourceServer, error) { opts := resource.ResourceServerOptions{ Tracer: tracer, } @@ -32,6 +33,17 @@ func NewResourceServer(db infraDB.DB, cfg *setting.Cfg, features featuremgmt.Fea if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) { opts.Index = resource.NewResourceIndexServer() + server, err := resource.NewResourceServer(opts) + if err != nil { + return nil, err + } + // initialze the search index + indexer, ok := server.(resource.ResourceIndexer) + if !ok { + return nil, errors.New("index server does not implement ResourceIndexer") + } + _, err = indexer.Index(ctx) + return server, err } if features.IsEnabledGlobally(featuremgmt.FlagKubernetesFolders) { diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index ecd22176cc4..72d1cb0b279 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -84,7 +84,7 @@ func ProvideUnifiedStorageGrpcService( } func (s *service) start(ctx context.Context) error { - server, err := NewResourceServer(s.db, s.cfg, s.features, s.tracing) + server, err := NewResourceServer(ctx, s.db, s.cfg, s.features, s.tracing) if err != nil { return err }