Skip to content

Commit

Permalink
Merge pull request #11 from logzio/aws-dev
Browse files Browse the repository at this point in the history
v1.0.2
  • Loading branch information
yotamloe authored Sep 10, 2023
2 parents 4419b91 + 176620b commit 8d3ded5
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 9 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/upload-zip.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Upload release

on:
release:
types: [published]

jobs:
upload:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17
- name: Build function
run: make function
- name: Upload release asset
uses: actions/upload-release-asset@v1
with:
asset_path: ./function.zip
asset_name: function.zip
asset_content_type: application/zip
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,16 @@ This function has the following limitations:

- It can only process metrics data in OTLP 0.7 format.
- It can only forward the data to a Prometheus Remote Write endpoint.

### Changelog

- v1.0.2
- Stop trying to send bulks if encountered 401 status code
- Add logzio identifier to each log (5 last chars of the shipping token)
- Add zip workflow and artifact
- v1.0.1
- Improved logging (Add `zap` logger)
- Add metadata (AWS account, firehose request id, lambda invocation id) to each log for context
- Flush buffered logs if exists, before the function run ends
- v1.0.0
- Initial release: Lambda function that receives OTLP (0.7.0) data from AWS metric stream and exports the data to logz.io using Prometheus remote write
23 changes: 15 additions & 8 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ func generateValidFirehoseResponse(statusCode int, requestId string, errorMessag
}
}
}
func initLogger(ctx context.Context, request events.APIGatewayProxyRequest) zap.SugaredLogger {
func initLogger(ctx context.Context, request events.APIGatewayProxyRequest, token string) zap.SugaredLogger {
awsRequestId := ""
account := ""
logzioIdentifier := ""
lambdaContext, ok := lambdacontext.FromContext(ctx)
if ok {
awsRequestId = lambdaContext.AwsRequestID
Expand All @@ -98,14 +99,18 @@ func initLogger(ctx context.Context, request events.APIGatewayProxyRequest) zap.
if len(awsAccount) > 4 {
account = awsAccount[4]
}
if len(token) >= 5 {
logzioIdentifier = token[len(token)-5:]
}
firehoseRequestId := request.Headers["X-Amz-Firehose-Request-Id"]
config := zap.NewProductionConfig()
config.EncoderConfig.StacktraceKey = "" // to hide stacktrace info
config.OutputPaths = []string{"stdout"} // write to stdout
config.InitialFields = map[string]interface{}{
"aws_account": account,
"lambda_invocation_id": awsRequestId,
"firehose_request_id": firehoseRequestId,
"aws_account": account,
"lambda_invocation_id": awsRequestId,
"firehose_request_id": firehoseRequestId,
"logzio_account_identifier": logzioIdentifier,
}
logger, configErr := config.Build()
if configErr != nil {
Expand Down Expand Up @@ -250,13 +255,9 @@ func summaryValuesToMetrics(metricsToSendSlice pdata.InstrumentationLibraryMetri
}
}
func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
log := initLogger(ctx, request)
// flush buffered logs if exists, before the function run ends
defer log.Sync()
metricCount := 0
dataPointCount := 0
shippingErrors := new(ErrorCollector)
log.Infof("Getting access key from headers")
// get requestId to match firehose response requirements
requestId := request.Headers["X-Amz-Firehose-Request-Id"]
if requestId == "" {
Expand All @@ -266,6 +267,9 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (
if LogzioToken == "" {
LogzioToken = request.Headers["x-amz-firehose-access-key"]
}
log := initLogger(ctx, request, LogzioToken)
// flush buffered logs if exists, before the function run ends
defer log.Sync()
if LogzioToken == "" {
accessKeyErr := errors.New("cant find access key in 'X-Amz-Firehose-Access-Key' or 'x-amz-firehose-access-key' headers")
log.Error(accessKeyErr)
Expand Down Expand Up @@ -368,6 +372,9 @@ func HandleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (
err = metricsExporter.PushMetrics(ctx, metricsToSend)
if err != nil {
log.Warnf("Error while sending metrics: %s", err)
if strings.Contains(err.Error(), "status 401") {
return generateValidFirehoseResponse(400, requestId, "Error while sending metrics:", err), nil
}
shippingErrors.Collect(err)
} else {
numberOfMetrics, numberOfDataPoints := metricsToSend.MetricAndDataPointCount()
Expand Down
2 changes: 1 addition & 1 deletion handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestHandleRequestErrors(t *testing.T) {
expected int
}
var getListenerUrlTests = []getListenerUrlTest{
{"noValidToken", 500},
{"noValidToken", 400},
{"noToken", 400},
{"malformedBody", 400},
{"simpleevent", 400},
Expand Down

0 comments on commit 8d3ded5

Please sign in to comment.