Skip to content

Commit

Permalink
Merge branch 'main' into es-kind
Browse files Browse the repository at this point in the history
  • Loading branch information
Manik2708 authored Jan 16, 2025
2 parents f08ccda + 2ee8e4c commit 9b2a169
Show file tree
Hide file tree
Showing 43 changed files with 223 additions and 128 deletions.
67 changes: 67 additions & 0 deletions .github/workflows/ci-lint-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,70 @@ jobs:
- name: Run unit tests for scripts
run: |
SHUNIT2=.tools/shunit2 bash scripts/utils/compute-tags.test.sh
binary-size-check:
runs-on: ubuntu-latest
steps:
- uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1
with:
egress-policy: audit

- uses: actions/checkout@d632683dd7b4114ad314bca15554477dd762a938 # v4.2.0
with:
submodules: true

- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: 1.23.x

- name: Setup Node.js version
uses: ./.github/actions/setup-node.js

- name: Build jaeger binary
run: make build-jaeger

- name: Calculate jaeger binary size
run: |
TOTAL_SIZE=$(du -sb ./cmd/jaeger/jaeger-linux-amd64 | cut -f1)
echo "$TOTAL_SIZE" > ./new_jaeger_binary_size.txt
echo "Total binary size: $TOTAL_SIZE bytes"
- name: Restore previous binary size
id: cache-binary-size
uses: actions/cache/restore@1bd1e32a3bdc45362d1e726936510720a7c30a57 #v4.2.0
with:
path: ./jaeger_binary_size.txt
key: jaeger_binary_size
restore-keys: |
jaeger_binary_size
- name: Compare jaeger binary sizes
if: steps.cache-binary-size.outputs.cache-hit == 'true'
run: |
OLD_BINARY_SIZE=$(cat ./jaeger_binary_size.txt)
NEW_BINARY_SIZE=$(cat ./new_jaeger_binary_size.txt)
echo "Previous binary size: $OLD_BINARY_SIZE bytes"
echo "New binary size: $NEW_BINARY_SIZE bytes"
SIZE_CHANGE=$(( $NEW_BINARY_SIZE - $OLD_BINARY_SIZE ))
PERCENTAGE_CHANGE=$(( SIZE_CHANGE * 100 / $OLD_BINARY_SIZE ))
echo "Size change: $PERCENTAGE_CHANGE%"
if [ $PERCENTAGE_CHANGE -gt 2 ]; then
echo "❌ binary size increased by more than 2% ($PERCENTAGE_CHANGE%)"
exit 1
else
echo "✅ binary size change is within acceptable range ($PERCENTAGE_CHANGE%)"
fi

- name: Remove previous *_binary_*.txt
run: |
rm -rf ./jaeger_binary_size.txt
mv ./new_jaeger_binary_size.txt ./jaeger_binary_size.txt
- name: Save new jaeger binary size
if: ${{ (github.event_name == 'push') && (github.ref == 'refs/heads/main') }}
uses: actions/cache/save@1bd1e32a3bdc45362d1e726936510720a7c30a57 #v4.2.0
with:
path: ./jaeger_binary_size.txt
key: jaeger_binary_size_${{ github.run_id }}
4 changes: 2 additions & 2 deletions cmd/agent/app/httpserver/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
"github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
samplinghttp "github.com/jaegertracing/jaeger/internal/sampling/http"
"github.com/jaegertracing/jaeger/pkg/metrics"
)

// NewHTTPServer creates a new server that hosts an HTTP/JSON endpoint for clients
// to query for sampling strategies.
func NewHTTPServer(hostPort string, manager configmanager.ClientConfigManager, mFactory metrics.Factory, logger *zap.Logger) *http.Server {
handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{
handler := samplinghttp.NewHandler(samplinghttp.HandlerParams{
ConfigManager: manager,
MetricsFactory: mFactory,
LegacySamplingEndpoint: true,
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/internal/safeexpvar"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down
11 changes: 9 additions & 2 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type mockSpanProcessor struct {
expectedError error
mux sync.Mutex
spans []*model.Span
traces []ptrace.Traces
tenants map[string]bool
transport processor.InboundTransport
spanFormat processor.SpanFormat
Expand All @@ -41,8 +42,8 @@ func (p *mockSpanProcessor) ProcessSpans(_ context.Context, batch processor.Batc
defer p.mux.Unlock()
batch.GetSpans(func(spans []*model.Span) {
p.spans = append(p.spans, spans...)
}, func(_ ptrace.Traces) {
panic("not implemented")
}, func(td ptrace.Traces) {
p.traces = append(p.traces, td)
})
oks := make([]bool, len(p.spans))
if p.tenants == nil {
Expand All @@ -60,6 +61,12 @@ func (p *mockSpanProcessor) getSpans() []*model.Span {
return p.spans
}

func (p *mockSpanProcessor) getTraces() []ptrace.Traces {
p.mux.Lock()
defer p.mux.Unlock()
return p.traces
}

func (p *mockSpanProcessor) getTenants() map[string]bool {
p.mux.Lock()
defer p.mux.Unlock()
Expand Down
48 changes: 24 additions & 24 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var _ component.Host = (*otelHost)(nil) // API check
Expand Down Expand Up @@ -72,9 +71,14 @@ func startOTLPReceiver(
},
}

otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm)
// the following two constructors never return errors given non-nil arguments, so we ignore errors
nextConsumer, err := newTraces(otlpConsumer.consume)
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
tm),
}
nextConsumer, err := newTraces(consumerHelper.consume)
if err != nil {
return nil, fmt.Errorf("could not create the OTLP consumer: %w", err)
}
Expand All @@ -93,29 +97,25 @@ func startOTLPReceiver(
return otlpReceiver, nil
}

func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.Manager) *consumerDelegate {
return &consumerDelegate{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
tm),
}
}

type consumerDelegate struct {
batchConsumer batchConsumer
type consumerHelper struct {
batchConsumer
}

func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error {
batches := v1adapter.ProtoFromTraces(td)
for _, batch := range batches {
err := c.batchConsumer.consume(ctx, batch)
if err != nil {
return err
}
func (ch *consumerHelper) consume(ctx context.Context, td ptrace.Traces) error {
tenant, err := ch.validateTenant(ctx)
if err != nil {
ch.logger.Debug("rejecting spans (tenancy)", zap.Error(err))
return err
}
return nil
_, err = ch.spanProcessor.ProcessSpans(ctx, processor.SpansV2{
Traces: td,
Details: processor.Details{
InboundTransport: ch.spanOptions.InboundTransport,
SpanFormat: ch.spanOptions.SpanFormat,
Tenant: tenant,
},
})
return err
}

var _ componentstatus.Reporter = (*otelHost)(nil)
Expand Down
75 changes: 46 additions & 29 deletions cmd/collector/app/handler/otlp_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -19,8 +20,10 @@ import (
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/testutils"
)
Expand Down Expand Up @@ -49,7 +52,7 @@ func optionsWithPorts(port string) *flags.CollectorOptions {

func TestStartOtlpReceiver(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
logger := zaptest.NewLogger(t)
tm := &tenancy.Manager{}
rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor, tm)
require.NoError(t, err)
Expand All @@ -70,36 +73,9 @@ func makeTracesOneSpan() ptrace.Traces {
return traces
}

func TestConsumerDelegate(t *testing.T) {
testCases := []struct {
expectErr error
expectLog string
}{
{}, // no errors
{expectErr: errors.New("test-error"), expectLog: "test-error"},
}
for _, test := range testCases {
t.Run(test.expectLog, func(t *testing.T) {
logger, logBuf := testutils.NewLogger()
spanProcessor := &mockSpanProcessor{expectedError: test.expectErr}
consumer := newConsumerDelegate(logger, spanProcessor, &tenancy.Manager{})

err := consumer.consume(context.Background(), makeTracesOneSpan())

if test.expectErr != nil {
require.Equal(t, test.expectErr, err)
assert.Contains(t, logBuf.String(), test.expectLog)
} else {
require.NoError(t, err)
assert.Len(t, spanProcessor.getSpans(), 1)
}
})
}
}

func TestStartOtlpReceiver_Error(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
logger := zaptest.NewLogger(t)
opts := optionsWithPorts(":-1")
tm := &tenancy.Manager{}
_, err := StartOTLPReceiver(opts, logger, spanProcessor, tm)
Expand Down Expand Up @@ -139,3 +115,44 @@ func TestOtelHost(t *testing.T) {
assert.Nil(t, host.GetExtensions())
assert.Nil(t, host.GetExporters())
}

func TestConsumerHelper(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.NoError(t, err)
assert.Eventually(t, func() bool {
return len(spanProcessor.getTraces()) == 1
}, time.Second, time.Millisecond, "spanProcessor should have received one span")
assert.Empty(t, spanProcessor.getSpans())
}

func TestConsumerHelper_Consume_Error(t *testing.T) {
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
&mockSpanProcessor{expectedError: assert.AnError},
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.ErrorIs(t, err, assert.AnError)
}

func TestConsumerHelper_Consume_TenantError(t *testing.T) {
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
&mockSpanProcessor{},
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{Enabled: true}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.ErrorContains(t, err, "missing tenant header")
}
12 changes: 8 additions & 4 deletions cmd/collector/app/handler/zipkin_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ func startZipkinReceiver(
},
}

consumerAdapter := newConsumerDelegate(logger, spanProcessor, tm)
// reset Zipkin spanFormat
consumerAdapter.batchConsumer.spanOptions.SpanFormat = processor.ZipkinSpanFormat
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.HTTPTransport,
processor.ZipkinSpanFormat,
tm),
}

nextConsumer, err := newTraces(consumerAdapter.consume)
nextConsumer, err := newTraces(consumerHelper.consume)
if err != nil {
return nil, fmt.Errorf("could not create Zipkin consumer: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
samplinggrpc "github.com/jaegertracing/jaeger/internal/sampling/grpc"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)
Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
samplinghttp "github.com/jaegertracing/jaeger/internal/sampling/http"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/httpmetrics"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -59,8 +59,8 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar
apiHandler := handler.NewAPIHandler(params.Handler)
apiHandler.RegisterRoutes(r)

cfgHandler := clientcfgHandler.NewHTTPHandler(clientcfgHandler.HTTPHandlerParams{
ConfigManager: &clientcfgHandler.ConfigManager{
cfgHandler := samplinghttp.NewHandler(samplinghttp.HandlerParams{
ConfigManager: &samplinghttp.ConfigManager{
SamplingProvider: params.SamplingProvider,
},
MetricsFactory: params.MetricsFactory,
Expand Down
10 changes: 5 additions & 5 deletions cmd/jaeger/internal/extension/remotesampling/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/internal/leaderelection"
"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
samplinggrpc "github.com/jaegertracing/jaeger/internal/sampling/grpc"
"github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
samplinghttp "github.com/jaegertracing/jaeger/internal/sampling/http"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
Expand Down Expand Up @@ -229,8 +229,8 @@ func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error
func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host) error {
mf := otelmetrics.NewFactory(ext.telemetry.MeterProvider)
mf = mf.Namespace(metrics.NSOptions{Name: "jaeger_remote_sampling"})
handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{
ConfigManager: &clientcfghttp.ConfigManager{
handler := samplinghttp.NewHandler(samplinghttp.HandlerParams{
ConfigManager: &samplinghttp.ConfigManager{
SamplingProvider: ext.strategyProvider,
},
MetricsFactory: mf,
Expand Down
Loading

0 comments on commit 9b2a169

Please sign in to comment.