Files
grafana/pkg/services/live/live.go
T
Mariell Hoversholm 77fa2271be AppPlatform: Introduce experimental Github integration for dashboard configuration management (#96329)
* [Provisioning] Pay back some technical debt (#100720)

* Handle pagination in github client

* Add some unit test coverage

* Remove unknown repository

* Remove unknown leftover

* Revert "Add some unit test coverage"

This reverts commit 420c9674d2.

* Revert "Revert "Add some unit test coverage""

This reverts commit f7eca41957.

* Revert unit tests in github package

* Remove S3 case as it's now deprecated

* [Provisioning] Consolidate job status report in JobProgressRecorder (#100718)

* Log also successful operation

* Consolidate stop logic under TooManyErrors

* Use error for TooManyErrors

* Pass the progress recorder

* Define JobProgressRecorder interface

* Do not expect workers to return status

* Remove scenarios due to pointers

* Use recorder to manage the entire state

* Provisioning: Support rotating secrets (#100705)

* Provisioning: Refactor webhook to another interface (#100733)

* POC/Provisioning: Remove S3 references (#100734)

* Remove unused script

* Remove s3 references

* Provisioning: Keep the existing k8s name if it is specified in metadata (#100672)

* keep name

* keep name

* Revert "keep name"

This reverts commit 29f87bcaeb.

* Commit stale go.mod

* Keep name also for sync deletions

---------

Co-authored-by: Roberto Jimenez Sanchez <roberto.jimenez@grafana.com>

* Update preview banner copy

* Query Library: Move backend to enterprise (#100371)

* Fix wire

* Fix jobs table re-renders

* Provisioning: Refactor history to its own interface (#100735)

* Provisioning: Refactor history to its own interface

* refactor: use VersionedRepository

* Update API

* Provisioning: Test the GitHub client directly (#100808)

* Provisioning: Test the GitHub client directly

Instead of mocking the abstracted client, test it as well by mocking the underlying GitHub client. This also lets us
remove the mock for the abstracted client.

* refactor: move out helpers

* chore: set dependency owner

* Provisioning: Better clone/push error support (#100854)

* Provisioning: Replace searcher with one that knows about modes (#100857)

* Provisioning: Start in "mode5" when nothing exists in legacy (#100862)

* [Provisioning] Fix duplicate sync jobs triggered in controller (#100870)

* Improve logging on reasons why the controller triggered

* Fix messaging for sync job

* fix lint

* Provisioning: Move legacy export/import into a single migrate job (#100865)

* [Provisioning] Miscellanenous bug fixes and improvements (#100976)

* Error if found duplicate ID

* Fix issue with manual test button

* Fix issue with health errors not going away

* Display status in sync overview

* Use patch operations instead

* Trigger sync job after status update

* Convert Export Tab into modal

* Remove unused FieldSet import

* Only last 8 jobs

* Remove Links card

* Use button for Github Source Code

* Add actions to resources page

* Add resource column to Repository Resources

* Display Job Spec in RecentJobs

* Display dates in history page

* Display Avatar if available

* Improve styling of the avatar

* Update betterer

* Remove duplicate history header in history

* Commit betterer

* Address code styling issues

* update flags

* github v69

* v69

* POC/Provisioning: Add wizard (#100596)

* Chore: make update-workspace

* Chore: Fix lints (#101039)

* Provisioning: Workflows as write access (#101031)

* workflow as write access

* workflow as write access

* workflow as write access

* Update pkg/registry/apis/provisioning/repository/test.go

Co-authored-by: Mariell Hoversholm <mariell.hoversholm@grafana.com>

* POC/Provisioning: Add wizard (#100596)

* update refs

* update refs

* lint fix

* lint fix

* lint fix

* default everythign to read only

* reuse form components

* remove main

---------

Co-authored-by: Mariell Hoversholm <mariell.hoversholm@grafana.com>
Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* merge main

* Fix workflow types

* Betterer

* [Provisioning] Fix webhook and finalizer issues (#101052)

Fix webhook and finalizer issue

The maximum number of webhooks per repository is 20

* [Provisioning] Fix issue with last ref (#101056)

* Fix issue with last ref

* Update frontend code

* Fix the local tmp test

* Use lastRef

* POC/Provisioning: Simplify connect step (#101064)

* Fix sending workflows

* Use write for local

* Move connect action to the next step

* Remove wizard props

* Typo

* Redirect to wizard

* Show repo link after successful export

* Provisioning: Avoid starting sync jobs when using legacy storage (#101114)

* avoid starting sync jobs on legacy

* newlines

* Provisioning: Onboarding landing page (#101112)

* add landing page before wizard

* Update onboarding page

* Update URL

* Remove unused

* Add deleteAll button

* Improved text

* betterer

---------

Co-authored-by: Clarity-89 <homes89@ukr.net>

* Provisioning: use the sync job to finish the migrate job (#101107)

* Provisioning: Show progress more often (#101128)

* show progress bar earlier

* show progress bar earlier

* update wording to be less specific

* POC/Provisioning: Enable sync (#101131)

* update preview banner

* actualy remove and don't crash without provisioning flag

* Update db banner

* Provisioning: Export oldest items first (#101189)

* Provisioning: better branch handling (#101188)

* add missing file

* Provisioning: Fix tests (#101197)

* Provisioning: Refactor tests to be multiple functions

* Provisioning: Fix tests

* fix: make github-example sync

* fix misspell

* Provisioning: avoid migration wizard if things are already in unified storage (#101204)

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* more lint

* POC/Provisioning: Handle connect step errors (#101192)

* Provisioning: Unify status handling in repository and migrate steps

* Refactor: Move WorkflowsField outside RepositoryStep and use proper type

* Refactor: Improve repository verification error handling and UI

* Refactor: Simplify repository verification error handling

* Refactor: Simplify RepositoryStep component structure

* Refactor: Improve error handling in RepositoryStep

* Refactor: Remove redundant repository creation logic from ProvisioningWizard

* Refactor: Simplify RequestErrorAlert component

* show github error

* now will verify

* test .git

* recover from bad config

* Update error handling

* Remove unused prop

* merge upstream

* Show migration summary

* Update text

* Improve text

* Betterer

* [Provisioning] Review controller changes (#101216)

* Review health check conditions

* Move down the logic to set up the sync status

* Skip if it's only a health check rerun

* Fix health check conditions

* Preserve last ref

* Format code

* Rename to shouldSkipSync

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
Co-authored-by: Roberto Jiménez Sánchez <roberto.jimenez@grafana.com>

* fix promotion step

* In the promotion pipeline, publish should depend on build

* fix promotion pipeline

* [Provisioning] Use smaller methods to process repository events (#101240)

* update codegen

* merge main

* Provisioning: Avoid localhost error loop (#101253)

* Provisioning: Update the recent jobs formatting (#101250)

format history

* [Provisioning] Refactor Pull Request & Lint worker (#101273)

* Refactor the code

* Refactor into separate files

* Consolidate linter flag in one spot

* Use global feature flags

* Commit betterer

* Remove from JSON the intermidiate flag

* Use again spec

* Clean up

* Revert changes in test

* POC/Provisioning: Remove sync confirm modal (#101281)

* [Provisioning] Remove linting from MVP (#101286)

* Remove Linting backend

* Re-generate client

* POC/Provisioning: Unify tags (#101218)

* Unify tags

* add both tags

* add tag types

* Check for the redirect only once

* Add fetch settings with delay hook

* Refetch settings

* Split hooks into separate files

* Cleanup

* Prettier

* Prettier

* Remove lint code

* Betterer

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* Provisioning: Update token instructions (#101280)

* Provisioning: Use blob storage rather than local file system to save images (#101298)

* [Provisioning] Add more explicit setup warnings if webhook integration and image rendering are disabled (#101304)

* Setup warnings if renders or webhooks are not possible

* Improve display

* Use a single Alert

* Make design more compact

* Only display local config is missing critical feature toggles

* Improve styling of required flags

* Add file name to the custom ini itself

* Add copy button

* Add FeatureSection Component

* Commit betterer

* Use an interactive table

* Use a modal for instructions

* Use the same modal for example config

* Improve setup steps

* Improve stepper

* Copy code ctrl + c

* Make it more compact

* Select feature to enable

* Improve the height of alerts

* Separate components

* Better warnings

* Improve the page

* Improve the cards

* Improve cards even more

* Improve cards

* Improve cards

* Optional copy in code block

* Add side bar with steps

* Improve styling

* Style modal

* Clean up Code

* Remove index file

* Simplify Step Component

* Commit betterer

* Simplify components

* Use CodeEditor and Clipboard components

* Do not show scrollback on minimum size

* Fix positioning of footer

* Separate Component for Feature

* Use different styling

* Commit betterer

* Use more Grafana components in the FeatureCard

* Separate sidebar into own components

* Simplify sidebar code

* Commit betterer

* Remove connector

* Simplify styling further

* Use cards

* Improve code

* Use more grafana component in InstructionsModal

* Further simplify

* Simplify the code

* Simplify style

* Clean up

* Simplify the Wizard

* Use little icons

* Improve feature cards

* Improve cards

* Commit betterer

* Add description to feature setup

* Improve instructions for snapshot preview

* Move all files into Setup folder

* Commit betterer

* Clean up the warnings code

* Improve coding

* Move sidebar item to separate fiel

* Rename components

* Fix issues

* Use stack instead

* Improve style

* Don't show setup button if configured already

* Simplify again CSS

* Use secondary actions

* Style a bit more

* Improve wording

* Update warning

* Refer to docs in Image Renderer

* More clean up

* Revert changes in generated client

* Fix typos and imports

* Fix lint errors

* Provisioning: better error support (#101490)

* update openapi snapshot

* fix build

* Provisioning: Only show setup page when feature toggles are missing (#101502)

* form fixing

* form fixing

* always send UID

* Same onboarding page regarless of migration (#101557)

* backend building... frontend still broken

* rename sync with main

* Provisioning: Update dashboard badge (#101599)

* Rename to push / pull everything user-facing (#101577)

* Rename to push / pull all everything user-facing

* Use automatic pulling wording

* Provisioning: Migrate when using unified storage (#101572)

* migrate when not unified

* Update pkg/registry/apis/provisioning/register.go

Co-authored-by: Roberto Jiménez Sánchez <roberto.jimenez@grafana.com>

* variables

* merge main

---------

Co-authored-by: Roberto Jiménez Sánchez <roberto.jimenez@grafana.com>

* Add tabs and features tab to listing page (#101570)

* List features in onboarding page (#101558)

* merge main

* POC/Provisioning: Check if the instance is provisioned (#101601)

* Check if the instance is provisioned

* Fix lints

* Fix getting config for new dashboard

* Fixes after merge

* More fixes

* Show success message

* Fix default value

* Add test

* Fix lints

* Provisioning: Include URLs in ResourceWrapper response (#101511)

* Convert Migrate wizard into a Connection Wizard (#101575)

* Convert Migrate wizard into a Connection Wizard

* Remove duplicate empty state

* Allow users to select target in the first step

* Remove file created by merge

* Select target based on existing connections

* Default option for targets and explainatory alert

* Do not display connect button if single connection

* Display target as tag in repository card

* Add Pull Step

* Fix linting

* User decides if migrate or connect

* Improve style based based on review

* Provisioning: Return upsert resource when writing (#101574)

* [Provisioning] Getting Started Page and Tab (#101701)

* merge main

* fix go.mod

* Provisioning: Redirect to the new URL after save (#101757)

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* POC/Provisioning: create folder (#101619)

* Add NewProvisionedFolderForm

* Cleanup

* Add folder api

* Register API

* Do not show provisioned badge for instances

* Enable saving

* Show saved alert

* Fixes

* Fix deps

* Cleanup

* Add test

* Add test

* Updates

* Fix test

* Fix import

* [Provisioning] Display instance connection directly in home (#101720)

* Show tabs directly in home page for instance sync

* Display actions also in the home page

* Add delete button to actions

* Fix issue with files tab

* Display tabs also in instant sync

* Generate connection name for instance sync

* Fix issues when no repositories

* Set active tab

* Remove leftover

* Rename component for listing folder repository

* Fix linting issues

* Fix merge error

* Rename to HomePage

* Split folder list into separate component

* Create own component for repository card

* Improve RepositoryCard readability

* Improve RepositoryCard styling

* Make view primary button

* Fix syntax

* Fix generateName

* merge main

* Provisioning: Move folder management into its own helper (#101864)

* folder

* update folder links

* Fix test

* update

* cleanup

---------

Co-authored-by: Clarity-89 <homes89@ukr.net>

* fixed folder issue

* POC/Provisioning: Create folder from root (#101921)

* Enable creating folder at the root

* Fix test

* Add interceptor

* Provisioning: Expose stats (#101927)

* [Provisioning] Unified onboarding wizard (#101952)

* Spike the solution

* More work

* Add more situations

* Attempt to display count of dashboards and folders

* Attempt with file count

* Do not display options if not possible

* Improve styling resources

* Use another API

* Fix issue with selection

* Style a bit

* Fix more issues

* Make the sync step work

* Improve links

* Use LinkButton

* Start pull automatically

* Start migration automatically

* Fix issue with options

* Fix issues

* Fix loading error

* Improve more things

* Improve styling

* Improve messaging

* Set the autofocus

* Fix some issues

* Fix issue with disabled options

* Only resources

* Finish settings depending on configuration

* Move title to wizard

* Fix title

* Improve styling

* Badge

* Explain on hover

* Improve styling

* Disabled at the bottom

* History & identifiers

* Improve wording

* Add padding left and right disable options

* Delete repository

* Improve buttons

* Give index time to catch up

* Improve buttons

* Handle steps with only forms

* Fix issue with initial migrate or pull

* Commit betterer

* Error messages

* Use memo

* Revampt that a bit

* Attempt to simplify the state and components

* Improve the component for Migrate

* Commit betterer

* Fix issue in next button

* Clean up more

* Start for boostrap step

* Fix issue with running status

* Fix issue with loading bootstrapping

* Improve loading

* Improve more the loading

* Fix issue with loading

* Empty tree

* Handle error

* Fix issue with looping

* Remove commented out lines

* Add comment

* Remove accidental file

* Fix imports

* Improve MigrateStep and PullStep

* Use hook for step status

* JobStep component

* Refactor data fetching

* Validate with Github

* Fix issue with failed error

* Fix next on success

* Address small comments

* Separate file for WizardContent

* Fix linting

* Use step approach also for bootstrap

* Make the logic for moving between steps clearer

* Fix navigation issue

* Clean up some logic

* Use useAsync for JobStep steps

* Revert "Use useAsync for JobStep steps"

This reverts commit 242a275cc9.

* Provisioning: use service to get counts (#101972)

counts

* must migrate when using legacy storage

* Revert "Revert "Use useAsync for JobStep steps""

This reverts commit a420d0ac36.

* Fix async conditions

* Organize imports

* Separate component for BootstrapOptionCard

* BootstrapOptionsList

* Remove duplicate definitions

---------

Co-authored-by: Clarity-89 <homes89@ukr.net>
Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* Merge

* Fix utils

* POC/Provisioning: Fix folder path for dashboard (#101997)

* Provisioning: Fix folder path for dashboards

* Fix isNew

* Update test

* Fix any error

* Betterer

* [Provisioning] Improve progress recording and updates (#102035)

* do not validate on delete

* Provisioning: Implement authorizer for remaining resources (#101945)

* feat: implement authorizer for remaining resources

* fix: don't allow viewers to write files

* security: harden blob id fetching

* add integration test for admin vs viewer

* feat: only Get is a valid verb for reads in our subresources

Co-Authored-By: Ryan McKinley <ryantxu@gmail.com>

* feat: allow render for all requests

* refactor: use guards

Not changing code that goes `if a { } else if b { } else { }` as the semantic meaning of the different branches is
easier to parse.

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* POC/Provisioning: Improve wizard setup (#102066)

* wizard actions

* workign better

* remove more memo

* show polling interval

* cleanup

* finalizers

* Update public/app/features/provisioning/Wizard/BootstrapStep.tsx

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* Update public/app/features/provisioning/Wizard/BootstrapStep.tsx

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* Update public/app/features/provisioning/Wizard/BootstrapStep.tsx

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* Update public/app/features/provisioning/Wizard/BootstrapStep.tsx

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* Update public/app/features/provisioning/Wizard/BootstrapStep.tsx

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* Update public/app/features/provisioning/Wizard/WizardContent.tsx

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* Update public/app/features/provisioning/Wizard/WizardContent.tsx

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* Update public/app/features/provisioning/Wizard/BootstrapStep.tsx

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>

* updates from alex

* updates from alex

* Simplify actions

* Extract props

* history supported form legacy only (for now)

* More refactor

* change order

* Fix cleanup finalizer

* show kinds

* fix lint

---------

Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>
Co-authored-by: Clarity-89 <homes89@ukr.net>
Co-authored-by: Roberto Jimenez Sanchez <roberto.jimenez@grafana.com>

* Fix resource condition (#102086)

* Fix job summary stats issue (#102084)

* merge main

* Repository link should point to configured branch (#102092)

* Add getRepoHref

* Fix random string generation

* Redirect to Home on repository deletion (#102096)

* Fix extra commas in pull request comment (#102108)

* Fix image rendering endpoint (#102107)

* POC/Provisioning: Support migrate... when starting with unified storage (#102097)

* use same clone

* now using upsert

* Fix lint

---------

Co-authored-by: Roberto Jimenez Sanchez <roberto.jimenez@grafana.com>

* Clean up unprovisioned resources after unified storage migration (#102126)

* Clean up unprovisioned resources after migrate

* Clean up unprovisioned resources after migrate

* Update pkg/registry/apis/provisioning/jobs/migrate/resources.go

* Reset summary between export and pull (#102101)

* Reset summary between export and pull

* Add reset results to unified storage migration

* Provisioning: always dirty (#102151)

* fix test version

* log the watch line

* POC/Provisioning: Disable repository list watch (#102169)

* Disable watch for repo list endpoint

* Add comment

* Remove another watch

* Provisioning: Avoid calling test on every update (#102161)

test less often

* Provisioning: Support prefixes in GitHub repositories (#101969)

* feat: add a Prefix property to GitHub repo spec

* feat: make nested folders work properly

* feat: use subdir for go-git export

* fix: placeholder for prefix should be grafana/

* feat: rename prefix to path

* fix: json name should be path, too

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* chore: regen apis

* fix: copy 'path'

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* Merge

* Fix duplicate type

* Provisioning: Test export functionality (#101336)

* Provisioning: Test export functionality

* fix: use context.Background

* test: add more cases for local repo path resolving

* test: rework test inputs

* test: try to make github test work

* fix: clear global state

* Update api client imports

* Fix import

* Fix test

* Update codegen

* Provisioning: Make it green (#102271)

* chore: remove unused functions

* chore: update betterer results

* chore: update openapi spec

* chore: yarn generate-apis

* fix: specify default false if undefined

* Use AnnoKeyManagerIdentity

* Add manager kind

* POC/Provisioning: Update component structure (#102297)

* Update project structure

* Update imports

* Remove unused components

* Copy fixes

* Typo

* More copy fixes

* Betterer

* Update test

* merge main

* Provisioning: Replace hardcoded clients with discovery client (#101918)

* disco client

* discovery client

* merge main

* merge main

* keep factory

* keep factory

* find preffered version for delete factory

* use same folders request

* merge main

* with integration test

* POC/Provisioning: Compare spec in test rather than raw JSON (#102352)

* compare spec not json

* compare spec not json

* [Provisioning] Add in-code TODOs in API Server area (#102360)

* Add TODOs for files endpoint

* Add TODO history endpoint

* Add TODO to move files logic to resource package

* Add TODO to not use private fields directly

* Remove unnecessary checks in list connector

* Add pagination TODO in lister

* Add TODO to rename resources

* Add todo about cloning too early

* Add TODO to propose to merge sync and migrate endpoints

* Add TODOs in register

* Add more TODOs in connectors & routes

* Add TODOs about prefix

* Change it to remove

* Update pkg/registry/apis/provisioning/test.go

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* POC/Provisioning: Switch folders to use managedBy (#102362)

* Folders: Switch to managedBy

* Fix create folder

* Fix tests

* Do not allow changing folder from dashboard settings

* Update imports

* Update provisioned meta

* Do not show provisioned badge for child folders

* Fix folder title

* Update folder actions

* Update new provisioned folder form

* Remove unused code

* Fix condition

* Reset default values on change

* Remove duplicate nav item

* Add managedBy to DashboardQueryResult

* Provisioning: support watch over live (in feature branch) (#102408)

* Provisioning: watch cleanup (#102424)

* fix lint

* Provisioning: Add basic usage stats (#102405)

* [Provisioning] Add limitations to Github Repository (#102451)

* Put limits to Github

* File is too large

* Move constants

* Embed ListOptions again

* Remove TODO

* Provisioning: Pick a better default title (#102516)

better title

* Provisioning: sanitize pull request urls (#102517)

* [Provisioning] Clean up clone after export and migrate (#102467)

* Remove clone directory on clone failure

* Defer remove clones

* Log error if removal fails

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* [Provisioning] Limit path length and depth in APIs (#102472)

* Limit filepath length in files API calls

* Add common utility to deal with paths

* Use the existing function

* Fix import

* Update pkg/registry/apis/provisioning/safepath/limit.go

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* Fix issue after website commit

* Fix linting issue in test

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* Add timeout, JSON check and max body size to endpoints (#102443)

* Add timeout, JSON check and max body size to endpoints

* Use http.MaxBytesReader instead

* Use MaxBytesReader also for reading the entire body

* Add empty line

* Add unit tests

* Fix integration tests

* Update pkg/registry/apis/provisioning/render.go

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* Do not use utils :)

* Fix comment on unmarshalJSON

* 25MB for webhook events

* Remove content type check for files write

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* fix imports

* Provisioning: Remove export option from the UI (#102511)

* [Provisioning] Remove unused checkout method in go-git (#102460)

* [Provisioning] Limit max number of repositories to 10 (#102542)

* Limit to maximum 10 repositories in backend

* Change messaging

* Do not display connect button if more than 10

* Only fetch settings once

* watch repos

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* [Provisioning] Limit size and time to git clone and push in go-git (#102458)

* Limit git clone and push time and size

* Fix linting

* Use transport instead to limit

* Remove not supported

* Add TODO to make timeout configurable

* chore: make update-workspace

* Provisioning: Implement a new job queue (#102446)

* feat: implement a new job queue

Outstanding problems:

  * Status isn't saved.
  * Progress updates don't work (due to status not being saved probably?).

* feat: properly save status

* chore: document label

* chore: assumptions do hold

* fix: support multi-tenant job drivers

* fix: use namespace=*

* fix: set resource back to pointer when updating job progress

If we don't do this, we start rejecting job progress updates as the version falls out of sync.

* feat: make job APIs read-only

* fix: complete job when worker returns

* fix: set namespace on requests from controller

* test: check historic jobs

* chore: regen apis

* feat: start augmenting frontend

* feat: add jobs to authorizer

* feat: use watch from input

* fix: make frontend subscribe to historic jobs

* fix: lint

* chore: yarn prettier:write

* fix: frontend lints

* test: allow for empty state in historicjobs

* test: set content type for export request

* fix: always set job name on insert

* fix: import

* fix: use dashes not colons

* fix: job status should expect a historic job transition

* fix: allow PR jobs from multiple PRs

* feat: same name for sync and migrate jobs

* feat: generate a job name in the store

* refactor: rename to persistentStore

* feat: remove status subresources on jobs

* feat: join jobs into one card

* chore: regen openapi snapshot

---------

Co-authored-by: Roberto Jimenez Sanchez <roberto.jimenez@grafana.com>

* Provisioning: Use a complete storage for jobs (#102605)

* feat: add a complete strategy to apiserver

* feat: use the complete storage strategy for jobs

* test: behaviour changed in main

* [Provisioning] Consolidate file path handling (#102617)

* Add more cases for validation

* Call the method dir

* Clean files endpoint

* Simplify further

* Fix issues with folder

* Add Dir function

* Use walk function in folders

* Move things from ID

* Fix some tests

* Add tree

* Sync worker and changes

* Add more TODOs

* Add normal join

* Remove things in local

* Consolidate single Join

* Call it safe

* Add new IsPathSupported action

* Move the depth to resources

* Add more cases

* Improve trie implementation

* Add tests trie

* Fix trie tests

* Improve trie tests

* Add tests for walk

* Fix linting

* Add unit tests filepath

* Remove TODO

* Remove another TODO

* Unsupported file extension error

* Add documentation for IsPathSupported

* Filepath unit tests

* Use safepath to validate github path

* Remove TODO in wrapper

* Use trailing slash in folder internal object

* Fix changes test

* Include dot

* Add TODO to explore own type for path

* Fix frontend lint

* Fix unit tests

* Fix provisioning integration tests

---------

Co-authored-by: Roberto Jiménez Sánchez <roberto.jimenez@grafana.com>
Co-authored-by: Alex Khomenko <Clarity-89@users.noreply.github.com>
Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
Co-authored-by: Clarity-89 <homes89@ukr.net>
Co-authored-by: Andrej Ocenas <mr.ocenas@gmail.com>
Co-authored-by: Kevin Minehart <kmineh0151@gmail.com>
2025-03-25 08:59:03 +01:00

1483 lines
53 KiB
Go

package live
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/centrifugal/centrifuge"
"github.com/go-redis/redis/v8"
"github.com/gobwas/glob"
jsoniter "github.com/json-iterator/go"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/live"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/apimachinery/errutil"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/usagestats"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/middleware/requestmeta"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/services/apiserver"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/live/database"
"github.com/grafana/grafana/pkg/services/live/features"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana/pkg/services/live/liveplugin"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/grafana/grafana/pkg/services/live/pipeline"
"github.com/grafana/grafana/pkg/services/live/pushws"
"github.com/grafana/grafana/pkg/services/live/runstream"
"github.com/grafana/grafana/pkg/services/live/survey"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
"github.com/grafana/grafana/pkg/services/query"
"github.com/grafana/grafana/pkg/services/secrets"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
"github.com/grafana/grafana/pkg/web"
)
var (
logger = log.New("live")
loggerCF = log.New("live.centrifuge")
)
// CoreGrafanaScope list of core features
type CoreGrafanaScope struct {
Features map[string]model.ChannelHandlerFactory
// The generic service to advertise dashboard changes
Dashboards DashboardActivityChannel
}
func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, routeRegister routing.RouteRegister,
pluginStore pluginstore.Store, pluginClient plugins.Client, cacheService *localcache.CacheService,
dataSourceCache datasources.CacheService, sqlStore db.DB, secretsService secrets.Service,
usageStatsService usagestats.Service, queryDataService query.Service, toggles featuremgmt.FeatureToggles,
accessControl accesscontrol.AccessControl, dashboardService dashboards.DashboardService, annotationsRepo annotations.Repository,
orgService org.Service, configProvider apiserver.RestConfigProvider) (*GrafanaLive, error) {
g := &GrafanaLive{
Cfg: cfg,
Features: toggles,
PluginContextProvider: plugCtxProvider,
RouteRegister: routeRegister,
pluginStore: pluginStore,
pluginClient: pluginClient,
CacheService: cacheService,
DataSourceCache: dataSourceCache,
SQLStore: sqlStore,
SecretsService: secretsService,
queryDataService: queryDataService,
channels: make(map[string]model.ChannelHandler),
GrafanaScope: CoreGrafanaScope{
Features: make(map[string]model.ChannelHandlerFactory),
},
usageStatsService: usageStatsService,
orgService: orgService,
keyPrefix: "gf_live",
}
if cfg.LiveHAPrefix != "" {
g.keyPrefix = cfg.LiveHAPrefix + ".gf_live"
}
logger.Debug("GrafanaLive initialization", "ha", g.IsHA())
// Node is the core object in Centrifuge library responsible for many useful
// things. For example Node allows to publish messages to channels from server
// side with its Publish method.
node, err := centrifuge.New(centrifuge.Config{
LogHandler: handleLog,
LogLevel: centrifuge.LogLevelError,
Metrics: centrifuge.MetricsConfig{
MetricsNamespace: "grafana_live",
},
ClientQueueMaxSize: 4194304, // 4MB
// Use reasonably large expiration interval for stream meta key,
// much bigger than maximum HistoryLifetime value in Node config.
// This way stream meta data will expire, in some cases you may want
// to prevent its expiration setting this to zero value.
HistoryMetaTTL: 7 * 24 * time.Hour,
})
if err != nil {
return nil, err
}
g.node = node
redisHealthy := false
if g.IsHA() {
// Configure HA with Redis. In this case Centrifuge nodes
// will be connected over Redis PUB/SUB. Presence will work
// globally since kept inside Redis.
err := setupRedisLiveEngine(g, node)
if err != nil {
logger.Error("failed to setup redis live engine: %v", err)
} else {
redisHealthy = true
}
}
channelLocalPublisher := liveplugin.NewChannelLocalPublisher(node, nil)
var managedStreamRunner *managedstream.Runner
var redisClient *redis.Client
if g.IsHA() && redisHealthy {
redisClient = redis.NewClient(&redis.Options{
Addr: g.Cfg.LiveHAEngineAddress,
Password: g.Cfg.LiveHAEnginePassword,
})
cmd := redisClient.Ping(context.Background())
if _, err := cmd.Result(); err != nil {
logger.Error("live engine failed to ping redis, proceeding without live ha, error: %v", err)
redisClient = nil
}
}
if redisClient != nil {
managedStreamRunner = managedstream.NewRunner(
g.Publish,
channelLocalPublisher,
managedstream.NewRedisFrameCache(redisClient, g.keyPrefix),
)
} else {
managedStreamRunner = managedstream.NewRunner(
g.Publish,
channelLocalPublisher,
managedstream.NewMemoryFrameCache(),
)
}
g.ManagedStreamRunner = managedStreamRunner
g.contextGetter = liveplugin.NewContextGetter(g.PluginContextProvider, g.DataSourceCache)
pipelinedChannelLocalPublisher := liveplugin.NewChannelLocalPublisher(node, g.Pipeline)
numLocalSubscribersGetter := liveplugin.NewNumLocalSubscribersGetter(node)
g.runStreamManager = runstream.NewManager(pipelinedChannelLocalPublisher, numLocalSubscribersGetter, g.contextGetter)
// Initialize the main features
dash := &features.DashboardHandler{
Publisher: g.Publish,
ClientCount: g.ClientCount,
Store: sqlStore,
DashboardService: dashboardService,
AccessControl: accessControl,
}
g.storage = database.NewStorage(g.SQLStore, g.CacheService)
g.GrafanaScope.Dashboards = dash
g.GrafanaScope.Features["dashboard"] = dash
g.GrafanaScope.Features["broadcast"] = features.NewBroadcastRunner(g.storage)
// Testing watch with just the provisioning support -- this will be removed when it is well validated
if toggles.IsEnabledGlobally(featuremgmt.FlagProvisioning) {
g.GrafanaScope.Features["watch"] = features.NewWatchRunner(g.Publish, configProvider)
}
g.surveyCaller = survey.NewCaller(managedStreamRunner, node)
err = g.surveyCaller.SetupHandlers()
if err != nil {
return nil, err
}
// Set ConnectHandler called when client successfully connected to Node. Your code
// inside handler must be synchronized since it will be called concurrently from
// different goroutines (belonging to different client connections). This is also
// true for other event handlers.
node.OnConnect(func(client *centrifuge.Client) {
numConnections := g.node.Hub().NumClients()
if g.Cfg.LiveMaxConnections >= 0 && numConnections > g.Cfg.LiveMaxConnections {
logger.Warn(
"Max number of Live connections reached, increase max_connections in [live] configuration section",
"user", client.UserID(), "client", client.ID(), "limit", g.Cfg.LiveMaxConnections,
)
client.Disconnect(centrifuge.DisconnectConnectionLimit)
return
}
var semaphore chan struct{}
if clientConcurrency > 1 {
semaphore = make(chan struct{}, clientConcurrency)
}
logger.Debug("Client connected", "user", client.UserID(), "client", client.ID())
connectedAt := time.Now()
// Called when client issues RPC (async request over Live connection).
client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) {
err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() {
cb(g.handleOnRPC(client, e))
})
if err != nil {
cb(centrifuge.RPCReply{}, err)
}
})
// Called when client subscribes to the channel.
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() {
cb(g.handleOnSubscribe(context.Background(), client, e))
})
if err != nil {
cb(centrifuge.SubscribeReply{}, err)
}
})
// Called when a client publishes to the channel.
// In general, we should prefer writing to the HTTP API, but this
// allows some simple prototypes to work quickly.
client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() {
cb(g.handleOnPublish(context.Background(), client, e))
})
if err != nil {
cb(centrifuge.PublishReply{}, err)
}
})
client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
reason := e.Disconnect.Reason
if e.Disconnect.Code == 3001 { // Shutdown
return
}
logger.Debug("Client disconnected", "user", client.UserID(), "client", client.ID(), "reason", reason, "elapsed", time.Since(connectedAt))
})
})
// Run node. This method does not block.
if err := node.Run(); err != nil {
return nil, err
}
appURL, err := url.Parse(g.Cfg.AppURL)
if err != nil {
return nil, fmt.Errorf("error parsing AppURL %s: %w", g.Cfg.AppURL, err)
}
originPatterns := g.Cfg.LiveAllowedOrigins
originGlobs, _ := setting.GetAllowedOriginGlobs(originPatterns) // error already checked on config load.
checkOrigin := getCheckOriginFunc(appURL, originPatterns, originGlobs)
wsCfg := centrifuge.WebsocketConfig{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: checkOrigin,
MessageSizeLimit: cfg.LiveMessageSizeLimit,
}
// Use a pure websocket transport.
wsHandler := centrifuge.NewWebsocketHandler(node, wsCfg)
pushWSHandler := pushws.NewHandler(g.ManagedStreamRunner, pushws.Config{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: checkOrigin,
})
pushPipelineWSHandler := pushws.NewPipelinePushHandler(g.Pipeline, pushws.Config{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: checkOrigin,
})
g.websocketHandler = func(ctx *contextmodel.ReqContext) {
user := ctx.SignedInUser
id, _ := user.GetInternalID()
// Centrifuge expects Credentials in context with a current user ID.
cred := &centrifuge.Credentials{
UserID: strconv.FormatInt(id, 10),
}
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
newCtx = livecontext.SetContextSignedUser(newCtx, user)
r := ctx.Req.WithContext(newCtx)
wsHandler.ServeHTTP(ctx.Resp, r)
}
g.pushWebsocketHandler = func(ctx *contextmodel.ReqContext) {
user := ctx.SignedInUser
newCtx := livecontext.SetContextSignedUser(ctx.Req.Context(), user)
newCtx = livecontext.SetContextStreamID(newCtx, web.Params(ctx.Req)[":streamId"])
r := ctx.Req.WithContext(newCtx)
pushWSHandler.ServeHTTP(ctx.Resp, r)
}
g.pushPipelineWebsocketHandler = func(ctx *contextmodel.ReqContext) {
user := ctx.SignedInUser
newCtx := livecontext.SetContextSignedUser(ctx.Req.Context(), user)
newCtx = livecontext.SetContextChannelID(newCtx, web.Params(ctx.Req)["*"])
r := ctx.Req.WithContext(newCtx)
pushPipelineWSHandler.ServeHTTP(ctx.Resp, r)
}
g.RouteRegister.Group("/api/live", func(group routing.RouteRegister) {
group.Get("/ws", g.websocketHandler)
}, middleware.ReqSignedIn, requestmeta.SetSLOGroup(requestmeta.SLOGroupNone))
g.RouteRegister.Group("/api/live", func(group routing.RouteRegister) {
group.Get("/push/:streamId", g.pushWebsocketHandler)
group.Get("/pipeline/push/*", g.pushPipelineWebsocketHandler)
}, middleware.ReqOrgAdmin, requestmeta.SetSLOGroup(requestmeta.SLOGroupNone))
g.registerUsageMetrics()
return g, nil
}
func setupRedisLiveEngine(g *GrafanaLive, node *centrifuge.Node) error {
redisAddress := g.Cfg.LiveHAEngineAddress
redisPassword := g.Cfg.LiveHAEnginePassword
redisShardConfigs := []centrifuge.RedisShardConfig{
{Address: redisAddress, Password: redisPassword},
}
redisShards := make([]*centrifuge.RedisShard, 0, len(redisShardConfigs))
for _, redisConf := range redisShardConfigs {
redisShard, err := centrifuge.NewRedisShard(node, redisConf)
if err != nil {
return fmt.Errorf("error connecting to Live Redis: %v", err)
}
redisShards = append(redisShards, redisShard)
}
broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
Prefix: g.keyPrefix,
Shards: redisShards,
})
if err != nil {
return fmt.Errorf("error creating Live Redis broker: %v", err)
}
node.SetBroker(broker)
presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{
Prefix: g.keyPrefix,
Shards: redisShards,
})
if err != nil {
return fmt.Errorf("error creating Live Redis presence manager: %v", err)
}
node.SetPresenceManager(presenceManager)
return nil
}
// GrafanaLive manages live real-time connections to Grafana (over WebSocket at this moment).
// The main concept here is Channel. Connections can subscribe to many channels. Each channel
// can have different permissions and properties but once a connection subscribed to a channel
// it starts receiving all messages published into this channel. Thus GrafanaLive is a PUB/SUB
// server.
type GrafanaLive struct {
PluginContextProvider *plugincontext.Provider
Cfg *setting.Cfg
Features featuremgmt.FeatureToggles
RouteRegister routing.RouteRegister
CacheService *localcache.CacheService
DataSourceCache datasources.CacheService
SQLStore db.DB
SecretsService secrets.Service
pluginStore pluginstore.Store
pluginClient plugins.Client
queryDataService query.Service
orgService org.Service
keyPrefix string
node *centrifuge.Node
surveyCaller *survey.Caller
// Websocket handlers
websocketHandler interface{}
pushWebsocketHandler interface{}
pushPipelineWebsocketHandler interface{}
// Full channel handler
channels map[string]model.ChannelHandler
channelsMu sync.RWMutex
// The core internal features
GrafanaScope CoreGrafanaScope
ManagedStreamRunner *managedstream.Runner
Pipeline *pipeline.Pipeline
pipelineStorage pipeline.Storage
contextGetter *liveplugin.ContextGetter
runStreamManager *runstream.Manager
storage *database.Storage
usageStatsService usagestats.Service
usageStats usageStats
}
// DashboardActivityChannel is a service to advertise dashboard activity
type DashboardActivityChannel interface {
// Called when a dashboard is saved -- this includes the error so we can support a
// gitops workflow that knows if the value was saved to the local database or not
// in many cases all direct save requests will fail, but the request should be forwarded
// to any gitops observers
DashboardSaved(orgID int64, requester identity.Requester, message string, dashboard *dashboards.Dashboard, err error) error
// Called when a dashboard is deleted
DashboardDeleted(orgID int64, requester identity.Requester, uid string) error
// Experimental! Indicate is GitOps is active. This really means
// someone is subscribed to the `grafana/dashboards/gitops` channel
HasGitOpsObserver(orgID int64) bool
}
func (g *GrafanaLive) getStreamPlugin(ctx context.Context, pluginID string) (backend.StreamHandler, error) {
plugin, exists := g.pluginStore.Plugin(ctx, pluginID)
if !exists {
return nil, fmt.Errorf("plugin not found: %s", pluginID)
}
if plugin.SupportsStreaming() {
return g.pluginClient, nil
}
return nil, fmt.Errorf("%s plugin does not implement StreamHandler: %#v", pluginID, plugin)
}
func (g *GrafanaLive) Run(ctx context.Context) error {
eGroup, eCtx := errgroup.WithContext(ctx)
eGroup.Go(func() error {
updateStatsTicker := time.NewTicker(time.Minute * 30)
defer updateStatsTicker.Stop()
for {
select {
case <-updateStatsTicker.C:
g.sampleLiveStats()
case <-ctx.Done():
return ctx.Err()
}
}
})
if g.runStreamManager != nil {
// Only run stream manager if GrafanaLive properly initialized.
eGroup.Go(func() error {
return g.runStreamManager.Run(eCtx)
})
}
return eGroup.Wait()
}
func getCheckOriginFunc(appURL *url.URL, originPatterns []string, originGlobs []glob.Glob) func(r *http.Request) bool {
return func(r *http.Request) bool {
origin := r.Header.Get("Origin")
if origin == "" {
return true
}
if len(originPatterns) == 1 && originPatterns[0] == "*" {
// fast path for *.
return true
}
originURL, err := url.Parse(strings.ToLower(origin))
if err != nil {
logger.Warn("Failed to parse request origin", "error", err, "origin", origin)
return false
}
if strings.EqualFold(originURL.Host, r.Host) {
return true
}
ok, err := checkAllowedOrigin(origin, originURL, appURL, originGlobs)
if err != nil {
logger.Warn("Error parsing request origin", "error", err, "origin", origin)
return false
}
if !ok {
logger.Warn("Request Origin is not authorized", "origin", origin, "host", r.Host, "appUrl", appURL.String(), "allowedOrigins", strings.Join(originPatterns, ","))
return false
}
return true
}
}
func checkAllowedOrigin(origin string, originURL *url.URL, appURL *url.URL, originGlobs []glob.Glob) (bool, error) {
// Try to match over configured [server] root_url first.
if originURL.Port() == "" {
if strings.EqualFold(originURL.Scheme, appURL.Scheme) && strings.EqualFold(originURL.Host, appURL.Hostname()) {
return true, nil
}
} else {
if strings.EqualFold(originURL.Scheme, appURL.Scheme) && strings.EqualFold(originURL.Host, appURL.Host) {
return true, nil
}
}
// If there is still no match try [live] allowed_origins patterns.
for _, pattern := range originGlobs {
if pattern.Match(origin) {
return true, nil
}
}
return false, nil
}
var clientConcurrency = 12
func (g *GrafanaLive) IsHA() bool {
return g.Cfg != nil && g.Cfg.LiveHAEngine != ""
}
func runConcurrentlyIfNeeded(ctx context.Context, semaphore chan struct{}, fn func()) error {
if cap(semaphore) > 1 {
select {
case semaphore <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
go func() {
defer func() { <-semaphore }()
fn()
}()
} else {
// No need in separate goroutines.
fn()
}
return nil
}
func (g *GrafanaLive) HandleDatasourceDelete(orgID int64, dsUID string) {
if g.runStreamManager == nil {
return
}
err := g.runStreamManager.HandleDatasourceDelete(orgID, dsUID)
if err != nil {
logger.Error("Error handling datasource delete", "error", err)
}
}
func (g *GrafanaLive) HandleDatasourceUpdate(orgID int64, dsUID string) {
if g.runStreamManager == nil {
return
}
err := g.runStreamManager.HandleDatasourceUpdate(orgID, dsUID)
if err != nil {
logger.Error("Error handling datasource update", "error", err)
}
}
// Use a configuration that's compatible with the standard library
// to minimize the risk of introducing bugs. This will make sure
// that map keys is ordered.
var jsonStd = jsoniter.ConfigCompatibleWithStandardLibrary
func (g *GrafanaLive) handleOnRPC(client *centrifuge.Client, e centrifuge.RPCEvent) (centrifuge.RPCReply, error) {
logger.Debug("Client calls RPC", "user", client.UserID(), "client", client.ID(), "method", e.Method)
if e.Method != "grafana.query" {
return centrifuge.RPCReply{}, centrifuge.ErrorMethodNotFound
}
user, ok := livecontext.GetContextSignedUser(client.Context())
if !ok {
logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "method", e.Method)
return centrifuge.RPCReply{}, centrifuge.ErrorInternal
}
var req dtos.MetricRequest
err := json.Unmarshal(e.Data, &req)
if err != nil {
return centrifuge.RPCReply{}, centrifuge.ErrorBadRequest
}
resp, err := g.queryDataService.QueryData(client.Context(), user, false, req)
if err != nil {
logger.Error("Error query data", "user", client.UserID(), "client", client.ID(), "method", e.Method, "error", err)
if errors.Is(err, datasources.ErrDataSourceAccessDenied) {
return centrifuge.RPCReply{}, &centrifuge.Error{Code: uint32(http.StatusForbidden), Message: http.StatusText(http.StatusForbidden)}
}
var gfErr errutil.Error
if errors.As(err, &gfErr) && gfErr.Reason.Status() == errutil.StatusBadRequest {
return centrifuge.RPCReply{}, &centrifuge.Error{Code: uint32(http.StatusBadRequest), Message: http.StatusText(http.StatusBadRequest)}
}
return centrifuge.RPCReply{}, centrifuge.ErrorInternal
}
data, err := jsonStd.Marshal(resp)
if err != nil {
logger.Error("Error marshaling query response", "user", client.UserID(), "client", client.ID(), "method", e.Method, "error", err)
return centrifuge.RPCReply{}, centrifuge.ErrorInternal
}
return centrifuge.RPCReply{
Data: data,
}, nil
}
func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
user, ok := livecontext.GetContextSignedUser(client.Context())
if !ok {
logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
// See a detailed comment for StripOrgID about orgID management in Live.
orgID, channel, err := orgchannel.StripOrgID(e.Channel)
if err != nil {
logger.Error("Error parsing channel", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
if user.GetOrgID() != orgID {
logger.Info("Error subscribing: wrong orgId", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied
}
var reply model.SubscribeReply
var status backend.SubscribeStreamStatus
var ruleFound bool
if g.Pipeline != nil {
rule, ok, err := g.Pipeline.Get(user.GetOrgID(), channel)
if err != nil {
logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
ruleFound = ok
if ok {
if rule.SubscribeAuth != nil {
ok, err := rule.SubscribeAuth.CanSubscribe(client.Context(), user)
if err != nil {
logger.Error("Error checking subscribe permissions", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
if !ok {
// using HTTP error codes for WS errors too.
code, text := subscribeStatusToHTTPError(backend.SubscribeStreamStatusPermissionDenied)
return centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
}
}
if len(rule.Subscribers) > 0 {
var err error
for _, sub := range rule.Subscribers {
reply, status, err = sub.Subscribe(client.Context(), pipeline.Vars{
OrgID: orgID,
Channel: channel,
}, e.Data)
if err != nil {
logger.Error("Error channel rule subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
if status != backend.SubscribeStreamStatusOK {
break
}
}
}
}
}
if !ruleFound {
handler, addr, err := g.GetChannelHandler(ctx, user, channel)
if err != nil {
if errors.Is(err, live.ErrInvalidChannelID) {
logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
return centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(http.StatusBadRequest), Message: "invalid channel ID"}
}
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
reply, status, err = handler.OnSubscribe(client.Context(), user, model.SubscribeEvent{
Channel: channel,
Path: addr.Path,
Data: e.Data,
})
if err != nil {
logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
}
if status != backend.SubscribeStreamStatusOK {
// using HTTP error codes for WS errors too.
code, text := subscribeStatusToHTTPError(status)
logger.Debug("Return custom subscribe error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
return centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
}
logger.Debug("Client subscribed", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
return centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
EmitPresence: reply.Presence,
EmitJoinLeave: reply.JoinLeave,
PushJoinLeave: reply.JoinLeave,
EnableRecovery: reply.Recover,
Data: reply.Data,
},
}, nil
}
func (g *GrafanaLive) handleOnPublish(ctx context.Context, client *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
logger.Debug("Client wants to publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
user, ok := livecontext.GetContextSignedUser(client.Context())
if !ok {
logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
// See a detailed comment for StripOrgID about orgID management in Live.
orgID, channel, err := orgchannel.StripOrgID(e.Channel)
if err != nil {
logger.Error("Error parsing channel", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
if user.GetOrgID() != orgID {
logger.Info("Error subscribing: wrong orgId", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
return centrifuge.PublishReply{}, centrifuge.ErrorPermissionDenied
}
if g.Pipeline != nil {
rule, ok, err := g.Pipeline.Get(user.GetOrgID(), channel)
if err != nil {
logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
if ok {
if rule.PublishAuth != nil {
ok, err := rule.PublishAuth.CanPublish(client.Context(), user)
if err != nil {
logger.Error("Error checking publish permissions", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
if !ok {
// using HTTP error codes for WS errors too.
code, text := publishStatusToHTTPError(backend.PublishStreamStatusPermissionDenied)
return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
}
} else {
if !user.HasRole(org.RoleAdmin) {
// using HTTP error codes for WS errors too.
code, text := publishStatusToHTTPError(backend.PublishStreamStatusPermissionDenied)
return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
}
}
_, err := g.Pipeline.ProcessInput(client.Context(), user.GetOrgID(), channel, e.Data)
if err != nil {
logger.Error("Error processing input", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
return centrifuge.PublishReply{
Result: &centrifuge.PublishResult{},
}, nil
}
}
handler, addr, err := g.GetChannelHandler(ctx, user, channel)
if err != nil {
if errors.Is(err, live.ErrInvalidChannelID) {
logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(http.StatusBadRequest), Message: "invalid channel ID"}
}
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
reply, status, err := handler.OnPublish(client.Context(), user, model.PublishEvent{
Channel: channel,
Path: addr.Path,
Data: e.Data,
})
if err != nil {
logger.Error("Error calling channel handler publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
if status != backend.PublishStreamStatusOK {
// using HTTP error codes for WS errors too.
code, text := publishStatusToHTTPError(status)
logger.Debug("Return custom publish error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
}
centrifugeReply := centrifuge.PublishReply{
Options: centrifuge.PublishOptions{
HistorySize: reply.HistorySize,
HistoryTTL: reply.HistoryTTL,
},
}
if reply.Data != nil {
// If data is not nil then we published it manually and tell Centrifuge
// publication result so Centrifuge won't publish itself.
result, err := g.node.Publish(e.Channel, reply.Data)
if err != nil {
logger.Error("Error publishing", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err, "data", string(reply.Data))
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
centrifugeReply.Result = &result
}
logger.Debug("Publication successful", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
return centrifugeReply, nil
}
func subscribeStatusToHTTPError(status backend.SubscribeStreamStatus) (int, string) {
switch status {
case backend.SubscribeStreamStatusNotFound:
return http.StatusNotFound, http.StatusText(http.StatusNotFound)
case backend.SubscribeStreamStatusPermissionDenied:
return http.StatusForbidden, http.StatusText(http.StatusForbidden)
default:
logger.Warn("Unknown subscribe status", "status", status)
return http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)
}
}
func publishStatusToHTTPError(status backend.PublishStreamStatus) (int, string) {
switch status {
case backend.PublishStreamStatusNotFound:
return http.StatusNotFound, http.StatusText(http.StatusNotFound)
case backend.PublishStreamStatusPermissionDenied:
return http.StatusForbidden, http.StatusText(http.StatusForbidden)
default:
logger.Warn("Unknown publish status", "status", status)
return http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)
}
}
// GetChannelHandler gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandler(ctx context.Context, user identity.Requester, channel string) (model.ChannelHandler, live.Channel, error) {
// Parse the identifier ${scope}/${namespace}/${path}
addr, err := live.ParseChannel(channel)
if err != nil {
return nil, live.Channel{}, err
}
g.channelsMu.RLock()
c, ok := g.channels[channel]
g.channelsMu.RUnlock() // defer? but then you can't lock further down
if ok {
logger.Debug("Found cached channel handler", "channel", channel)
return c, addr, nil
}
g.channelsMu.Lock()
defer g.channelsMu.Unlock()
c, ok = g.channels[channel] // may have filled in while locked
if ok {
logger.Debug("Found cached channel handler", "channel", channel)
return c, addr, nil
}
getter, err := g.GetChannelHandlerFactory(ctx, user, addr.Scope, addr.Namespace)
if err != nil {
return nil, addr, fmt.Errorf("error getting channel handler factory: %w", err)
}
// First access will initialize.
c, err = getter.GetHandlerForPath(addr.Path)
if err != nil {
return nil, addr, fmt.Errorf("error getting handler for path: %w", err)
}
logger.Info("Initialized channel handler", "channel", channel, "address", addr)
g.channels[channel] = c
return c, addr, nil
}
// GetChannelHandlerFactory gets a ChannelHandlerFactory for a namespace.
// It gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandlerFactory(ctx context.Context, user identity.Requester, scope string, namespace string) (model.ChannelHandlerFactory, error) {
switch scope {
case live.ScopeGrafana:
return g.handleGrafanaScope(user, namespace)
case live.ScopeWatch:
return g.handleWatchScope()
case live.ScopePlugin:
return g.handlePluginScope(ctx, user, namespace)
case live.ScopeDatasource:
return g.handleDatasourceScope(ctx, user, namespace)
case live.ScopeStream:
return g.handleStreamScope(user, namespace)
default:
return nil, fmt.Errorf("invalid scope: %q", scope)
}
}
func (g *GrafanaLive) handleGrafanaScope(_ identity.Requester, namespace string) (model.ChannelHandlerFactory, error) {
if p, ok := g.GrafanaScope.Features[namespace]; ok {
return p, nil
}
return nil, fmt.Errorf("unknown feature: %q", namespace)
}
func (g *GrafanaLive) handleWatchScope() (model.ChannelHandlerFactory, error) {
if p, ok := g.GrafanaScope.Features["watch"]; ok {
return p, nil
}
return nil, fmt.Errorf("watch not registered")
}
func (g *GrafanaLive) handlePluginScope(ctx context.Context, _ identity.Requester, namespace string) (model.ChannelHandlerFactory, error) {
streamHandler, err := g.getStreamPlugin(ctx, namespace)
if err != nil {
return nil, fmt.Errorf("can't find stream plugin: %s", namespace)
}
return features.NewPluginRunner(
namespace,
"", // No instance uid for non-datasource plugins.
g.runStreamManager,
g.contextGetter,
streamHandler,
), nil
}
func (g *GrafanaLive) handleStreamScope(u identity.Requester, namespace string) (model.ChannelHandlerFactory, error) {
return g.ManagedStreamRunner.GetOrCreateStream(u.GetOrgID(), live.ScopeStream, namespace)
}
func (g *GrafanaLive) handleDatasourceScope(ctx context.Context, user identity.Requester, namespace string) (model.ChannelHandlerFactory, error) {
ds, err := g.DataSourceCache.GetDatasourceByUID(ctx, namespace, user, false)
if err != nil {
return nil, fmt.Errorf("error getting datasource: %w", err)
}
streamHandler, err := g.getStreamPlugin(ctx, ds.Type)
if err != nil {
return nil, fmt.Errorf("can't find stream plugin: %s", ds.Type)
}
return features.NewPluginRunner(
ds.Type,
ds.UID,
g.runStreamManager,
g.contextGetter,
streamHandler,
), nil
}
// Publish sends the data to the channel without checking permissions etc.
func (g *GrafanaLive) Publish(orgID int64, channel string, data []byte) error {
_, err := g.node.Publish(orgchannel.PrependOrgID(orgID, channel), data)
return err
}
// ClientCount returns the number of clients.
func (g *GrafanaLive) ClientCount(orgID int64, channel string) (int, error) {
p, err := g.node.Presence(orgchannel.PrependOrgID(orgID, channel))
if err != nil {
return 0, err
}
return len(p.Presence), nil
}
func (g *GrafanaLive) HandleHTTPPublish(ctx *contextmodel.ReqContext) response.Response {
cmd := dtos.LivePublishCmd{}
if err := web.Bind(ctx.Req, &cmd); err != nil {
return response.Error(http.StatusBadRequest, "bad request data", err)
}
addr, err := live.ParseChannel(cmd.Channel)
if err != nil {
return response.Error(http.StatusBadRequest, "invalid channel ID", nil)
}
logger.Debug("Publish API cmd", "identity", ctx.SignedInUser.GetID(), "channel", cmd.Channel)
user := ctx.SignedInUser
channel := cmd.Channel
if g.Pipeline != nil {
rule, ok, err := g.Pipeline.Get(user.GetOrgID(), channel)
if err != nil {
logger.Error("Error getting channel rule", "user", user, "channel", channel, "error", err)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
if ok {
if rule.PublishAuth != nil {
ok, err := rule.PublishAuth.CanPublish(ctx.Req.Context(), user)
if err != nil {
logger.Error("Error checking publish permissions", "user", user, "channel", channel, "error", err)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
if !ok {
return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil)
}
} else {
if !user.HasRole(org.RoleAdmin) {
return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil)
}
}
_, err := g.Pipeline.ProcessInput(ctx.Req.Context(), user.GetOrgID(), channel, cmd.Data)
if err != nil {
logger.Error("Error processing input", "user", user, "channel", channel, "error", err)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
return response.JSON(http.StatusOK, dtos.LivePublishResponse{})
}
}
channelHandler, addr, err := g.GetChannelHandler(ctx.Req.Context(), ctx.SignedInUser, cmd.Channel)
if err != nil {
logger.Error("Error getting channels handler", "error", err, "channel", cmd.Channel)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
reply, status, err := channelHandler.OnPublish(ctx.Req.Context(), ctx.SignedInUser, model.PublishEvent{Channel: cmd.Channel, Path: addr.Path, Data: cmd.Data})
if err != nil {
logger.Error("Error calling OnPublish", "error", err, "channel", cmd.Channel)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
if status != backend.PublishStreamStatusOK {
code, text := publishStatusToHTTPError(status)
return response.Error(code, text, nil)
}
if reply.Data != nil {
err = g.Publish(ctx.SignedInUser.GetOrgID(), cmd.Channel, cmd.Data)
if err != nil {
logger.Error("Error publish to channel", "error", err, "channel", cmd.Channel)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
}
logger.Debug("Publication successful", "identity", ctx.SignedInUser.GetID(), "channel", cmd.Channel)
return response.JSON(http.StatusOK, dtos.LivePublishResponse{})
}
type streamChannelListResponse struct {
Channels []*managedstream.ManagedChannel `json:"channels"`
}
// HandleListHTTP returns metadata so the UI can build a nice form
func (g *GrafanaLive) HandleListHTTP(c *contextmodel.ReqContext) response.Response {
var channels []*managedstream.ManagedChannel
var err error
if g.IsHA() {
channels, err = g.surveyCaller.CallManagedStreams(c.SignedInUser.GetOrgID())
} else {
channels, err = g.ManagedStreamRunner.GetManagedChannels(c.SignedInUser.GetOrgID())
}
if err != nil {
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err)
}
info := streamChannelListResponse{
Channels: channels,
}
return response.JSONStreaming(http.StatusOK, info)
}
// HandleInfoHTTP special http response for
func (g *GrafanaLive) HandleInfoHTTP(ctx *contextmodel.ReqContext) response.Response {
path := web.Params(ctx.Req)["*"]
if path == "grafana/dashboards/gitops" {
return response.JSON(http.StatusOK, util.DynMap{
"active": g.GrafanaScope.Dashboards.HasGitOpsObserver(ctx.SignedInUser.GetOrgID()),
})
}
return response.JSONStreaming(http.StatusNotFound, util.DynMap{
"message": "Info is not supported for this channel",
})
}
// HandleChannelRulesListHTTP ...
func (g *GrafanaLive) HandleChannelRulesListHTTP(c *contextmodel.ReqContext) response.Response {
result, err := g.pipelineStorage.ListChannelRules(c.Req.Context(), c.SignedInUser.GetOrgID())
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to get channel rules", err)
}
return response.JSON(http.StatusOK, util.DynMap{
"rules": result,
})
}
type ConvertDryRunRequest struct {
ChannelRules []pipeline.ChannelRule `json:"channelRules"`
Channel string `json:"channel"`
Data string `json:"data"`
}
type ConvertDryRunResponse struct {
ChannelFrames []*pipeline.ChannelFrame `json:"channelFrames"`
}
type DryRunRuleStorage struct {
ChannelRules []pipeline.ChannelRule
}
func (s *DryRunRuleStorage) GetWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigGetCmd) (pipeline.WriteConfig, bool, error) {
return pipeline.WriteConfig{}, false, errors.New("not implemented by dry run rule storage")
}
func (s *DryRunRuleStorage) CreateWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigCreateCmd) (pipeline.WriteConfig, error) {
return pipeline.WriteConfig{}, errors.New("not implemented by dry run rule storage")
}
func (s *DryRunRuleStorage) UpdateWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigUpdateCmd) (pipeline.WriteConfig, error) {
return pipeline.WriteConfig{}, errors.New("not implemented by dry run rule storage")
}
func (s *DryRunRuleStorage) DeleteWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigDeleteCmd) error {
return errors.New("not implemented by dry run rule storage")
}
func (s *DryRunRuleStorage) CreateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRuleCreateCmd) (pipeline.ChannelRule, error) {
return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage")
}
func (s *DryRunRuleStorage) UpdateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRuleUpdateCmd) (pipeline.ChannelRule, error) {
return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage")
}
func (s *DryRunRuleStorage) DeleteChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRuleDeleteCmd) error {
return errors.New("not implemented by dry run rule storage")
}
func (s *DryRunRuleStorage) ListWriteConfigs(_ context.Context, _ int64) ([]pipeline.WriteConfig, error) {
return nil, nil
}
func (s *DryRunRuleStorage) ListChannelRules(_ context.Context, _ int64) ([]pipeline.ChannelRule, error) {
return s.ChannelRules, nil
}
// HandlePipelineConvertTestHTTP ...
func (g *GrafanaLive) HandlePipelineConvertTestHTTP(c *contextmodel.ReqContext) response.Response {
body, err := io.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var req ConvertDryRunRequest
err = json.Unmarshal(body, &req)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding request", err)
}
storage := &DryRunRuleStorage{
ChannelRules: req.ChannelRules,
}
builder := &pipeline.StorageRuleBuilder{
Node: g.node,
ManagedStream: g.ManagedStreamRunner,
FrameStorage: pipeline.NewFrameStorage(),
Storage: storage,
ChannelHandlerGetter: g,
}
channelRuleGetter := pipeline.NewCacheSegmentedTree(builder)
pipe, err := pipeline.New(channelRuleGetter)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error creating pipeline", err)
}
rule, ok, err := channelRuleGetter.Get(c.SignedInUser.GetOrgID(), req.Channel)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error getting channel rule", err)
}
if !ok {
return response.Error(http.StatusNotFound, "No rule found", nil)
}
if rule.Converter == nil {
return response.Error(http.StatusNotFound, "No converter found", nil)
}
channelFrames, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.SignedInUser.GetOrgID(), req.Channel, []byte(req.Data))
if err != nil {
return response.Error(http.StatusInternalServerError, "Error converting data", err)
}
return response.JSON(http.StatusOK, ConvertDryRunResponse{
ChannelFrames: channelFrames,
})
}
// HandleChannelRulesPostHTTP ...
func (g *GrafanaLive) HandleChannelRulesPostHTTP(c *contextmodel.ReqContext) response.Response {
body, err := io.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var cmd pipeline.ChannelRuleCreateCmd
err = json.Unmarshal(body, &cmd)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
}
rule, err := g.pipelineStorage.CreateChannelRule(c.Req.Context(), c.SignedInUser.GetOrgID(), cmd)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to create channel rule", err)
}
return response.JSON(http.StatusOK, util.DynMap{
"rule": rule,
})
}
// HandleChannelRulesPutHTTP ...
func (g *GrafanaLive) HandleChannelRulesPutHTTP(c *contextmodel.ReqContext) response.Response {
body, err := io.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var cmd pipeline.ChannelRuleUpdateCmd
err = json.Unmarshal(body, &cmd)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
}
if cmd.Pattern == "" {
return response.Error(http.StatusBadRequest, "Rule pattern required", nil)
}
rule, err := g.pipelineStorage.UpdateChannelRule(c.Req.Context(), c.SignedInUser.GetOrgID(), cmd)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to update channel rule", err)
}
return response.JSON(http.StatusOK, util.DynMap{
"rule": rule,
})
}
// HandleChannelRulesDeleteHTTP ...
func (g *GrafanaLive) HandleChannelRulesDeleteHTTP(c *contextmodel.ReqContext) response.Response {
body, err := io.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var cmd pipeline.ChannelRuleDeleteCmd
err = json.Unmarshal(body, &cmd)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
}
if cmd.Pattern == "" {
return response.Error(http.StatusBadRequest, "Rule pattern required", nil)
}
err = g.pipelineStorage.DeleteChannelRule(c.Req.Context(), c.SignedInUser.GetOrgID(), cmd)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to delete channel rule", err)
}
return response.JSON(http.StatusOK, util.DynMap{})
}
// HandlePipelineEntitiesListHTTP ...
func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *contextmodel.ReqContext) response.Response {
return response.JSON(http.StatusOK, util.DynMap{
"subscribers": pipeline.SubscribersRegistry,
"dataOutputs": pipeline.DataOutputsRegistry,
"converters": pipeline.ConvertersRegistry,
"frameProcessors": pipeline.FrameProcessorsRegistry,
"frameOutputs": pipeline.FrameOutputsRegistry,
})
}
// HandleWriteConfigsListHTTP ...
func (g *GrafanaLive) HandleWriteConfigsListHTTP(c *contextmodel.ReqContext) response.Response {
backends, err := g.pipelineStorage.ListWriteConfigs(c.Req.Context(), c.SignedInUser.GetOrgID())
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to get write configs", err)
}
result := make([]pipeline.WriteConfigDto, 0, len(backends))
for _, b := range backends {
result = append(result, pipeline.WriteConfigToDto(b))
}
return response.JSON(http.StatusOK, util.DynMap{
"writeConfigs": result,
})
}
// HandleWriteConfigsPostHTTP ...
func (g *GrafanaLive) HandleWriteConfigsPostHTTP(c *contextmodel.ReqContext) response.Response {
body, err := io.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var cmd pipeline.WriteConfigCreateCmd
err = json.Unmarshal(body, &cmd)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding write config create command", err)
}
result, err := g.pipelineStorage.CreateWriteConfig(c.Req.Context(), c.SignedInUser.GetOrgID(), cmd)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to create write config", err)
}
return response.JSON(http.StatusOK, util.DynMap{
"writeConfig": pipeline.WriteConfigToDto(result),
})
}
// HandleWriteConfigsPutHTTP ...
func (g *GrafanaLive) HandleWriteConfigsPutHTTP(c *contextmodel.ReqContext) response.Response {
body, err := io.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var cmd pipeline.WriteConfigUpdateCmd
err = json.Unmarshal(body, &cmd)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding write config update command", err)
}
if cmd.UID == "" {
return response.Error(http.StatusBadRequest, "UID required", nil)
}
existingBackend, ok, err := g.pipelineStorage.GetWriteConfig(c.Req.Context(), c.SignedInUser.GetOrgID(), pipeline.WriteConfigGetCmd{
UID: cmd.UID,
})
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to get write config", err)
}
if ok {
if cmd.SecureSettings == nil {
cmd.SecureSettings = map[string]string{}
}
secureJSONData, err := g.SecretsService.DecryptJsonData(c.Req.Context(), existingBackend.SecureSettings)
if err != nil {
logger.Error("Error decrypting secure settings", "error", err)
return response.Error(http.StatusInternalServerError, "Error decrypting secure settings", err)
}
for k, v := range secureJSONData {
if _, ok := cmd.SecureSettings[k]; !ok {
cmd.SecureSettings[k] = v
}
}
}
result, err := g.pipelineStorage.UpdateWriteConfig(c.Req.Context(), c.SignedInUser.GetOrgID(), cmd)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to update write config", err)
}
return response.JSON(http.StatusOK, util.DynMap{
"writeConfig": pipeline.WriteConfigToDto(result),
})
}
// HandleWriteConfigsDeleteHTTP ...
func (g *GrafanaLive) HandleWriteConfigsDeleteHTTP(c *contextmodel.ReqContext) response.Response {
body, err := io.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var cmd pipeline.WriteConfigDeleteCmd
err = json.Unmarshal(body, &cmd)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding write config delete command", err)
}
if cmd.UID == "" {
return response.Error(http.StatusBadRequest, "UID required", nil)
}
err = g.pipelineStorage.DeleteWriteConfig(c.Req.Context(), c.SignedInUser.GetOrgID(), cmd)
if err != nil {
return response.Error(http.StatusInternalServerError, "Failed to delete write config", err)
}
return response.JSON(http.StatusOK, util.DynMap{})
}
// Write to the standard log15 logger
func handleLog(msg centrifuge.LogEntry) {
arr := make([]interface{}, 0)
for k, v := range msg.Fields {
if v == nil {
v = "<nil>"
} else if v == "" {
v = "<empty>"
}
arr = append(arr, k, v)
}
switch msg.Level {
case centrifuge.LogLevelDebug:
loggerCF.Debug(msg.Message, arr...)
case centrifuge.LogLevelError:
loggerCF.Error(msg.Message, arr...)
case centrifuge.LogLevelInfo:
loggerCF.Info(msg.Message, arr...)
default:
loggerCF.Debug(msg.Message, arr...)
}
}
func (g *GrafanaLive) sampleLiveStats() {
numClients := g.node.Hub().NumClients()
numUsers := g.node.Hub().NumUsers()
numChannels := g.node.Hub().NumChannels()
var numNodes int
if info, err := g.node.Info(); err == nil {
numNodes = len(info.Nodes)
}
g.usageStats.sampleCount++
g.usageStats.numClientsSum += numClients
g.usageStats.numUsersSum += numUsers
if numClients > g.usageStats.numClientsMax {
g.usageStats.numClientsMax = numClients
}
if numUsers > g.usageStats.numUsersMax {
g.usageStats.numUsersMax = numUsers
}
if numNodes > g.usageStats.numNodesMax {
g.usageStats.numNodesMax = numNodes
}
if numChannels > g.usageStats.numChannelsMax {
g.usageStats.numChannelsMax = numChannels
}
}
func (g *GrafanaLive) resetLiveStats() {
g.usageStats = usageStats{}
}
func getHistogramMetric(val int, bounds []int, metricPrefix string) string {
for _, bound := range bounds {
if val <= bound {
return metricPrefix + "le_" + strconv.Itoa(bound)
}
}
return metricPrefix + "le_inf"
}
func (g *GrafanaLive) collectLiveStats(_ context.Context) (map[string]interface{}, error) {
liveUsersAvg := 0
liveClientsAvg := 0
if g.usageStats.sampleCount > 0 {
liveUsersAvg = g.usageStats.numUsersSum / g.usageStats.sampleCount
liveClientsAvg = g.usageStats.numClientsSum / g.usageStats.sampleCount
}
var liveEnabled int
if g.Cfg.LiveMaxConnections != 0 {
liveEnabled = 1
}
var liveHAEnabled int
if g.Cfg.LiveHAEngine != "" {
liveHAEnabled = 1
}
metrics := map[string]interface{}{
"stats.live_enabled.count": liveEnabled,
"stats.live_ha_enabled.count": liveHAEnabled,
"stats.live_samples.count": g.usageStats.sampleCount,
"stats.live_users_max.count": g.usageStats.numUsersMax,
"stats.live_users_avg.count": liveUsersAvg,
"stats.live_clients_max.count": g.usageStats.numClientsMax,
"stats.live_clients_avg.count": liveClientsAvg,
"stats.live_channels_max.count": g.usageStats.numChannelsMax,
"stats.live_nodes_max.count": g.usageStats.numNodesMax,
}
metrics[getHistogramMetric(g.usageStats.numClientsMax, []int{0, 10, 100, 1000, 10000, 100000}, "stats.live_clients_")] = 1
metrics[getHistogramMetric(g.usageStats.numUsersMax, []int{0, 10, 100, 1000, 10000, 100000}, "stats.live_users_")] = 1
metrics[getHistogramMetric(g.usageStats.numChannelsMax, []int{0, 10, 100, 1000, 10000, 100000}, "stats.live_channels_")] = 1
metrics[getHistogramMetric(g.usageStats.numNodesMax, []int{1, 3, 9}, "stats.live_nodes_")] = 1
return metrics, nil
}
func (g *GrafanaLive) registerUsageMetrics() {
g.usageStatsService.RegisterSendReportCallback(g.resetLiveStats)
g.usageStatsService.RegisterMetricsFunc(g.collectLiveStats)
}
type usageStats struct {
numClientsMax int
numClientsSum int
numUsersMax int
numUsersSum int
sampleCount int
numNodesMax int
numChannelsMax int
}