Skip to content

Commit

Permalink
Add logpoller.FilteredLogs, parser.NewEventBySubkeyFilter and Indexed…
Browse files Browse the repository at this point in the history
…ValueComparator
  • Loading branch information
reductionista committed Jan 17, 2025
1 parent 55938a0 commit cce7156
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
Expand All @@ -29,6 +30,7 @@ type ORM interface {
MarkFilterBackfilled(ctx context.Context, id int64) (err error)
InsertLogs(context.Context, []Log) (err error)
SelectSeqNums(ctx context.Context) (map[int64]int64, error)
FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type Service struct {
Expand Down Expand Up @@ -257,3 +259,7 @@ func (lp *Service) startFilterBackfill(ctx context.Context, filter Filter, toBlo
lp.lggr.Errorw("Failed to mark filter backfill", "filter", filter, "err", err)
}
}

func (lp *Service) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName)
}
62 changes: 62 additions & 0 deletions pkg/solana/logpoller/mock_orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 75 additions & 0 deletions pkg/solana/logpoller/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
addressFieldName = "address"
eventSigFieldName = "event_sig"
defaultSort = "block_number ASC, log_index ASC"
subKeysFieldName = "subkey_values"
)

var (
Expand Down Expand Up @@ -54,6 +55,80 @@ var _ primitives.Visitor = (*pgDSLParser)(nil)

func (v *pgDSLParser) Comparator(_ primitives.Comparator) {}

type IndexedValueComparator struct {
Value IndexedValue
Operator primitives.ComparisonOperator
}

type eventBySubkeyFilter struct {
SubkeyIndex uint64
ValueComparers []IndexedValueComparator
}

func (f *eventBySubkeyFilter) Accept(visitor primitives.Visitor) {
switch v := visitor.(type) {
case *pgDSLParser:
v.VisitEventSubkeysByValueFilter(f)
}
}

func NewEventBySubkeyFilter(subkeyIndex uint64, valueComparers []primitives.ValueComparator) (query.Expression, error) {
var indexedValueComparators []IndexedValueComparator
for _, cmp := range valueComparers {
iVal, err := NewIndexedValue(cmp.Value)
if err != nil {
return query.Expression{}, err
}
iValCmp := IndexedValueComparator{
Value: iVal,
Operator: cmp.Operator,
}
indexedValueComparators = append(indexedValueComparators, iValCmp)
}
return query.Expression{
Primitive: &eventBySubkeyFilter{
SubkeyIndex: subkeyIndex,
ValueComparers: indexedValueComparators,
},
}, nil
}

func (v *pgDSLParser) VisitEventSubkeysByValueFilter(p *eventBySubkeyFilter) {
if len(p.ValueComparers) > 0 {
if p.SubkeyIndex > 3 { // For now, maximum # of fields that can be indexed is 4--we can increase this if needed by adding more db indexes
v.err = fmt.Errorf("invalid subkey index: %d", p.SubkeyIndex)
return
}

// Add 1 since postgresql arrays are 1-indexed.
subkeyIdx := v.args.withIndexedField("subkey_index", p.SubkeyIndex+1)

comps := make([]string, len(p.ValueComparers))
for idx, comp := range p.ValueComparers {
comps[idx], v.err = makeComp(comp, v.args, "subkey_value", subkeyIdx, "subkey_values[:%s] %s :%s")
if v.err != nil {
return
}
}

v.expression = strings.Join(comps, " AND ")
}
}

func makeComp(comp IndexedValueComparator, args *queryArgs, field, subfield, pattern string) (string, error) {
cmp, err := cmpOpToString(comp.Operator)
if err != nil {
return "", err
}

return fmt.Sprintf(
pattern,
subfield,
cmp,
args.withIndexedField(field, comp.Value),
), nil
}

func (v *pgDSLParser) Block(prim primitives.Block) {
cmp, err := cmpOpToString(prim.Operator)
if err != nil {
Expand Down
40 changes: 40 additions & 0 deletions pkg/solana/logpoller/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,46 @@ func TestDSLParser(t *testing.T) {
})
})

t.Run("query for event topic", func(t *testing.T) {
t.Parallel()

subkeyFilter, err := NewEventBySubkeyFilter(2, []primitives.ValueComparator{
{Value: 4, Operator: primitives.Gt},
{Value: 7, Operator: primitives.Lt},
})
require.NoError(t, err)

parser := &pgDSLParser{}
expressions := []query.Expression{subkeyFilter}
limiter := query.LimitAndSort{}

result, args, err := parser.buildQuery(chainID, expressions, limiter)
expectedQuery := logsQuery(
" WHERE chain_id = :chain_id " +
"AND subkey_values[:subkey_index_0] > :subkey_value_0 AND subkey_values[:subkey_index_0] < :subkey_value_1 ORDER BY " + defaultSort)

var iValLower, iValUpper IndexedValue
iValLower, err = NewIndexedValue(4)
require.NoError(t, err)
iValUpper, err = NewIndexedValue(7)
require.NoError(t, err)

expectedArgs := map[string]any{
"chain_id": chainID,
"subkey_index_0": uint64(3),
"subkey_value_0": iValLower,
"subkey_value_1": iValUpper,
}

require.NoError(t, err)
assert.Equal(t, expectedQuery, result)

var m map[string]any
m, err = args.toArgs()
require.NoError(t, err)
assert.Equal(t, expectedArgs, m)
})

// nested query -> a & (b || c)
t.Run("nested query", func(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit cce7156

Please sign in to comment.