From 68e1c41ea1ecedc36415696d1c4348cfa834b414 Mon Sep 17 00:00:00 2001 From: yk <1876421041@qq.com> Date: Wed, 28 Feb 2024 17:06:37 +0800 Subject: [PATCH 1/8] Wait until all data in buffer is flushed to client when upstream server finishes. (cherry picked from commit d7765067b0c7d4a8b0bf5548bcd3b9a77b73d0b1) --- proxy/http/handler.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/proxy/http/handler.py b/proxy/http/handler.py index b8d207d561..26ff8ed182 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -37,6 +37,8 @@ class HttpProtocolHandler(BaseTcpServerHandler[HttpClientConnection]): Accepts `Client` connection and delegates to HttpProtocolHandlerPlugin. """ + server_teared = False + def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.start_time: float = time.time() @@ -145,15 +147,19 @@ async def handle_events( teardown = await self.plugin.write_to_descriptors(writables) if teardown: return True - # Read from ready to read sockets - teardown = await self.handle_readables(readables) - if teardown: - return True - # Invoke plugin.read_from_descriptors - if self.plugin: - teardown = await self.plugin.read_from_descriptors(readables) + if not self.server_teared: + # Read from ready to read sockets + teardown = await self.handle_readables(readables) if teardown: - return True + self.server_teared = True + # Invoke plugin.read_from_descriptors + if self.plugin: + teardown = await self.plugin.read_from_descriptors(readables) + if teardown: + self.server_teared = True + if self.server_teared and not self.work.has_buffer(): + # Wait until client buffer flushed when server teared down. + return True return False def handle_data(self, data: memoryview) -> Optional[bool]: From 5fd393c0d30dc56bc2d7591ef297b953589d055e Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Sat, 13 Apr 2024 13:51:39 +0530 Subject: [PATCH 2/8] Wait until buffer flush --- helper/monitor_open_files.sh | 23 +++++++++++-------- proxy/http/handler.py | 43 ++++++++++++++++++------------------ proxy/http/proxy/server.py | 12 ++++++---- 3 files changed, 44 insertions(+), 34 deletions(-) diff --git a/helper/monitor_open_files.sh b/helper/monitor_open_files.sh index f353e0db15..b340867377 100755 --- a/helper/monitor_open_files.sh +++ b/helper/monitor_open_files.sh @@ -20,15 +20,20 @@ if [[ -z "$PROXY_PY_PID" ]]; then exit 1 fi -OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l) -echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN" +while true; +do + OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l) + echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN" -pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do - OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l) - echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR" + pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do + OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l) + echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR" - pgrep -P "$acceptorPid" | while read -r childPid; do - OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l) - echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC" + pgrep -P "$acceptorPid" | while read -r childPid; do + OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l) + echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC" + done done -done + + sleep 1 +done \ No newline at end of file diff --git a/proxy/http/handler.py b/proxy/http/handler.py index 26ff8ed182..5b74117ddb 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -37,8 +37,6 @@ class HttpProtocolHandler(BaseTcpServerHandler[HttpClientConnection]): Accepts `Client` connection and delegates to HttpProtocolHandlerPlugin. """ - server_teared = False - def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.start_time: float = time.time() @@ -131,6 +129,9 @@ async def get_events(self) -> SelectableEvents: events[wfileno] |= selectors.EVENT_WRITE return events + writes_teardown: bool = False + reads_teardown: bool = False + # We override super().handle_events and never call it async def handle_events( self, @@ -139,26 +140,26 @@ async def handle_events( ) -> bool: """Returns True if proxy must tear down.""" # Flush buffer for ready to write sockets - teardown = await self.handle_writables(writables) - if teardown: - return True - # Invoke plugin.write_to_descriptors - if self.plugin: - teardown = await self.plugin.write_to_descriptors(writables) - if teardown: - return True - if not self.server_teared: - # Read from ready to read sockets - teardown = await self.handle_readables(readables) - if teardown: - self.server_teared = True - # Invoke plugin.read_from_descriptors + self.writes_teardown = await self.handle_writables(writables) + if not self.writes_teardown: + # Invoke plugin.write_to_descriptors if self.plugin: - teardown = await self.plugin.read_from_descriptors(readables) - if teardown: - self.server_teared = True - if self.server_teared and not self.work.has_buffer(): - # Wait until client buffer flushed when server teared down. + self.writes_teardown = await self.plugin.write_to_descriptors(writables) + if not self.writes_teardown: + # Read from ready to read sockets + self.reads_teardown = await self.handle_readables(readables) + if not self.reads_teardown: + # Invoke plugin.read_from_descriptors + if self.plugin: + self.reads_teardown = await self.plugin.read_from_descriptors( + readables + ) + # Wait until client buffer has flushed when reads has teared down, but we can still write + if ( + self.reads_teardown + and not self.writes_teardown + and not self.work.has_buffer() + ): return True return False diff --git a/proxy/http/proxy/server.py b/proxy/http/proxy/server.py index f18f45fc55..70d3369ec4 100644 --- a/proxy/http/proxy/server.py +++ b/proxy/http/proxy/server.py @@ -175,7 +175,11 @@ async def get_descriptors(self) -> Descriptors: return r, w async def write_to_descriptors(self, w: Writables) -> bool: - if (self.upstream and self.upstream.connection.fileno() not in w) or not self.upstream: + if ( + self.upstream + and not self.upstream.closed + and self.upstream.connection.fileno() not in w + ) or not self.upstream: # Currently, we just call write/read block of each plugins. It is # plugins responsibility to ignore this callback, if passed descriptors # doesn't contain the descriptor they registered. @@ -208,9 +212,9 @@ async def write_to_descriptors(self, w: Writables) -> bool: async def read_from_descriptors(self, r: Readables) -> bool: if ( - self.upstream and not - self.upstream.closed and - self.upstream.connection.fileno() not in r + self.upstream + and not self.upstream.closed + and self.upstream.connection.fileno() not in r ) or not self.upstream: # Currently, we just call write/read block of each plugins. It is # plugins responsibility to ignore this callback, if passed descriptors From a4be18398e6d373b7ce96e5f98c7547544373631 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 13 Apr 2024 08:22:45 +0000 Subject: [PATCH 3/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- helper/monitor_open_files.sh | 2 +- proxy/http/handler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/helper/monitor_open_files.sh b/helper/monitor_open_files.sh index b340867377..d4214e8bb6 100755 --- a/helper/monitor_open_files.sh +++ b/helper/monitor_open_files.sh @@ -36,4 +36,4 @@ do done sleep 1 -done \ No newline at end of file +done diff --git a/proxy/http/handler.py b/proxy/http/handler.py index 5b74117ddb..b8c767aeeb 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -152,7 +152,7 @@ async def handle_events( # Invoke plugin.read_from_descriptors if self.plugin: self.reads_teardown = await self.plugin.read_from_descriptors( - readables + readables, ) # Wait until client buffer has flushed when reads has teared down, but we can still write if ( From b4625dc4e2049474c9d26956b73d88f7bbf3a6ce Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Sat, 13 Apr 2024 13:58:31 +0530 Subject: [PATCH 4/8] Avoid shadowing --- proxy/http/handler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/proxy/http/handler.py b/proxy/http/handler.py index b8c767aeeb..a30036f757 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -49,6 +49,8 @@ def __init__(self, *args: Any, **kwargs: Any): if not self.flags.threadless: self.selector = selectors.DefaultSelector() self.plugin: Optional[HttpProtocolHandlerPlugin] = None + self.writes_teardown: bool = False + self.reads_teardown: bool = False ## # initialize, is_inactive, shutdown, get_events, handle_events @@ -129,9 +131,6 @@ async def get_events(self) -> SelectableEvents: events[wfileno] |= selectors.EVENT_WRITE return events - writes_teardown: bool = False - reads_teardown: bool = False - # We override super().handle_events and never call it async def handle_events( self, From 963eef0b3582243667290558d8a623d5c1993925 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Sat, 13 Apr 2024 14:08:12 +0530 Subject: [PATCH 5/8] _teared not _teardown --- proxy/http/handler.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/proxy/http/handler.py b/proxy/http/handler.py index a30036f757..2e1b0f2648 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -49,8 +49,8 @@ def __init__(self, *args: Any, **kwargs: Any): if not self.flags.threadless: self.selector = selectors.DefaultSelector() self.plugin: Optional[HttpProtocolHandlerPlugin] = None - self.writes_teardown: bool = False - self.reads_teardown: bool = False + self.writes_teared: bool = False + self.reads_teared: bool = False ## # initialize, is_inactive, shutdown, get_events, handle_events @@ -139,26 +139,22 @@ async def handle_events( ) -> bool: """Returns True if proxy must tear down.""" # Flush buffer for ready to write sockets - self.writes_teardown = await self.handle_writables(writables) - if not self.writes_teardown: + self.writes_teared = await self.handle_writables(writables) + if not self.writes_teared: # Invoke plugin.write_to_descriptors if self.plugin: - self.writes_teardown = await self.plugin.write_to_descriptors(writables) - if not self.writes_teardown: + self.writes_teared = await self.plugin.write_to_descriptors(writables) + if not self.writes_teared: # Read from ready to read sockets - self.reads_teardown = await self.handle_readables(readables) - if not self.reads_teardown: + self.reads_teared = await self.handle_readables(readables) + if not self.reads_teared: # Invoke plugin.read_from_descriptors if self.plugin: - self.reads_teardown = await self.plugin.read_from_descriptors( + self.reads_teared = await self.plugin.read_from_descriptors( readables, ) - # Wait until client buffer has flushed when reads has teared down, but we can still write - if ( - self.reads_teardown - and not self.writes_teardown - and not self.work.has_buffer() - ): + # Wait until client buffer has flushed when reads has teared down but we can still write + if self.reads_teared and not self.writes_teared and not self.work.has_buffer(): return True return False From fec5517603bcf10739b351f58d9faf2ddc95035e Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Sat, 13 Apr 2024 14:24:16 +0530 Subject: [PATCH 6/8] Refactor logic --- proxy/http/handler.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/proxy/http/handler.py b/proxy/http/handler.py index 2e1b0f2648..e629431913 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -140,21 +140,23 @@ async def handle_events( """Returns True if proxy must tear down.""" # Flush buffer for ready to write sockets self.writes_teared = await self.handle_writables(writables) - if not self.writes_teared: - # Invoke plugin.write_to_descriptors + if self.writes_teared: + return True + # Invoke plugin.write_to_descriptors + if self.plugin: + self.writes_teared = await self.plugin.write_to_descriptors(writables) + if self.writes_teared: + return True + # Read from ready to read sockets + self.reads_teared = await self.handle_readables(readables) + if not self.reads_teared: + # Invoke plugin.read_from_descriptors if self.plugin: - self.writes_teared = await self.plugin.write_to_descriptors(writables) - if not self.writes_teared: - # Read from ready to read sockets - self.reads_teared = await self.handle_readables(readables) - if not self.reads_teared: - # Invoke plugin.read_from_descriptors - if self.plugin: - self.reads_teared = await self.plugin.read_from_descriptors( - readables, - ) + self.reads_teared = await self.plugin.read_from_descriptors( + readables, + ) # Wait until client buffer has flushed when reads has teared down but we can still write - if self.reads_teared and not self.writes_teared and not self.work.has_buffer(): + if self.reads_teared and not self.work.has_buffer(): return True return False From 631626496326e521089f3a5d18479b0c5fe9b753 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Sat, 13 Apr 2024 14:36:11 +0530 Subject: [PATCH 7/8] Do not try `read_from_descriptors` if reads have previously teared down --- proxy/core/connection/connection.py | 6 +++++- proxy/http/handler.py | 15 ++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index d0bebe26db..69803702d1 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -87,7 +87,11 @@ def flush(self, max_send_size: Optional[int] = None) -> int: # TODO: Assemble multiple packets if total # size remains below max send size. max_send_size = max_send_size or DEFAULT_MAX_SEND_SIZE - sent: int = self.send(mv[:max_send_size]) + try: + sent: int = self.send(mv[:max_send_size]) + except BlockingIOError: + logger.warning("BlockingIOError when trying send to {0}".format(self.tag)) + return 0 if sent == len(mv): self.buffer.pop(0) self._num_buffer -= 1 diff --git a/proxy/http/handler.py b/proxy/http/handler.py index e629431913..581e911a33 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -147,14 +147,15 @@ async def handle_events( self.writes_teared = await self.plugin.write_to_descriptors(writables) if self.writes_teared: return True - # Read from ready to read sockets - self.reads_teared = await self.handle_readables(readables) + # Read from ready to read sockets if reads have not already teared down if not self.reads_teared: - # Invoke plugin.read_from_descriptors - if self.plugin: - self.reads_teared = await self.plugin.read_from_descriptors( - readables, - ) + self.reads_teared = await self.handle_readables(readables) + if not self.reads_teared: + # Invoke plugin.read_from_descriptors + if self.plugin: + self.reads_teared = await self.plugin.read_from_descriptors( + readables, + ) # Wait until client buffer has flushed when reads has teared down but we can still write if self.reads_teared and not self.work.has_buffer(): return True From db11006326684371e581e47e4ca432870c674f22 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 13 Apr 2024 09:06:54 +0000 Subject: [PATCH 8/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- proxy/core/connection/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index 69803702d1..63cb62e316 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -90,7 +90,7 @@ def flush(self, max_send_size: Optional[int] = None) -> int: try: sent: int = self.send(mv[:max_send_size]) except BlockingIOError: - logger.warning("BlockingIOError when trying send to {0}".format(self.tag)) + logger.warning('BlockingIOError when trying send to {0}'.format(self.tag)) return 0 if sent == len(mv): self.buffer.pop(0)