Skip to content

Commit

Permalink
Add Hard/Soft Timeouts Based On Lambda Deadline (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolanos authored Jun 7, 2021
1 parent 5facc53 commit d49852d
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 10 deletions.
20 changes: 10 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba
select {
case <-ctx.Done():
// We're already done
util.Logln(ctx.Err())
return eventCounter, ""
default:
// Our call to next blocks. It is likely that the container is frozen immediately after we call NextEvent.
Expand All @@ -201,7 +200,6 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba
eventStart := time.Now()

if err != nil {

util.Logln(err)
err = invocationClient.ExitError(ctx, "NextEventError.Main", err)
if err != nil {
Expand All @@ -224,7 +222,6 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba
util.Logf("We suspected a timeout for request %s but got telemetry anyway", lastRequestId)
default:
}

}

invokedFunctionARN = event.InvokedFunctionARN
Expand Down Expand Up @@ -260,16 +257,19 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba

// Set the timeout timer for a smidge before the actual timeout;
// we can recover from early.
timeoutWatchBegins := time.Millisecond * 100
timeout := timeoutInstant.Sub(time.Now()) - timeoutWatchBegins
timeoutWatchBegins := 100 * time.Millisecond
hardTimeout := timeoutInstant.Sub(time.Now())
softTimeout := hardTimeout - timeoutWatchBegins

hardCtx, hardCancel := context.WithTimeout(ctx, hardTimeout)
defer hardCancel()

invCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
softCtx, softCancel := context.WithTimeout(hardCtx, softTimeout)
defer softCancel()

select {
case <-invCtx.Done():
case <-softCtx.Done():
// We are about to timeout
util.Debugln("Timeout suspected: ", invCtx.Err())
probablyTimeout = true
continue
case telemetryBytes := <-telemetryChan:
Expand All @@ -282,7 +282,7 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba

pollLogServer(logServer, batch)
harvested := batch.Harvest(time.Now())
shipHarvest(ctx, harvested, telemetryClient, invokedFunctionARN)
shipHarvest(hardCtx, harvested, telemetryClient, invokedFunctionARN)
}

lastEventStart = eventStart
Expand Down
113 changes: 113 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/newrelic/newrelic-lambda-extension/lambda/extension/api"
"github.com/newrelic/newrelic-lambda-extension/util"
Expand Down Expand Up @@ -532,6 +534,117 @@ func TestMainTimeout(t *testing.T) {
assert.Equal(t, 1, nextEventRequestCount)
}

func TestMainTimeoutUnreachable(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(200*time.Millisecond))
defer cancel()
overrideContext(ctx)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer util.Close(r.Body)

if r.URL.Path == "/2020-01-01/extension/register" {
w.Header().Add(api.ExtensionIdHeader, "test-ext-id")
w.WriteHeader(200)
res, err := json.Marshal(api.RegistrationResponse{
FunctionName: "foobar",
FunctionVersion: "$latest",
Handler: "lambda.handler",
})
assert.Nil(t, err)
_, _ = w.Write(res)
}

if r.URL.Path == "/2020-01-01/extension/init/error" {
w.WriteHeader(200)
_, _ = w.Write([]byte(""))
}

if r.URL.Path == "/2020-01-01/extension/exit/error" {
w.WriteHeader(200)
_, _ = w.Write(nil)
}

if r.URL.Path == "/2020-08-15/logs" {
w.WriteHeader(200)
_, _ = w.Write(nil)
}

if r.URL.Path == "/2020-01-01/extension/event/next" {
time.Sleep(25 * time.Millisecond)

w.WriteHeader(200)
res, err := json.Marshal(api.InvocationEvent{
EventType: api.Invoke,
DeadlineMs: 100,
RequestID: "12345",
InvokedFunctionARN: "arn:aws:lambda:us-east-1:12345:foobar",
ShutdownReason: "",
Tracing: nil,
})
assert.Nil(t, err)
_, _ = w.Write(res)
}

if r.URL.Path == "/aws/lambda/v1" {
time.Sleep(5 * time.Second)

w.WriteHeader(200)
_, _ = w.Write(nil)
}
}))
defer srv.Close()

url := srv.URL[7:]

_ = os.Setenv(api.LambdaHostPortEnvVar, url)
defer os.Unsetenv(api.LambdaHostPortEnvVar)

_ = os.Setenv("NEW_RELIC_LICENSE_KEY", "foobar")
defer os.Unsetenv("NEW_RELIC_LICENSE_KEY")

_ = os.Setenv("NEW_RELIC_LOG_SERVER_HOST", "localhost")
defer os.Unsetenv("NEW_RELIC_LOG_SERVER_HOST")

_ = os.Setenv("NEW_RELIC_EXTENSION_LOG_LEVEL", "DEBUG")
defer os.Unsetenv("NEW_RELIC_EXTENSION_LOG_LEVEL")

_ = os.Setenv("NEW_RELIC_TELEMETRY_ENDPOINT", fmt.Sprintf("%s/aws/lambda/v1", srv.URL))
defer os.Unsetenv("NEW_RELIC_TELEMETRY_ENDPOINT")

_ = os.Remove("/tmp/newrelic-telemetry")

go func() {
pipeOpened := false

for {
select {
case <-ctx.Done():
return
default:
if _, err := os.Stat("/tmp/newrelic-telemetry"); os.IsNotExist(err) {
if pipeOpened {
return
} else {
continue
}
} else {
pipeOpened = true
}

pipe, err := os.OpenFile("/tmp/newrelic-telemetry", os.O_WRONLY, 0)
assert.Nil(t, err)
defer pipe.Close()

pipe.WriteString("foobar\n")
pipe.Close()
time.Sleep(100 * time.Millisecond)
}
}
}()

assert.NotPanics(t, main)
}

func overrideContext(ctx context.Context) {
rootCtx = ctx
}
1 change: 1 addition & 0 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (c *Client) sendPayloads(compressedPayloads []*bytes.Buffer, builder reques
successCount += 1
}
}

return successCount, sentBytes, nil
}

Expand Down

0 comments on commit d49852d

Please sign in to comment.