forked from choria-io/asyncjobs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrequest_reply_handler.go
101 lines (84 loc) · 3.07 KB
/
request_reply_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
//
// SPDX-License-Identifier: Apache-2.0
package asyncjobs
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/nats-io/nats.go"
)
const (
// RequestReplyContentTypeHeader is the header text sent to indicate the body encoding and type
RequestReplyContentTypeHeader = "AJ-Content-Type"
// RequestReplyDeadlineHeader is the header indicating the deadline for processing the item
RequestReplyDeadlineHeader = "AJ-Handler-Deadline"
// RequestReplyTerminateError is the header to send in a reply that the task should be terminated via ErrTerminateTask
RequestReplyTerminateError = "AJ-Terminate"
// RequestReplyError is the header indicating a generic failure in handling an item
RequestReplyError = "AJ-Error"
// RequestReplyTaskType is the content type indicating the payload is a Task in JSON format
RequestReplyTaskType = "application/x-asyncjobs-task+json"
)
type requestReplyHandler struct {
nc *nats.Conn
tt string
subj string
}
func newRequestReplyHandleFunc(nc *nats.Conn, tt string) HandlerFunc {
h := &requestReplyHandler{
nc: nc,
tt: tt,
}
h.subj = RequestReplySubjectForTaskType(tt)
return h.processTask
}
// RequestReplySubjectForTaskType returns the subject a request-reply handler should listen on for a specified task type
func RequestReplySubjectForTaskType(taskType string) string {
if taskType == "" {
return fmt.Sprintf(RequestReplyTaskHandlerPattern, "catchall")
}
return fmt.Sprintf(RequestReplyTaskHandlerPattern, taskType)
}
func (r *requestReplyHandler) processTask(ctx context.Context, logger Logger, task *Task) (any, error) {
if r.nc == nil {
return nil, fmt.Errorf("no connnection set")
}
var err error
deadline, ok := ctx.Deadline()
if !ok {
return nil, ErrRequestReplyNoDeadline
}
if time.Until(deadline) < 3*time.Second {
return nil, ErrRequestReplyShortDeadline
}
msg := nats.NewMsg(r.subj)
msg.Header.Add(RequestReplyContentTypeHeader, RequestReplyTaskType)
msg.Header.Add(RequestReplyDeadlineHeader, deadline.Add(-2*time.Second).UTC().Format(time.RFC3339))
msg.Data, err = json.Marshal(task)
if err != nil {
return nil, fmt.Errorf("could not encode task: %v", err)
}
logger.Infof("Calling request-reply handler on %s", msg.Subject)
res, err := r.nc.RequestMsgWithContext(ctx, msg)
switch {
case err == context.DeadlineExceeded:
logger.Errorf("Request-Reply callout failed, no response received within %v", deadline)
return nil, fmt.Errorf("%w: %v", ErrRequestReplyFailed, err)
case err == nats.ErrNoResponders:
logger.Errorf("Request-Reply handler failed, no responders on subject %s", msg.Subject)
return nil, fmt.Errorf("%w: %v", ErrRequestReplyFailed, err)
case err != nil:
logger.Errorf("Request-Reply handler failed: %v", err)
return nil, fmt.Errorf("%w: %v", ErrRequestReplyFailed, err)
}
if v := res.Header.Get(RequestReplyTerminateError); v != "" {
return res.Data, fmt.Errorf("%s: %w", v, ErrTerminateTask)
}
if v := res.Header.Get(RequestReplyError); v != "" {
return res.Data, errors.New(v)
}
return res.Data, nil
}