Skip to content

Commit

Permalink
add transaction filter tag support (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsulli-rai authored Mar 8, 2023
1 parent 0329cce commit abc116c
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 47 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## v0.4.1-alpha

* add support for transaction tagging

## v0.4.0-alpha

* Complete overhaul of transaction response handling and results access API.
Expand Down
15 changes: 8 additions & 7 deletions examples/execute/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import (
)

type Options struct {
Database string `short:"d" long:"database" required:"true" description:"database name"`
Engine string `short:"e" long:"engine" required:"true" descriptio:"engine name"`
Code string `short:"c" long:"code" description:"rel source code"`
File string `short:"f" long:"file" description:"rel source file"`
Readonly bool `long:"readonly" description:"readonly query (default: false)"`
Profile string `long:"profile" default:"default" description:"config profile"`
Database string `short:"d" long:"database" required:"true" description:"database name"`
Engine string `short:"e" long:"engine" required:"true" descriptio:"engine name"`
Code string `short:"c" long:"code" description:"rel source code"`
File string `short:"f" long:"file" description:"rel source file"`
Readonly bool `long:"readonly" description:"readonly query (default: false)"`
Profile string `long:"profile" default:"default" description:"config profile"`
Tags []string `long:"tag" default:"" description:"tag applied to query --tag=tagA --tag=tagB"`
}

func getCode(opts *Options) (string, error) {
Expand All @@ -52,7 +53,7 @@ func run(opts *Options) error {
if err != nil {
return err
}
rsp, err := client.Execute(opts.Database, opts.Engine, source, nil, opts.Readonly)
rsp, err := client.Execute(opts.Database, opts.Engine, source, nil, opts.Readonly, opts.Tags...)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions examples/get_transactions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ import (
)

type Options struct {
Profile string `long:"profile" default:"default" description:"config profile"`
Profile string `long:"profile" default:"default" description:"config profile"`
Tags []string `long:"tags" default:"" description:"filter tansactions by a comma separated list of tags"`
}

func run(opts *Options) error {
client, err := rai.NewClientFromConfig(opts.Profile)
if err != nil {
return err
}
rsp, err := client.ListTransactions()
rsp, err := client.ListTransactions(opts.Tags...)
if err != nil {
return err
}
Expand Down
16 changes: 12 additions & 4 deletions rai/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,9 +943,10 @@ const twoMinutes = 2 * time.Minute
func (c *Client) Execute(
database, engine, source string,
inputs map[string]string, readonly bool,
tags ...string,
) (*TransactionResponse, error) {
t0 := time.Now()
rsp, err := c.ExecuteAsync(database, engine, source, inputs, readonly)
rsp, err := c.ExecuteAsync(database, engine, source, inputs, readonly, tags...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1039,6 +1040,7 @@ func ReadTransactionResponse(rsp *http.Response) (*TransactionResponse, error) {
func (c *Client) ExecuteAsync(
database, engine, query string,
inputs map[string]string, readonly bool,
tags ...string,
) (*TransactionResponse, error) {
var inputList = make([]interface{}, 0)
for k, v := range inputs {
Expand All @@ -1050,7 +1052,8 @@ func (c *Client) ExecuteAsync(
Engine: engine,
Query: query,
ReadOnly: readonly,
Inputs: inputList}
Inputs: inputList,
Tags: tags}
var rsp *http.Response
err := c.request(http.MethodPost, PathTransactions, nil, nil, tx, &rsp)
if err != nil {
Expand Down Expand Up @@ -1260,9 +1263,14 @@ type listTransactionsResponse struct {
Transactions []Transaction `json:"transactions"`
}

func (c *Client) ListTransactions() ([]Transaction, error) {
func (c *Client) ListTransactions(tags ...string) ([]Transaction, error) {
args, err := queryArgs("tags", tags)
if err != nil {
return nil, err
}

var result listTransactionsResponse
err := c.Get(makePath(PathTransactions), nil, nil, &result)
err = c.Get(makePath(PathTransactions), nil, args, &result)
return result.Transactions, err
}

Expand Down
55 changes: 30 additions & 25 deletions rai/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package rai

import (
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -209,44 +211,47 @@ func TestExecuteV1(t *testing.T) {
assert.Equal(t, expected, columns)
}

/*
// Test transaction asynchronous execution
func TestExecuteAsync(t *testing.T) {
func TestListTransactions(t *testing.T) {
client := test.client

query := "x, x^2, x^3, x^4 from x in {1; 2; 3; 4; 5}"
rsp, err := client.Execute(test.databaseName, test.engineName, query, nil, true)
txn, err := client.Execute(test.databaseName, test.engineName, query, nil, true)
assert.Nil(t, err)

expectedResults := []ArrowRelation{
ArrowRelation{"/:output/Int64/Int64/Int64/Int64", [][]interface{}{
{1., 2., 3., 4., 5.},
{1., 4., 9., 16., 25.},
{1., 8., 27., 64., 125.},
{1., 16., 81., 256., 625.},
}},
}
expectedProblems := []Problem{}
assert.Equal(t, expectedProblems, txn.Problems)

txns, err := client.ListTransactions()
assert.Nil(t, err)

assert.Equal(t, rsp.Results[0].Table, expectedResults[0].Table)
found := false
for _, i := range txns {
if i.ID == txn.Transaction.ID {
found = true
break
}
}
assert.True(t, found, "transaction id not found in list")
}

var expectedMetadata pb.MetadataInfo
data, _ := os.ReadFile("./metadata.pb")
proto.Unmarshal(data, &expectedMetadata)
// testing tag filters for transactions
func TestListTransactionsByTag(t *testing.T) {
client := test.client

assert.Equal(t, rsp.Metadata.String(), expectedMetadata.String())
query := "x, x^2, x^3, x^4 from x in {1; 2; 3; 4; 5}"
tag := fmt.Sprintf("rai-sdk-go:%d", time.Now().Unix())
txn, err := client.Execute(test.databaseName, test.engineName, query, nil, true, tag)
assert.Nil(t, err)

expectedProblems := []interface{}{}
expectedProblems := []Problem{}
assert.Equal(t, expectedProblems, txn.Problems)

assert.Equal(t, rsp.Problems, expectedProblems)
txns, err := client.ListTransactions(tag)
assert.Nil(t, err)

// also testing Show v2 result format
var io bytes.Buffer
rsp.ShowIO(&io)
expectedOutput := "/:output/Int64/Int64/Int64/Int64\n1, 1, 1, 1\n2, 4, 8, 16\n3, 9, 27, 81\n4, 16, 64, 256\n5, 25, 125, 625\n\n"
assert.Equal(t, 1, len(txns), "filter tag did not apply as expected")

assert.Equal(t, io.String(), expectedOutput)
}
*/

func findRelation(relations []RelationV1, colName string) *RelationV1 {
for _, relation := range relations {
Expand Down
11 changes: 6 additions & 5 deletions rai/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,12 @@ type Transaction struct {
}

type TransactionRequest struct {
Database string `json:"dbname"`
Engine string `json:"engine_name"`
Query string `json:"query"`
ReadOnly bool `json:"readonly"`
Inputs []any `json:"v1_inputs"`
Database string `json:"dbname"`
Engine string `json:"engine_name"`
Query string `json:"query"`
ReadOnly bool `json:"readonly"`
Inputs []any `json:"v1_inputs"`
Tags []string `json:"tags"`
}

type Problem struct {
Expand Down
11 changes: 7 additions & 4 deletions rai/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/stretchr/testify/assert"
)

// track test results in transactions db
const o11yTag = "rai-sdk-go:test"

type tdata struct {
sig Signature
cols [][]any
Expand Down Expand Up @@ -1821,7 +1824,7 @@ func runTests(t *testing.T, tests []execTest) {
if test.showQuery {
fmt.Println(q) // useful for debugging tests
}
rsp, err := test.client.Execute(test.databaseName, test.engineName, q, nil, true)
rsp, err := test.client.Execute(test.databaseName, test.engineName, q, nil, true, o11yTag)
assert.Nil(t, err)
checkResponse(t, tst, rsp)
}
Expand Down Expand Up @@ -1978,7 +1981,7 @@ func TestInterfaceTypes(t *testing.T) {
func TestPrefixMatch(t *testing.T) {
query := `def output = 1, :foo, "a"; 42, :bar, "c"`

rsp, err := test.client.Execute(test.databaseName, test.engineName, dindent(query), nil, true)
rsp, err := test.client.Execute(test.databaseName, test.engineName, dindent(query), nil, true, o11yTag)
assert.Nil(t, err)
assert.Equal(t, 2, len(rsp.Relations()))

Expand Down Expand Up @@ -2046,7 +2049,7 @@ func TestRelationSlice(t *testing.T) {
5, :bip, 3.14, "pi!";
6, :zip, missing, "pip"`

rsp, err := test.client.Execute(test.databaseName, test.engineName, dindent(query), nil, true)
rsp, err := test.client.Execute(test.databaseName, test.engineName, dindent(query), nil, true, o11yTag)
assert.Nil(t, err)
assert.Equal(t, 6, len(rsp.Relations()))

Expand Down Expand Up @@ -2125,7 +2128,7 @@ func TestRelationUnion(t *testing.T) {
5, :bip, 3.14, "pi!";
6, :zip, missing, "pip"`

rsp, err := test.client.Execute(test.databaseName, test.engineName, dindent(query), nil, true)
rsp, err := test.client.Execute(test.databaseName, test.engineName, dindent(query), nil, true, o11yTag)
assert.Nil(t, err)
assert.Equal(t, 6, len(rsp.Relations()))

Expand Down

0 comments on commit abc116c

Please sign in to comment.