Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
Merge Add shard and token to ObservedQuery/Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-sucha committed Mar 1, 2023
2 parents 7092a3d + 099fa2b commit cea7224
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
11 changes: 7 additions & 4 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type ExecutableQuery interface {
execute(ctx context.Context, conn *Conn) *Iter
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo, shard int, token token)
retryPolicy() RetryPolicy
speculativeExecutionPolicy() SpeculativeExecutionPolicy
GetRoutingKey() ([]byte, error)
Expand All @@ -27,12 +27,15 @@ type queryExecutor struct {
policy HostSelectionPolicy
}

func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter {
func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn, selected SelectedHost) *Iter {
start := time.Now()
iter := qry.execute(ctx, conn)
end := time.Now()

qry.attempt(q.pool.keyspace, end, start, iter, conn.host)
// conn.scyllaSupported.shard might be different shard than the one based on selected.Token() in case
// heavy loaded optimization kicks in or a connection to the data-owning shard is not available.
// We use conn.scyllaSupported.shard as that is the actual shard number where the query was sent.
qry.attempt(q.pool.keyspace, end, start, iter, conn.host, conn.scyllaSupported.shard, selected.Token())

return iter
}
Expand Down Expand Up @@ -124,7 +127,7 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
continue
}

iter = q.attemptQuery(ctx, qry, conn)
iter = q.attemptQuery(ctx, qry, conn, selectedHost)
iter.host = selectedHost.Info()
// Update host
switch iter.err {
Expand Down
52 changes: 36 additions & 16 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,22 +1111,26 @@ func (q *Query) execute(ctx context.Context, conn *Conn) *Iter {
return conn.executeQuery(ctx, q)
}

func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo, shard int, token token) {
latency := end.Sub(start)
attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.observer != nil)

if q.observer != nil {
scyllaToken, _ := token.(int64Token)

q.observer.ObserveQuery(q.Context(), ObservedQuery{
Keyspace: keyspace,
Statement: q.stmt,
Values: q.values,
Start: start,
End: end,
Rows: iter.numRows,
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Keyspace: keyspace,
Statement: q.stmt,
Values: q.values,
Start: start,
End: end,
Rows: iter.numRows,
Host: host,
ScyllaShard: shard,
ScyllaToken: int64(scyllaToken),
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
})
}
}
Expand Down Expand Up @@ -1930,7 +1934,7 @@ func (b *Batch) WithTimestamp(timestamp int64) *Batch {
return b
}

func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo, shard int, token token) {
latency := end.Sub(start)
attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.observer != nil)

Expand All @@ -1946,17 +1950,21 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host
values[i] = entry.Args
}

scyllaToken, _ := token.(int64Token)

b.observer.ObserveBatch(b.Context(), ObservedBatch{
Keyspace: keyspace,
Statements: statements,
Values: values,
Start: start,
End: end,
// Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Host: host,
ScyllaShard: shard,
ScyllaToken: int64(scyllaToken),
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
})
}

Expand Down Expand Up @@ -2176,6 +2184,12 @@ type ObservedQuery struct {
// Host is the informations about the host that performed the query
Host *HostInfo

// ScyllaShard is the shard number that performed the batch.
ScyllaShard int

// ScyllaToken is the token that was used.
ScyllaToken int64

// The metrics per this host
Metrics *hostMetrics

Expand Down Expand Up @@ -2213,6 +2227,12 @@ type ObservedBatch struct {
// Host is the informations about the host that performed the batch
Host *HostInfo

// ScyllaShard is the shard number that performed the batch.
ScyllaShard int

// ScyllaToken is the token that was used.
ScyllaToken int64

// Err is the error in the batch query.
// It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error
Err error
Expand Down

0 comments on commit cea7224

Please sign in to comment.