diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9db89b1..d240079 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,7 +3,7 @@ name: CI on: push: branches: - - '*' + - "*" paths-ignore: - README.md pull_request: @@ -38,7 +38,7 @@ jobs: - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - python-version: '3.11' + python-version: "3.11" - name: Update plugin.json run: python3 scripts/update_plugin_metadata.py env: @@ -49,8 +49,8 @@ jobs: - name: Setup Node.js environment uses: actions/setup-node@v4 with: - node-version: '20' - cache: 'npm' + node-version: "20" + cache: "npm" - name: Install dependencies run: npm ci @@ -76,7 +76,7 @@ jobs: if: steps.check-for-backend.outputs.has-backend == 'true' uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: "1.21" - name: Test backend if: steps.check-for-backend.outputs.has-backend == 'true' @@ -172,21 +172,21 @@ jobs: - name: Setup Node.js environment uses: actions/setup-node@v4 with: - node-version: '20' - cache: 'npm' + node-version: "20" + cache: "npm" - name: Install dev dependencies run: npm ci - name: Start Grafana run: | - docker compose pull - DEVELOPMENT=false GRAFANA_VERSION=${{ matrix.GRAFANA_IMAGE.VERSION }} GRAFANA_IMAGE=${{ matrix.GRAFANA_IMAGE.NAME }} docker compose up -d + docker compose -f docker-compose.test.yaml pull + DEVELOPMENT=false GRAFANA_VERSION=${{ matrix.GRAFANA_IMAGE.VERSION }} GRAFANA_IMAGE=${{ matrix.GRAFANA_IMAGE.NAME }} docker compose -f docker-compose.test.yaml up -d - name: Wait for Grafana to start uses: nev7n/wait_for_response@v1 with: - url: 'http://localhost:3000/' + url: "http://localhost:3000/" responseCode: 200 timeout: 60000 interval: 500 @@ -204,7 +204,7 @@ jobs: docker logs mongodb-datasource-grafana >& grafana-server.log - name: Stop grafana docker - run: docker compose down + run: docker compose -f docker-compose.test.yaml down - name: Upload server log uses: actions/upload-artifact@v4 
@@ -222,4 +222,4 @@ jobs: # with: # name: playwright-report-${{ matrix.GRAFANA_IMAGE.NAME }}-v${{ matrix.GRAFANA_IMAGE.VERSION }}-${{github.run_id}} # path: playwright-report/ - # retention-days: 5 \ No newline at end of file + # retention-days: 5 diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b45a4f..15a4a56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,32 +1,56 @@ # Changelog +## 0.3.0 - 2025-01-11 + +### Added + +- Added Live Streaming support based on [Mongo Change Streams](https://www.mongodb.com/docs/manual/changeStreams/)(https://github.com/haohanyang/mongodb-datasource/pull/31) + +### Changed + +- Removed "Query" button. Built-in Query/Refresh button is recommended instead(https://github.com/haohanyang/mongodb-datasource/pull/31) + ## 0.2.1 - 2024-12-14 + ### Added -* Optional Mongo aggregate options(https://github.com/haohanyang/mongodb-datasource/pull/33) -* Query Button(https://github.com/haohanyang/mongodb-datasource/pull/35) + +- Optional Mongo aggregate options(https://github.com/haohanyang/mongodb-datasource/pull/33) +- Query Button(https://github.com/haohanyang/mongodb-datasource/pull/35) + ### Changed -* Query won't be executed automatically after focus changes. User needs to manually click buttons(https://github.com/haohanyang/mongodb-datasource/pull/35) -* UI improvement(https://github.com/haohanyang/mongodb-datasource/pull/33) + +- Query won't be executed automatically after focus changes. 
User needs to manually click buttons(https://github.com/haohanyang/mongodb-datasource/pull/35) +- UI improvement(https://github.com/haohanyang/mongodb-datasource/pull/33) ## 0.2.0 - 2024-12-06 + ### Added -* Enabled Grafana Alerting([67358d5c](https://github.com/haohanyang/mongodb-datasource/commit/67358d5cb1ada5571697de21016f2acf5dbc1234)) + +- Enabled Grafana Alerting([67358d5c](https://github.com/haohanyang/mongodb-datasource/commit/67358d5cb1ada5571697de21016f2acf5dbc1234)) + ### Changed -* Improved query variable([#28](https://github.com/haohanyang/mongodb-datasource/pull/28)) +- Improved query variable([#28](https://github.com/haohanyang/mongodb-datasource/pull/28)) ## 0.1.2 - 2024-11-10 + ### Changed -* Increased code editor's font size([84c7de5d](https://github.com/haohanyang/mongodb-datasource/commit/84c7de5df5035bd4c3214908eb6a389b53732cde)) -* Make `_id` the first column in the table if exists([#26](https://github.com/haohanyang/mongodb-datasource/pull/26)) -* Fix BSON array conversion to JSON([#25](https://github.com/haohanyang/mongodb-datasource/pull/25)) + +- Increased code editor's font size([84c7de5d](https://github.com/haohanyang/mongodb-datasource/commit/84c7de5df5035bd4c3214908eb6a389b53732cde)) +- Make `_id` the first column in the table if exists([#26](https://github.com/haohanyang/mongodb-datasource/pull/26)) +- Fix BSON array conversion to JSON([#25](https://github.com/haohanyang/mongodb-datasource/pull/25)) ## 0.1.1 - 2024-10-25 -### Added + +### Added + - [Query Variable](https://grafana.com/docs/grafana/latest/dashboards/variables/add-template-variables/#add-a-query-variable) support - A quick start script `quick_start.py` to quickly start Grafana and MongoDB containers + ### Changed -* Added "(Optinal)" to connection string configuration tooltip + +- Added "(Optional)" to connection string configuration tooltip + ## 0.1.0 - 2024-10-13 Initial release. 
diff --git a/README.md b/README.md index d07bedc..152a2b1 100644 --- a/README.md +++ b/README.md @@ -4,46 +4,63 @@ Integrate MongoDB to Grafana. A free, open source, community-driven alternative to Grafana Lab's MongoDB enterprise plugin and MongoDB Atlas Charts. -一个开源的可视化MongoDB数据库的Grafana插件。 +一个开源的可视化 MongoDB 数据库的 Grafana 插件。 This plugin enables you to query and visualize data from your MongoDB databases directly within Grafana. Leverage the flexibility of MongoDB's aggregation pipeline to create insightful dashboards and panels. +#### Ordinary Query + ![screenshot](/static/screenshot-2.png) +#### Live Streaming + +![screenshot](/static/screenshot-3.png) + ## Features - **Flexible Querying:** Query data using MongoDB's aggregation pipeline syntax in JSON or JavaScript. Support query variables to create dynamic dashboards. - **Time Series & Table Data:** Visualize time-based data or display results in tabular format for various Grafana panels. +- **Live Streaming Support** (Experimental) Watch [MongoDB Change Streams](https://www.mongodb.com/docs/manual/changeStreams/) to monitor MongoDB operations and data in real-time. - **MongoDB Atlas Support** Connect to MongoDB Atlas Services. - **Grafana Alerting Support** Set up alerting rules based on query result - **[Legacy Plugin](https://github.com/JamesOsgood/mongodb-grafana) Compatibility:** Easy migrate from the [legacy plugin](https://github.com/JamesOsgood/mongodb-grafana) with support for its query syntax. ## Requirements: -* Grafana >= 10.4.0 -* MongoDB >= 3.6 + +- Grafana >= 10.4.0 +- MongoDB >= 3.6 + ## Authentication methods -* No authentication -* Username/Password authentication + +- No authentication +- Username/Password authentication ## Getting Started + ### Quick start + Run the script [quick_start.py](scripts/quick_start.py) in the root directory to start MongoDB and Grafana containers with the plugin + ``` python3 scripts/quick_start.py ``` + +Visit Grafana on http://localhost:3000. 
Add a new MongoDB data source with host `mongo`, port `27017`, and enter the collection name. If you want to use [MongoDB Compass Web](https://github.com/haohanyang/compass-web) GUI to manage the database, uncomment `compass` service in [docker-compose.prod.yaml](/docker-compose.prod.yaml) + ### Full steps + 1. **Download:** Obtain the latest plugin build from the [Release page](https://github.com/haohanyang/mongodb-datasource/releases) or [workflow artifacts](https://github.com/haohanyang/mongodb-datasource/actions?query=branch%3Amaster). -2. **Install:** +2. **Install:** - Extract the downloaded archive (`haohanyang-mongodb-datasource-.zip`) into your Grafana plugins directory (`/var/lib/grafana/plugins` or similar). - Ensure the plugin binaries (`mongodb-datasource/gpx_mongodb_datasource_*`) have execute permissions (`chmod +x`). - Configure the plugin as a data source within Grafana, providing your MongoDB connection details. -Refer to the [example docker-compose.prod.yaml](/docker-compose.prod.yaml) file for a production-ready setup. - -3. **Start Querying:** - - Select your MongoDB data source in a Grafana panel. - - Use the query editor to write your aggregation pipeline queries (see Query Language below). +Refer to the [example docker-compose.prod.yaml](/docker-compose.prod.yaml) file for a production-ready setup. + +3. **Start Querying:** + - Select your MongoDB data source in a Grafana panel. + - Use the query editor to write your aggregation pipeline queries (see Query Language below). ## Query Language @@ -52,156 +69,207 @@ Refer to the [example docker-compose.prod.yaml](/docker-compose.prod.yaml) file Provide the collection name and your MongoDB aggregation pipeline in standard JSON format. 
**Example:** Retrieve 10 AirBnB listings scraped within the selected time range: + ```json [ - { - "$match": { - "last_scraped": { - "$gt": { - "$date": { - "$numberLong": "$__from" - } - }, - "$lt": { - "$date": { - "$numberLong": "$__to" - } - } - } + { + "$match": { + "last_scraped": { + "$gt": { + "$date": { + "$numberLong": "$__from" + } + }, + "$lt": { + "$date": { + "$numberLong": "$__to" + } } - }, - { - "$limit": 10 + } } + }, + { + "$limit": 10 + } ] ``` ### JavaScript (Legacy & ShadowRealm) -- **Legacy:** Maintain compatibility with the [legacy plugin](https://github.com/JamesOsgood/mongodb-grafana)'s syntax: - ```javascript - db.listingsAndReviews.aggregate([ /* Your aggregation pipeline (JSON) */ ]); - ``` - This gives the same result as the previous JSON query. - ```js - db.listingsAndReviews.aggregate([ - { - "$match": { - "last_scraped": { - "$gt": { - "$date": { - "$numberLong": "$__from" - } - }, - "$lt": { - "$date": { - "$numberLong": "$__to" - } - } - } - } - }, - { - "$limit": 10 - } - ]) - ``` -- **ShadowRealm (Secure):** Define an `aggregate()` function that returns your pipeline. The function executes within a [ShadowRealm](https://github.com/tc39/proposal-shadowrealm) sandboxed environment. - ```javascript - function aggregate() { - // ... your logic based on template variables ... - return [ /* Your aggregation pipeline */ ]; - } - ``` - In this example, only the admin user to can view the query result. - ```js - function aggregate() { - const user = "${__user.login}" - let filter = {} - if (user !== "admin") { - filter = { - noop: true - } - } - return [ - { - $match: filter - }, - { - $limit: 10 - } - ] - } - ``` +- **Legacy:** Maintain compatibility with the [legacy plugin](https://github.com/JamesOsgood/mongodb-grafana)'s syntax: + ```javascript + db.listingsAndReviews.aggregate([ + /* Your aggregation pipeline (JSON) */ + ]); + ``` + This gives the same result as the previous JSON query. 
+ ```js + db.listingsAndReviews.aggregate([ + { + $match: { + last_scraped: { + $gt: { + $date: { + $numberLong: '$__from', + }, + }, + $lt: { + $date: { + $numberLong: '$__to', + }, + }, + }, + }, + }, + { + $limit: 10, + }, + ]); + ``` +- **ShadowRealm (Secure):** Define an `aggregate()` function that returns your pipeline. The function executes within a [ShadowRealm](https://github.com/tc39/proposal-shadowrealm) sandboxed environment. + ```javascript + function aggregate() { + // ... your logic based on template variables ... + return [ + /* Your aggregation pipeline */ + ]; + } + ``` + In this example, only the admin user to can view the query result. + ```js + function aggregate() { + const user = '${__user.login}'; + let filter = {}; + if (user !== 'admin') { + filter = { + noop: true, + }; + } + return [ + { + $match: filter, + }, + { + $limit: 10, + }, + ]; + } + ``` ### Query Types -- **Time series:** For time-based visualizations. Your query must return documents with `ts` (timestamp) and `value` fields. An optional `name` field enables grouping by category. +- **Time series:** For time-based visualizations. Your query must return documents with `ts` (timestamp) and `value` fields. An optional `name` field enables grouping by category. The following query of [Sample AirBnB Listings Dataset](https://www.mongodb.com/docs/atlas/sample-data/sample-airbnb/) shows the number of AirBnB listings in each month that have the first review in the selected time range. 
- ```json - [ - { - "$match": { - "first_review": { - "$gt": { - "$date": { - "$numberLong": "$__from" - } - }, - "$lt": { - "$date": { - "$numberLong": "$__to" - } - } - } - } - }, - { - "$group": { - "_id": { - "month": { - "$dateToString": { - "format": "%Y-%m", - "date": "$first_review" - } - }, - "property_type": "$property_type" - }, - "value": { - "$count": {} - } - } - }, - { - "$project": { - "ts": { - "$toDate": "$_id.month" - }, - "name": "$_id.property_type", - "value": 1 - } - } - ] - ``` + + ```json + [ + { + "$match": { + "first_review": { + "$gt": { + "$date": { + "$numberLong": "$__from" + } + }, + "$lt": { + "$date": { + "$numberLong": "$__to" + } + } + } + } + }, + { + "$group": { + "_id": { + "month": { + "$dateToString": { + "format": "%Y-%m", + "date": "$first_review" + } + }, + "property_type": "$property_type" + }, + "value": { + "$count": {} + } + } + }, + { + "$project": { + "ts": { + "$toDate": "$_id.month" + }, + "name": "$_id.property_type", + "value": 1 + } + } + ] + ``` + - **Table:** For more flexible data display in tables, pie charts, etc. No specific output schema is required. +### Live Streaming (Experimental) + +Switch on "Streaming" in the Dashboard panel or click "Live" in Explore to enable Live Streaming. The plugin will listen to [change events](https://www.mongodb.com/docs/manual/reference/change-events/) from the collection entered. You can query the change event and show the result in the dashboard. Here are example queries. + +#### Show MongoDB Operation and Timestamp + +```json +[ + { + "$project": { + "Operation": "$operationType", + "Time": "$wallTime" + } + } +] +``` + +#### Show newly inserted values + +If the a new document was inserted which contains `value` field, the query will show the timestamp and the value. The data structure of `insert` event can be found in [create](https://www.mongodb.com/docs/manual/reference/change-events/create). 
+ +```json +[ + { + "$match": { + "fullDocument.value": { + "$exists": true + }, + "operationType": "insert" + } + }, + { + "$project": { + "ts": "$wallTime", + "value": "$fullDocument.value" + } + } +] +``` + +**Note**: Change streams are only available for [replica sets](https://www.mongodb.com/docs/manual/replication/#std-label-replication) and [sharded clusters](https://www.mongodb.com/docs/manual/sharding/#std-label-sharding-background). + ## Supported Data Types -| BSON Type | Support | Go Type | Notes | -|-----------------------|---------|-------------------|-------------------------------------------| -| Double | ✅ | float64 | | -| String | ✅ | string | | -| Object | ✅ | json.RawMessage | May be converted to string if necessary | -| Array | ✅ | json.RawMessage | May be converted to string if necessary | -| ObjectId | ✅ | string | | -| Boolean | ✅ | bool | | -| Date | ✅ | time.Time | | -| Null | ✅ | nil | | -| 32-bit integer | ✅ | int32 | May be converted to int64/float64 | -| 64-bit integer | ✅ | int64 | May be converted to float64 | - - **Note:** Unsupported BSON types are not included in the table and will display as `"[Unsupported type]"`. +| BSON Type | Support | Go Type | Notes | +| -------------- | ------- | --------------- | --------------------------------------- | +| Double | ✅ | float64 | | +| String | ✅ | string | | +| Object | ✅ | json.RawMessage | May be converted to string if necessary | +| Array | ✅ | json.RawMessage | May be converted to string if necessary | +| ObjectId | ✅ | string | | +| Boolean | ✅ | bool | | +| Date | ✅ | time.Time | | +| Null | ✅ | nil | | +| 32-bit integer | ✅ | int32 | May be converted to int64/float64 | +| 64-bit integer | ✅ | int64 | May be converted to float64 | +| Timestamps | ✅ | time.Time | The `ordinal` part is truncated | + +**Note:** Unsupported BSON types are not included in the table and will display as `"[Unsupported type]"`. 
## License + [Apache-2.0](/LICENSE) diff --git a/docker-compose.prod.yaml b/docker-compose.prod.yaml index d6f9a63..c95d014 100644 --- a/docker-compose.prod.yaml +++ b/docker-compose.prod.yaml @@ -15,3 +15,11 @@ services: image: mongo ports: - 27017:27017 + + # Uncomment the following to run MongoDB Compass Web + # compass: + # image: haohanyang/compass-web + # ports: + # - 8080:8080 + # links: + # - mongo diff --git a/docker-compose.test.yaml b/docker-compose.test.yaml new file mode 100644 index 0000000..b116383 --- /dev/null +++ b/docker-compose.test.yaml @@ -0,0 +1,56 @@ +services: + grafana: + user: root + container_name: mongodb-datasource-grafana + build: + context: ./.config + args: + grafana_image: ${GRAFANA_IMAGE:-grafana} + grafana_version: ${GRAFANA_VERSION:-11.2.0} + development: ${DEVELOPMENT:-false} + ports: + - 3000:3000/tcp + - 2345:2345/tcp # delve + security_opt: + - 'apparmor:unconfined' + - 'seccomp:unconfined' + cap_add: + - SYS_PTRACE + volumes: + - ./dist:/var/lib/grafana/plugins/haohanyang-mongodb-datasource + - ./provisioning:/etc/grafana/provisioning + - .:/root/haohanyang-mongodb-datasource + + environment: + NODE_ENV: development + GF_LOG_FILTERS: plugin.haohanyang-mongodb-datasource:debug + GF_LOG_LEVEL: debug + GF_DATAPROXY_LOGGING: 1 + GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: haohanyang-mongodb-datasource + networks: + - mongodb-datasource + + mongo-no-auth: + image: mongo + container_name: mongodb-datasource-mongo-no-auth + ports: + - 27018:27017 + networks: + - mongodb-datasource + + mongo-username-password-auth: + image: mongo + container_name: mongodb-datasource-mongo-username-password-auth + ports: + - 27019:27017 + environment: + MONGO_INITDB_ROOT_USERNAME: username + MONGO_INITDB_ROOT_PASSWORD: password + networks: + - mongodb-datasource + +networks: + mongodb-datasource: + driver: bridge + + \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index b116383..d4282f9 100644 --- 
a/docker-compose.yaml +++ b/docker-compose.yaml @@ -29,28 +29,31 @@ services: GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: haohanyang-mongodb-datasource networks: - mongodb-datasource - - mongo-no-auth: + + mongo: image: mongo - container_name: mongodb-datasource-mongo-no-auth + container_name: mongodb-datasource-mongo + command: [--replSet, rs0] ports: - - 27018:27017 + - 27017:27017 + healthcheck: + test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0, host:'mongo:27017'}]}) }" | mongosh --port 27017 --quiet + interval: 10s + timeout: 20s + start_period: 0s + start_interval: 1s + retries: 10 networks: - mongodb-datasource - mongo-username-password-auth: - image: mongo - container_name: mongodb-datasource-mongo-username-password-auth + compass: + image: haohanyang/compass-web + container_name: mongodb-datasource-compass ports: - - 27019:27017 - environment: - MONGO_INITDB_ROOT_USERNAME: username - MONGO_INITDB_ROOT_PASSWORD: password + - 8080:8080 networks: - mongodb-datasource networks: mongodb-datasource: driver: bridge - - \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index efe4194..70f4de8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "mongodb-datasource", - "version": "0.2.1", + "version": "0.3.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "mongodb-datasource", - "version": "0.2.1", + "version": "0.3.0", "license": "Apache-2.0", "dependencies": { "@emotion/css": "11.10.6", diff --git a/package.json b/package.json index d5dc9d6..7df3ef1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mongodb-datasource", - "version": "0.2.1", + "version": "0.3.0", "scripts": { "build": "webpack -c ./.config/webpack/webpack.config.ts --env production", "dev": "webpack -w -c ./.config/webpack/webpack.config.ts --env development", @@ -72,4 +72,4 @@ "tslib": "2.5.3" }, "packageManager": "npm@9.6.7" -} \ No newline at end of file +} 
diff --git a/pkg/models/column.go b/pkg/models/column.go index b66a471..8769ebd 100644 --- a/pkg/models/column.go +++ b/pkg/models/column.go @@ -16,7 +16,7 @@ type Column struct { BsonTypes []bsontype.Type } -var UNSUPPORTED_TYPE = "[Unsupported type]" +var UNSUPPORTED_TYPE = "[Unsupported type %s]" func (c *Column) AppendValue(rv bson.RawValue) error { switch rv.Type { @@ -119,6 +119,14 @@ func (c *Column) AppendValue(rv bson.RawValue) error { c.Field.Append(pointer(rv.Time())) + case bson.TypeTimestamp: + if c.Type() != data.FieldTypeNullableTime { + return fmt.Errorf("field %s should have type %s, but got %s", c.Name, c.Type().ItemTypeString(), rv.Type.String()) + } + + t, _ := rv.Timestamp() + c.Field.Append(pointer(time.Unix(int64(t), 0))) + case bson.TypeObjectID: if c.Type() != data.FieldTypeNullableString { return fmt.Errorf("field %s should have type %s, but got %s", c.Name, c.Type().ItemTypeString(), rv.Type.String()) @@ -152,7 +160,7 @@ func (c *Column) AppendValue(rv bson.RawValue) error { return fmt.Errorf("field %s should have type %s, but got %s", c.Name, c.Type().ItemTypeString(), rv.Type.String()) } - c.Field.Append(pointer(UNSUPPORTED_TYPE)) + c.Field.Append(pointer(fmt.Sprintf(UNSUPPORTED_TYPE, rv.Type.String()))) } c.BsonTypes = append(c.BsonTypes, rv.Type) @@ -197,6 +205,11 @@ func NewColumn(rowIndex int, element bson.RawElement) (*Column, error) { field = data.NewField(key, nil, make([]*time.Time, rowIndex+1)) field.Set(rowIndex, pointer(value.Time())) + case bson.TypeTimestamp: + t, _ := value.Timestamp() + field = data.NewField(key, nil, make([]*time.Time, rowIndex+1)) + field.Set(rowIndex, pointer(time.Unix(int64(t), 0))) + case bson.TypeObjectID: field = data.NewField(key, nil, make([]*string, rowIndex+1)) field.Set(rowIndex, pointer(value.ObjectID().String())) @@ -221,7 +234,7 @@ func NewColumn(rowIndex int, element bson.RawElement) (*Column, error) { default: field = data.NewField(key, nil, make([]*string, rowIndex+1)) - 
field.Set(rowIndex, pointer(UNSUPPORTED_TYPE)) + field.Set(rowIndex, pointer(fmt.Sprintf(UNSUPPORTED_TYPE, value.Type.String()))) } return &Column{ diff --git a/pkg/models/stream.go b/pkg/models/stream.go new file mode 100644 index 0000000..5b4854c --- /dev/null +++ b/pkg/models/stream.go @@ -0,0 +1,196 @@ +package models + +import ( + "fmt" + "sync" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "go.mongodb.org/mongo-driver/bson" +) + +type Stream struct { + mu sync.Mutex + Fields map[string]*data.Field + Size int +} + +func (s *Stream) initValue(element bson.RawElement) error { + key := element.Key() + value := element.Value() + var field *data.Field + + rowIndex := s.Size + + switch value.Type { + case bson.TypeBoolean: + field = data.NewField(key, nil, make([]*bool, rowIndex+1)) + field.Set(rowIndex, pointer(value.Boolean())) + + case bson.TypeInt32: + // Convert int32 to int64 + field = data.NewField(key, nil, make([]*int64, rowIndex+1)) + field.Set(rowIndex, pointer(int64(value.Int32()))) + + case bson.TypeInt64: + field = data.NewField(key, nil, make([]*int64, rowIndex+1)) + field.Set(rowIndex, pointer(value.Int64())) + + case bson.TypeDouble: + field = data.NewField(key, nil, make([]*float64, rowIndex+1)) + field.Set(rowIndex, pointer(value.Double())) + + case bson.TypeString: + field = data.NewField(key, nil, make([]*string, rowIndex+1)) + field.Set(rowIndex, pointer(value.StringValue())) + + case bson.TypeDateTime: + field = data.NewField(key, nil, make([]*time.Time, rowIndex+1)) + field.Set(rowIndex, pointer(value.Time())) + + case bson.TypeTimestamp: + t, _ := value.Timestamp() + field = data.NewField(key, nil, make([]*time.Time, rowIndex+1)) + field.Set(rowIndex, pointer(time.Unix(int64(t), 0))) + + case bson.TypeObjectID: + field = data.NewField(key, nil, make([]*string, rowIndex+1)) + field.Set(rowIndex, pointer(value.ObjectID().String())) + + case bson.TypeEmbeddedDocument: + field = data.NewField(key, nil, make([]*string, 
rowIndex+1)) + + v, err := rawDocToJson(value) + if err != nil { + return err + } + field.Set(rowIndex, &v) + + case bson.TypeArray: + field = data.NewField(key, nil, make([]*string, rowIndex+1)) + + v, err := rawArrayToJson(value) + if err != nil { + return err + } + field.Set(rowIndex, &v) + + default: + field = data.NewField(key, nil, make([]*string, rowIndex+1)) + field.Set(rowIndex, pointer(fmt.Sprintf(UNSUPPORTED_TYPE, value.Type.String()))) + } + + s.Fields[key] = field + return nil +} + +func (s *Stream) AddValue(re bson.RawElement) error { + + s.mu.Lock() + defer s.mu.Unlock() + + name := re.Key() + rv := re.Value() + + if field, ok := s.Fields[name]; ok { + switch rv.Type { + case bson.TypeNull: + field.Append(nil) + + case bson.TypeBoolean: + if field.Type() != data.FieldTypeNullableBool { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + + field.Append(pointer(rv.Boolean())) + + case bson.TypeInt32: + v := rv.Int32() + + if field.Type() == data.FieldTypeNullableInt64 { + field.Append(pointer(int64(v))) + } else { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + case bson.TypeInt64: + v := rv.Int64() + + if field.Type() == data.FieldTypeNullableInt64 { + field.Append(pointer(v)) + } else { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + + case bson.TypeDouble: + v := rv.Double() + + if field.Type() == data.FieldTypeNullableFloat64 { + field.Append(pointer(v)) + } else { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + + case bson.TypeString: + if field.Type() != data.FieldTypeNullableString { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + + field.Append(pointer(rv.StringValue())) 
+ + case bson.TypeDateTime: + if field.Type() != data.FieldTypeNullableTime { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + + field.Append(pointer(rv.Time())) + + case bson.TypeTimestamp: + if field.Type() != data.FieldTypeNullableTime { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + + t, _ := rv.Timestamp() + field.Append(pointer(time.Unix(int64(t), 0))) + + case bson.TypeObjectID: + if field.Type() != data.FieldTypeNullableString { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + field.Append(pointer(rv.ObjectID().String())) + + case bson.TypeEmbeddedDocument: + if field.Type() != data.FieldTypeNullableString { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + + v, err := rawDocToJson(rv) + if err != nil { + return err + } + + field.Append(&v) + case bson.TypeArray: + if field.Type() != data.FieldTypeNullableString { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + + v, err := rawArrayToJson(rv) + if err != nil { + return err + } + field.Append(&v) + + default: + if field.Type() != data.FieldTypeNullableString { + return fmt.Errorf("field %s should have type %s, but got %s", name, field.Type().ItemTypeString(), rv.Type.String()) + } + + field.Append(pointer(fmt.Sprintf(UNSUPPORTED_TYPE, rv.Type.String()))) + } + + return nil + + } else { + return s.initValue(re) + } +} diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index 60a593f..55f0a53 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -9,7 +9,9 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" + 
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/haohanyang/mongodb-datasource/pkg/models" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -238,3 +240,74 @@ func (d *Datasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRe Message: "Successfully connects to MongoDB", }, nil } + +func (d *Datasource) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + backend.Logger.Info("User subscribed to channel", "path", req.Path) + + return &backend.SubscribeStreamResponse{ + Status: backend.SubscribeStreamStatusOK, + }, nil +} + +func (d *Datasource) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + backend.Logger.Info("User published to channel", "path", req.Path) + + return &backend.PublishStreamResponse{ + Status: backend.PublishStreamStatusPermissionDenied, + }, nil +} + +func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { + backend.Logger.Info("User was running the stream on channel", "path", req.Path) + + qm := queryModel{} + json.Unmarshal(req.Data, &qm) + + var pipeline []bson.D + + err := bson.UnmarshalExtJSON([]byte(qm.QueryText), false, &pipeline) + if err != nil { + backend.Logger.Error("Failed to unmarshal JsonExt", "error", err) + return err + } + + mongoStream, err := d.client.Database(d.database).Watch(ctx, pipeline) + if err != nil { + backend.Logger.Error("Failed to listen to Mongo change streams", "error", err) + return err + } + + go watchChangeStream(ctx, &qm, mongoStream, sender) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func watchChangeStream(ctx context.Context, qm *queryModel, stream *mongo.ChangeStream, sender *backend.StreamSender) { + defer stream.Close(ctx) + + for stream.Next(ctx) { + var err error + var frame *data.Frame 
+ + if qm.QueryType == "table" { + frame, err = CreateTableFramesFromStream(ctx, "stream", stream) + } else { + frame, err = CreateTimeSeriesFramesFromStream(ctx, "stream", stream) + } + + if err != nil { + backend.Logger.Error("Failed to create data frame from stream", "error", err) + } else { + err = sender.SendFrame(frame, data.IncludeAll) + if err != nil { + backend.Logger.Error("Failed to send frame", "error", err) + } + } + + } +} diff --git a/pkg/plugin/query.go b/pkg/plugin/query.go index 983b559..5d16cd7 100644 --- a/pkg/plugin/query.go +++ b/pkg/plugin/query.go @@ -115,3 +115,51 @@ func CreateTableFramesFromQuery(ctx context.Context, tableName string, cursor *m return frame, nil } + +func CreateTableFramesFromStream(ctx context.Context, tableName string, stream *mongo.ChangeStream) (*data.Frame, error) { + + frame := data.NewFrame(tableName) + + elements, err := stream.Current.Elements() + if err != nil { + return nil, err + } + + for _, element := range elements { + if element.Value().Type == bson.TypeNull { + continue + } + nc, err := models.NewColumn(0, element) + if err != nil { + return nil, err + } + + nc.Rectify() + frame.Fields = append(frame.Fields, nc.Field) + + } + + return frame, nil +} + +func CreateTimeSeriesFramesFromStream(ctx context.Context, tableName string, stream *mongo.ChangeStream) (*data.Frame, error) { + + name := "" + rawName := stream.Current.Lookup("name") + if !rawName.IsZero() && rawName.Type == bson.TypeString { + name = rawName.StringValue() + } + + elements, err := stream.Current.Elements() + if err != nil { + return nil, err + } + + table := models.NewTimeSeriesTable(name) + err = table.AppendRow(elements) + if err != nil { + return nil, err + } + + return table.MakeDataFrame(), nil +} diff --git a/src/components/QueryEditor.css b/src/components/QueryEditor.css new file mode 100644 index 0000000..c26a563 --- /dev/null +++ b/src/components/QueryEditor.css @@ -0,0 +1,24 @@ +.field-label { + font-size: 12px; + font-weight: 500; + 
line-height: 1.25; + margin-bottom: 4px; + color: rgb(204, 204, 220); + max-width: 480px; +} + +.field-description { + color: rgba(204, 204, 220, 0.65); + font-size: 12px; + font-weight: 400; + margin-top: 2px; + display: block; +} + +.query-editor-collection-streaming-field { + margin-bottom: 0px !important; +} + +.query-editor-collection-streaming-container { + margin-bottom: 6px; +} \ No newline at end of file diff --git a/src/components/QueryEditor.tsx b/src/components/QueryEditor.tsx index a743e1a..0a37aac 100644 --- a/src/components/QueryEditor.tsx +++ b/src/components/QueryEditor.tsx @@ -1,4 +1,4 @@ -import React, { ChangeEvent, useRef, useState } from "react"; +import React, { ChangeEvent, FormEventHandler, useRef, useState } from "react"; import { Button, CodeEditor, @@ -10,13 +10,16 @@ import { ControlledCollapse, InlineSwitch, RadioButtonGroup, - Stack + Stack, + FeatureBadge, + Switch } from "@grafana/ui"; -import { QueryEditorProps, SelectableValue } from "@grafana/data"; +import { CoreApp, FeatureState, QueryEditorProps, SelectableValue } from "@grafana/data"; import { DataSource } from "../datasource"; import { MongoDataSourceOptions, MongoQuery, QueryLanguage, QueryType, DEFAULT_QUERY } from "../types"; import { parseJsQuery, parseJsQueryLegacy, validateJsonQueryText, validatePositiveNumber } from "../utils"; import * as monacoType from "monaco-editor/esm/vs/editor/editor.api"; +import "./QueryEditor.css"; type Props = QueryEditorProps; @@ -40,7 +43,7 @@ const languageOptions: Array> = [ ]; -export function QueryEditor({ query, onChange, onRunQuery, data }: Props) { +export function QueryEditor({ query, onChange, app }: Props) { const codeEditorRef = useRef(null); const [queryTextError, setQueryTextError] = useState(null); @@ -78,7 +81,7 @@ export function QueryEditor({ query, onChange, onRunQuery, data }: Props) { const onMaxTimeMSChange = (event: ChangeEvent) => { setMaxTimeMSText(event.target.value); - console.log(event.target.value); + if 
(!event.target.value) { onChange({ ...query, aggregateMaxTimeMS: undefined }); } else if (validatePositiveNumber(event.target.value)) { @@ -128,6 +131,10 @@ export function QueryEditor({ query, onChange, onRunQuery, data }: Props) { } }; + const onIsStreamingChange: FormEventHandler = e => { + onChange({ ...query, isStreaming: e.currentTarget.checked }); + }; + if (!query.queryLanguage) { query.queryLanguage = DEFAULT_QUERY.queryLanguage; } @@ -137,6 +144,17 @@ export function QueryEditor({ query, onChange, onRunQuery, data }: Props) { + {app !== CoreApp.Explore &&
+ +
Streaming
+ +
+ } horizontal={true}> + +
+
Watch MongoDB Change Streams
+
} + @@ -186,7 +204,6 @@ export function QueryEditor({ query, onChange, onRunQuery, data }: Props) { - ); diff --git a/src/datasource.ts b/src/datasource.ts index f539198..22c7c55 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -1,8 +1,12 @@ -import { DataSourceInstanceSettings, CoreApp, ScopedVars, DataQueryRequest, LegacyMetricFindQueryOptions, MetricFindValue, dateTime } from "@grafana/data"; -import { DataSourceWithBackend, getTemplateSrv } from "@grafana/runtime"; -import { parseJsQuery, getBucketCount, parseJsQueryLegacy, randomId, getMetricValues, datetimeToJson } from "./utils"; +import { + DataSourceInstanceSettings, CoreApp, ScopedVars, DataQueryRequest, LegacyMetricFindQueryOptions, + MetricFindValue, dateTime, LiveChannelScope, DataQueryResponse, + LoadingState +} from "@grafana/data"; +import { DataSourceWithBackend, getGrafanaLiveSrv, getTemplateSrv } from "@grafana/runtime"; +import { parseJsQuery, getBucketCount, parseJsQueryLegacy, randomId, getMetricValues, datetimeToJson, base64UrlEncode } from "./utils"; import { MongoQuery, MongoDataSourceOptions, DEFAULT_QUERY, QueryLanguage, VariableQuery } from "./types"; -import { firstValueFrom } from "rxjs"; +import { firstValueFrom, merge, Observable, of } from "rxjs"; export class DataSource extends DataSourceWithBackend { @@ -14,7 +18,6 @@ export class DataSource extends DataSourceWithBackend): Observable { + if (request.liveStreaming) { + const observables = request.targets.map(query => { + return getGrafanaLiveSrv().getDataStream({ + addr: { + scope: LiveChannelScope.DataSource, + namespace: this.uid, + path: `mongodb-datasource/${query.refId}`, + data: { + ...query, + }, + }, + }); + }); + + return merge(...observables); + } + + const streamQueries = request.targets.filter(query => query.isStreaming); + + if (streamQueries.length === 0) { + return super.query(request); + + } else if (streamQueries.length === request.targets.length) { + const observables = request.targets.map(query => { + 
return getGrafanaLiveSrv().getDataStream({ + addr: { + scope: LiveChannelScope.DataSource, + namespace: this.uid, + path: `mongodb-datasource/${base64UrlEncode(query.collection)}-${base64UrlEncode(query.queryText)}`, + data: { + ...query, + }, + }, + }); + }); + + return merge(...observables); + } else { + // Mix of streaming requests and normal requests is not supported + return of({ + data: [], + error: { + message: "Mix of streaming requests and normal requests is not supported", + }, + state: LoadingState.Error, + }); + } + } } diff --git a/src/plugin.json b/src/plugin.json index ebd33c8..e857680 100644 --- a/src/plugin.json +++ b/src/plugin.json @@ -3,6 +3,7 @@ "type": "datasource", "name": "MongoDB", "alerting": true, + "streaming": true, "id": "haohanyang-mongodb-datasource", "metrics": true, "backend": true, diff --git a/src/types.ts b/src/types.ts index ae11a59..70b20b8 100644 --- a/src/types.ts +++ b/src/types.ts @@ -7,6 +7,8 @@ export interface MongoQuery extends DataQuery { queryType?: string; queryLanguage?: string; + isStreaming?: boolean; + // Aggregate options aggregateMaxTimeMS?: number; aggregateComment?: string; @@ -30,13 +32,10 @@ export const QueryLanguage = { export const DEFAULT_QUERY: Partial = { - queryText: `[ - { - "$limit": 10 - } -]`, - queryType: QueryType.TIMESERIES, - queryLanguage: QueryLanguage.JSON + queryText: "", + queryType: QueryType.TABLE, + queryLanguage: QueryLanguage.JSON, + isStreaming: false }; diff --git a/src/utils.ts b/src/utils.ts index 9ad876d..17e9bb5 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -121,7 +121,6 @@ export function getMetricValues(response: DataQueryResponse): MetricFindValue[] } export function validatePositiveNumber(num: string) { - if (!/^\d+$/.test(num.trim())) { return false; } @@ -129,3 +128,15 @@ export function validatePositiveNumber(num: string) { const parsed = parseInt(num, 10); return parsed > 0; } + + +export function base64UrlEncode(input: string | undefined) { + if (!input) { + return 
""; + } + // Encode input string to Base64 + let base64 = btoa(input); + // Make the Base64 string URL-safe + let base64Url = base64.replace(/\+/g, "-").replace(/\//g, "_").replace(/=+$/, ""); + return base64Url; +} diff --git a/static/screenshot-3.png b/static/screenshot-3.png new file mode 100644 index 0000000..ee4df20 Binary files /dev/null and b/static/screenshot-3.png differ