Skip to content

Commit

Permalink
Quieten enhanced telem (#15793)
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav authored Jan 6, 2025
1 parent 0fb7546 commit d27deaa
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 23 deletions.
13 changes: 1 addition & 12 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,11 @@ func hexStringToDecimal(hexString string) (decimal.Decimal, bool) {
func (e *EnhancedTelemetryService[T]) getObservation(finalResult *pipeline.FinalResult) int64 {
singularResult, err := finalResult.SingularResult()
if err != nil {
e.lggr.Warnf("cannot get singular result, job %d", e.job.ID)
return 0
}

finalResultDecimal, err := utils.ToDecimal(singularResult.Value)
if err != nil {
e.lggr.Warnf("cannot parse singular result from bridge task, job %d", e.job.ID)
return 0
}

Expand All @@ -277,7 +275,6 @@ func (e *EnhancedTelemetryService[T]) getObservation(finalResult *pipeline.Final
func (e *EnhancedTelemetryService[T]) getParsedValue(trrs *pipeline.TaskRunResults, trr pipeline.TaskRunResult) float64 {
parsedValue := getJsonParsedValue(trr, trrs)
if parsedValue == nil {
e.lggr.Warnf("cannot get json parse value, job %d, id %s", e.job.ID, trr.Task.DotID())
return 0
}
return *parsedValue
Expand All @@ -302,23 +299,16 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul
if trr.Task.Type() != pipeline.TaskTypeBridge {
continue
}
var bridgeName string
if b, is := trr.Task.(*pipeline.BridgeTask); is {
bridgeName = b.Name
}

if trr.Result.Error != nil {
e.lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", trr.Result.Error, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
continue
}
bridgeRawResponse, ok := trr.Result.Value.(string)
if !ok {
e.lggr.Warnw(fmt.Sprintf("cannot parse bridge response from bridge task, job=%d, id=%s, name=%q: expected string, got: %v (type %T)", e.job.ID, trr.Task.DotID(), bridgeName, trr.Result.Value, trr.Result.Value), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
continue
}
eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse))
if err != nil {
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", err, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
continue
}
value := e.getParsedValue(trrs, trr)
Expand Down Expand Up @@ -635,12 +625,11 @@ func getPricesFromBridgeTaskByTelemetryField(lggr logger.Logger, bridgeTask pipe
func parsePriceFromTask(lggr logger.Logger, trr pipeline.TaskRunResult) float64 {
var val float64
if trr.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error on EA telemetry price task, id %s: %s", trr.Task.DotID(), trr.Result.Error), "err", trr.Result.Error)
return 0
}
val, err := getResultFloat64(&trr)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry price to float64, DOT id %s", trr.Task.DotID()), "task_type", trr.Task.Type(), "task_tags", trr.Task.TaskTags(), "err", err)
return 0
}
return val
}
Expand Down
15 changes: 4 additions & 11 deletions core/services/ocrcommon/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,7 @@ func TestGetObservation(t *testing.T) {

obs := e.getObservation(&pipeline.FinalResult{})
assert.Equal(t, obs, int64(0))
assert.Equal(t, logs.Len(), 1)
assert.Contains(t, logs.All()[0].Message, "cannot get singular result")
assert.Equal(t, 0, logs.Len())

finalResult := &pipeline.FinalResult{
Values: []interface{}{"123456"},
Expand Down Expand Up @@ -457,8 +456,7 @@ func TestCollectAndSend(t *testing.T) {
}

wg.Wait()
assert.Equal(t, logs.Len(), 2)
assert.Contains(t, logs.All()[0].Message, "cannot parse bridge response from bridge task")
assert.Equal(t, 0, logs.Len())

badTrrs = &pipeline.TaskRunResults{
pipeline.TaskRunResult{
Expand All @@ -475,9 +473,7 @@ func TestCollectAndSend(t *testing.T) {
RepTimestamp: observationTimestamp,
}
wg.Wait()
assert.Equal(t, 2, logs.Len())
assert.Contains(t, logs.All()[0].Message, "cannot parse bridge response from bridge task")
assert.Contains(t, logs.All()[1].Message, "cannot get json parse value")
assert.Equal(t, 0, logs.Len())
doneCh <- struct{}{}
}

Expand Down Expand Up @@ -707,10 +703,7 @@ func TestGetPricesFromBridgeTaskByOrder(t *testing.T) {
require.Equal(t, benchmarkPrice, float64(0))
require.Equal(t, bid, float64(0))
require.Equal(t, ask, float64(0))
require.Equal(t, 3, logs.Len())
require.Contains(t, logs.All()[0].Message, "cannot parse EA telemetry price to float64, DOT id ds1_benchmark")
require.Contains(t, logs.All()[1].Message, "cannot parse EA telemetry price to float64, DOT id ds2_bid")
require.Contains(t, logs.All()[2].Message, "cannot parse EA telemetry price to float64, DOT id ds3_ask")
require.Equal(t, 0, logs.Len())

benchmarkPrice, bid, ask = getPricesFromBridgeTask(lggr, trrsMercuryV1[0], trrsMercuryV2, 2)
require.Equal(t, 123456.123456, benchmarkPrice)
Expand Down

0 comments on commit d27deaa

Please sign in to comment.