From db68e16e3a96b80d9c54113e1061adc88f5cf83f Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Wed, 15 May 2024 16:05:21 +0800 Subject: [PATCH 1/2] feat: pex support parent task Signed-off-by: Jim Ma --- client/config/headers.go | 2 + client/daemon/peer/peertask_stream.go | 8 +++ client/daemon/transport/transport.go | 73 +++++++++++++++++++-------- 3 files changed, 62 insertions(+), 21 deletions(-) diff --git a/client/config/headers.go b/client/config/headers.go index 2f3fff6e5f7..3be169a7ff3 100644 --- a/client/config/headers.go +++ b/client/config/headers.go @@ -37,4 +37,6 @@ const ( HeaderDragonflyObjectMetaStorageClass = "X-Dragonfly-Object-Meta-Storage-Class" // HeaderDragonflyObjectOperation is used for object storage operation. HeaderDragonflyObjectOperation = "X-Dragonfly-Object-Operation" + // HeaderDragonflyForwardedFrom is used to mark http request forwarded from other peers + HeaderDragonflyForwardedFrom = "X-Dragonfly-Forwarded-From" ) diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 08a6d93fb08..287c39613cf 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -56,6 +56,14 @@ func (req *StreamTaskRequest) TaskID() string { return req.taskID } +func (req *StreamTaskRequest) HasParentTask() bool { + return req.Range != nil +} + +func (req *StreamTaskRequest) ParentTaskID() string { + return idgen.ParentTaskIDV1(req.URL, req.URLMeta) +} + // StreamTask represents a peer task with stream io for reading directly without once more disk io type StreamTask interface { // Start starts the special peer task, return an io.Reader for stream io diff --git a/client/daemon/transport/transport.go b/client/daemon/transport/transport.go index 6ac6025b7e2..e9d0e20f658 100644 --- a/client/daemon/transport/transport.go +++ b/client/daemon/transport/transport.go @@ -310,6 +310,7 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res filter := nethttp.PickHeader(req.Header, config.HeaderDragonflyFilter, rt.defaultFilter) tag := nethttp.PickHeader(req.Header, config.HeaderDragonflyTag, rt.defaultTag) application := nethttp.PickHeader(req.Header, config.HeaderDragonflyApplication, rt.defaultApplication) + forwarded := nethttp.PickHeader(req.Header, config.HeaderDragonflyForwardedFrom, "") var priority = rt.defaultPriority priorityString := nethttp.PickHeader(req.Header, config.HeaderDragonflyPriority, fmt.Sprintf("%d", rt.defaultPriority)) priorityInt, err := strconv.ParseInt(priorityString, 10, 32) @@ -336,30 +337,16 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res log := logger.With(append(logKV, "task", taskID)...) log.Infof("start download with url: %s", reqURL) - log.Debugf("request url: %s, with header: %#v", reqURL, req.Header) - - if rt.peerSearcher != nil { - searchPeerResult := rt.peerSearcher.SearchPeer(taskID) - switch searchPeerResult.Type { - case pex.SearchPeerResultTypeLocal: - log.Debugf("local peer exists") - goto local - case pex.SearchPeerResultTypeReplica: - log.Debugf("make replica from other peers") - goto local - case pex.SearchPeerResultTypeNotFound: - log.Debugf("no available peer after search peer") - goto local - case pex.SearchPeerResultTypeRemote: - resp, err := rt.proxyToPeers(log, req, searchPeerResult.Peers) - if err == nil { - return resp, nil - } - log.Warnf("proxy to other peers error: %s, fallback to local", err) + log.Debugf("request url: %s, with header: %#v, forwarded from: %q", reqURL, req.Header, forwarded) + + // check whether the request is forwarded to avoid infinity forward + if rt.peerSearcher != nil && forwarded == "" { + resp := rt.tryProxyToPeers(log, req, streamTaskRequest) + if resp != nil { + return resp, nil } } -local: body, attr, err := rt.peerTaskManager.StartStreamTask(ctx, streamTaskRequest) if err != nil { log.Errorf("start stream task error: %v", err) @@ -426,6 +413,50 @@ local: return resp, nil } +func (rt *transport) tryProxyToPeers( + log *logger.SugaredLoggerOnWith, + request *http.Request, + taskRequest *peer.StreamTaskRequest) (resp *http.Response) { + // search normal peers + resp = rt.searchPeers(log, request, taskRequest.TaskID(), taskRequest.PeerID) + if resp != nil { + return resp + } + // search parent peers + if taskRequest.HasParentTask() { + parentTaskID := taskRequest.ParentTaskID() + resp = rt.searchPeers(log, request, parentTaskID, taskRequest.PeerID) + if resp != nil { + log.Debugf("proxy to parent task %s done", parentTaskID) + return resp + } + } + return nil +} + +func (rt *transport) searchPeers(log *logger.SugaredLoggerOnWith, request *http.Request, taskID, sourcePeerID string) *http.Response { + searchPeerResult := rt.peerSearcher.SearchPeer(taskID) + switch searchPeerResult.Type { + case pex.SearchPeerResultTypeLocal: + log.Debugf("local peer exists") + case pex.SearchPeerResultTypeReplica: + log.Debugf("make replica from other peers") + case pex.SearchPeerResultTypeNotFound: + log.Debugf("no available peer after search peer") + case pex.SearchPeerResultTypeRemote: + // mark request is already forwarded to avoid infinity forward + request.Header.Set(config.HeaderDragonflyForwardedFrom, sourcePeerID) + resp, err := rt.proxyToPeers(log, request, searchPeerResult.Peers) + if err == nil { + return resp + } + // forward failed, remove marked header + request.Header.Del(config.HeaderDragonflyForwardedFrom) + log.Warnf("proxy to other peers error: %s, fallback to local", err) + } + return nil +} + func (rt *transport) proxyToPeers(log *logger.SugaredLoggerOnWith, req *http.Request, peers []*pex.DestPeer) (*http.Response, error) { // shuffle peers if len(peers) > 1 { From 3772d69b09fdfc71ccc0fd4c0fdec0f98e948f16 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Thu, 16 May 2024 14:34:15 +0800 Subject: [PATCH 2/2] chor: update header Signed-off-by: Jim Ma --- client/config/headers.go | 4 ++-- client/daemon/transport/transport.go | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/client/config/headers.go b/client/config/headers.go index 3be169a7ff3..831a94ea03d 100644 --- a/client/config/headers.go +++ b/client/config/headers.go @@ -37,6 +37,6 @@ const ( HeaderDragonflyObjectMetaStorageClass = "X-Dragonfly-Object-Meta-Storage-Class" // HeaderDragonflyObjectOperation is used for object storage operation. HeaderDragonflyObjectOperation = "X-Dragonfly-Object-Operation" - // HeaderDragonflyForwardedFrom is used to mark http request forwarded from other peers - HeaderDragonflyForwardedFrom = "X-Dragonfly-Forwarded-From" + // HeaderDragonflyForwardedFor is used to mark http request forwarded from other peers + HeaderDragonflyForwardedFor = "X-Dragonfly-Forwarded-For" ) diff --git a/client/daemon/transport/transport.go b/client/daemon/transport/transport.go index e9d0e20f658..7b44439cbf3 100644 --- a/client/daemon/transport/transport.go +++ b/client/daemon/transport/transport.go @@ -178,13 +178,11 @@ func peerProxyCacheLoaderFunc(c *ttlcache.Cache[string, *http.Transport], hostPo } roundTripper := &http.Transport{ Proxy: http.ProxyURL(proxyURL), - DialContext: func(dialer *net.Dialer) func(context.Context, string, string) (net.Conn, error) { - return dialer.DialContext - }(&net.Dialer{ + DialContext: (&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, - }), - MaxIdleConns: 100, + }).DialContext, + MaxIdleConns: 1000, IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, @@ -310,7 +308,7 @@ func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Res filter := nethttp.PickHeader(req.Header, config.HeaderDragonflyFilter, rt.defaultFilter) tag := nethttp.PickHeader(req.Header, config.HeaderDragonflyTag, rt.defaultTag) application := nethttp.PickHeader(req.Header, config.HeaderDragonflyApplication, rt.defaultApplication) - forwarded := nethttp.PickHeader(req.Header, config.HeaderDragonflyForwardedFrom, "") + forwarded := nethttp.PickHeader(req.Header, config.HeaderDragonflyForwardedFor, "") var priority = rt.defaultPriority priorityString := nethttp.PickHeader(req.Header, config.HeaderDragonflyPriority, fmt.Sprintf("%d", rt.defaultPriority)) priorityInt, err := strconv.ParseInt(priorityString, 10, 32) @@ -445,13 +443,13 @@ func (rt *transport) searchPeers(log *logger.SugaredLoggerOnWith, request *http. log.Debugf("no available peer after search peer") case pex.SearchPeerResultTypeRemote: // mark request is already forwarded to avoid infinity forward - request.Header.Set(config.HeaderDragonflyForwardedFrom, sourcePeerID) + request.Header.Set(config.HeaderDragonflyForwardedFor, sourcePeerID) resp, err := rt.proxyToPeers(log, request, searchPeerResult.Peers) if err == nil { return resp } // forward failed, remove marked header - request.Header.Del(config.HeaderDragonflyForwardedFrom) + request.Header.Del(config.HeaderDragonflyForwardedFor) log.Warnf("proxy to other peers error: %s, fallback to local", err) } return nil