From 099fa2b262dbe18eb0e7824ec96b3fee89af6081 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Mon, 20 Feb 2023 17:39:36 +0100 Subject: [PATCH] Add shard and token to ObservedQuery/Batch We need to log this information. --- query_executor.go | 11 ++++++---- session.go | 52 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/query_executor.go b/query_executor.go index 9889316fb..8e1faa6d1 100644 --- a/query_executor.go +++ b/query_executor.go @@ -8,7 +8,7 @@ import ( type ExecutableQuery interface { execute(ctx context.Context, conn *Conn) *Iter - attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) + attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo, shard int, token token) retryPolicy() RetryPolicy speculativeExecutionPolicy() SpeculativeExecutionPolicy GetRoutingKey() ([]byte, error) @@ -27,12 +27,15 @@ type queryExecutor struct { policy HostSelectionPolicy } -func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter { +func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn, selected SelectedHost) *Iter { start := time.Now() iter := qry.execute(ctx, conn) end := time.Now() - qry.attempt(q.pool.keyspace, end, start, iter, conn.host) + // conn.scyllaSupported.shard might be different shard than the one based on selected.Token() in case + // heavy loaded optimization kicks in or a connection to the data-owning shard is not available. + // We use conn.scyllaSupported.shard as that is the actual shard number where the query was sent. + qry.attempt(q.pool.keyspace, end, start, iter, conn.host, conn.scyllaSupported.shard, selected.Token()) return iter } @@ -124,7 +127,7 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne continue } - iter = q.attemptQuery(ctx, qry, conn) + iter = q.attemptQuery(ctx, qry, conn, selectedHost) iter.host = selectedHost.Info() // Update host switch iter.err { diff --git a/session.go b/session.go index eb9c6c7e9..d937a2d47 100644 --- a/session.go +++ b/session.go @@ -1111,22 +1111,26 @@ func (q *Query) execute(ctx context.Context, conn *Conn) *Iter { return conn.executeQuery(ctx, q) } -func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { +func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo, shard int, token token) { latency := end.Sub(start) attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.observer != nil) if q.observer != nil { + scyllaToken, _ := token.(int64Token) + q.observer.ObserveQuery(q.Context(), ObservedQuery{ - Keyspace: keyspace, - Statement: q.stmt, - Values: q.values, - Start: start, - End: end, - Rows: iter.numRows, - Host: host, - Metrics: metricsForHost, - Err: iter.err, - Attempt: attempt, + Keyspace: keyspace, + Statement: q.stmt, + Values: q.values, + Start: start, + End: end, + Rows: iter.numRows, + Host: host, + ScyllaShard: shard, + ScyllaToken: int64(scyllaToken), + Metrics: metricsForHost, + Err: iter.err, + Attempt: attempt, }) } } @@ -1930,7 +1934,7 @@ func (b *Batch) WithTimestamp(timestamp int64) *Batch { return b } -func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { +func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo, shard int, token token) { latency := end.Sub(start) attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.observer != nil) @@ -1946,6 +1950,8 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host values[i] = entry.Args } + scyllaToken, _ := token.(int64Token) + b.observer.ObserveBatch(b.Context(), ObservedBatch{ Keyspace: keyspace, Statements: statements, @@ -1953,10 +1959,12 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host Start: start, End: end, // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS - Host: host, - Metrics: metricsForHost, - Err: iter.err, - Attempt: attempt, + Host: host, + ScyllaShard: shard, + ScyllaToken: int64(scyllaToken), + Metrics: metricsForHost, + Err: iter.err, + Attempt: attempt, }) } @@ -2176,6 +2184,12 @@ type ObservedQuery struct { // Host is the informations about the host that performed the query Host *HostInfo + // ScyllaShard is the shard number that performed the batch. + ScyllaShard int + + // ScyllaToken is the token that was used. + ScyllaToken int64 + // The metrics per this host Metrics *hostMetrics @@ -2213,6 +2227,12 @@ type ObservedBatch struct { // Host is the informations about the host that performed the batch Host *HostInfo + // ScyllaShard is the shard number that performed the batch. + ScyllaShard int + + // ScyllaToken is the token that was used. + ScyllaToken int64 + // Err is the error in the batch query. // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error Err error