From 3c88eb0151ab69b07f8e48925e22a55e7b10f73e Mon Sep 17 00:00:00 2001 From: qgymib Date: Sat, 7 Oct 2023 15:17:28 +0800 Subject: [PATCH] remove active_events in tcp/win --- ev.c | 105 ++++++++++++++++++++++++++++++++++------------ src/win/tcp_win.c | 101 +++++++++++++++++++++++++++++++++----------- 2 files changed, 156 insertions(+), 50 deletions(-) diff --git a/ev.c b/ev.c index 2e5542fa3..53ae508cf 100644 --- a/ev.c +++ b/ev.c @@ -10113,8 +10113,8 @@ void ev_shm_exit(ev_shm_t* shm) //////////////////////////////////////////////////////////////////////////////// // FILE: src/win/tcp_win.c -// SIZE: 20319 -// SHA-256: fc905f9f437fa791366f0e02d758ba6b9f03c023dae4093b3f67d5b1167dcfa6 +// SIZE: 21629 +// SHA-256: 0c8e5c9b96d855ba9327540ebdf923b1cb6446dc63cff7b05663bea9c397d20b //////////////////////////////////////////////////////////////////////////////// #line 1 "src/win/tcp_win.c" /* AMALGAMATE: #include "ev.h" */ @@ -10135,6 +10135,51 @@ static void _ev_tcp_close_socket(ev_tcp_t* sock) } } +static void _ev_tcp_smart_deactive_win(ev_tcp_t* sock) +{ + size_t io_sz = 0; + if (sock->base.data.flags & EV_HANDLE_TCP_LISTING) + { + io_sz = ev_list_size(&sock->backend.u.listen.a_queue); + if (io_sz != 0) + { + return; + } + } + else if (sock->base.data.flags & EV_HANDLE_TCP_ACCEPTING) + { + if (sock->backend.u.accept.cb != NULL) + { + return; + } + } + else if (sock->base.data.flags & EV_HANDLE_TCP_CONNECTING) + { + if (sock->backend.u.client.cb != NULL) + { + return; + } + } + else if (sock->base.data.flags & EV_HANDLE_TCP_STREAMING) + { + io_sz += ev_list_size(&sock->backend.u.stream.w_queue); + io_sz += ev_list_size(&sock->backend.u.stream.r_queue); + if (io_sz != 0) + { + return; + } + } + + ev__handle_deactive(&sock->base); +} + +static void _ev_tcp_accept_callback_once(ev_tcp_t* lisn, ev_tcp_t* conn, int stat) +{ + ev_tcp_accept_cb bak_cb = conn->backend.u.accept.cb; + conn->backend.u.accept.cb = NULL; + bak_cb(lisn, conn, stat); +} + static void _ev_tcp_finialize_accept(ev_tcp_t* conn) { ev_tcp_t* lisn = conn->backend.u.accept.listen; @@ -10150,23 +10195,23 @@ static void _ev_tcp_finialize_accept(ev_tcp_t* conn) ev_list_erase(&lisn->backend.u.listen.a_queue_done, &conn->backend.u.accept.node); } - ev__handle_event_dec(&lisn->base); - ev__handle_event_dec(&conn->base); - conn->base.data.flags &= ~EV_HANDLE_TCP_ACCEPTING; - conn->backend.u.accept.cb(lisn, conn, conn->backend.u.accept.stat); + _ev_tcp_smart_deactive_win(lisn); + _ev_tcp_smart_deactive_win(conn); + + _ev_tcp_accept_callback_once(lisn, conn, conn->backend.u.accept.stat); } static void _ev_tcp_cleanup_connection_in_listen(ev_tcp_t* conn) { ev_tcp_t* lisn = conn->backend.u.accept.listen; - ev__handle_event_dec(&lisn->base); - ev__handle_event_dec(&conn->base); + conn->base.data.flags &= ~EV_HANDLE_TCP_ACCEPTING; + _ev_tcp_smart_deactive_win(lisn); + _ev_tcp_smart_deactive_win(conn); _ev_tcp_close_socket(conn); - conn->base.data.flags &= ~EV_HANDLE_TCP_ACCEPTING; - conn->backend.u.accept.cb(lisn, conn, EV_ECANCELED); + _ev_tcp_accept_callback_once(lisn, conn, EV_ECANCELED); } static void _ev_tcp_cleanup_listen(ev_tcp_t* sock) @@ -10186,14 +10231,14 @@ static void _ev_tcp_cleanup_listen(ev_tcp_t* sock) static void _ev_tcp_w_user_callback_win(ev_tcp_t* sock, ev_tcp_write_req_t* req, ssize_t size) { - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); ev__write_exit(&req->base); req->user_callback(req, size); } static void _ev_tcp_r_user_callbak_win(ev_tcp_t* sock, ev_tcp_read_req_t* req, ssize_t size) { - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); ev__read_exit(&req->base); req->user_callback(req, size); } @@ -10223,13 +10268,21 @@ static void _ev_tcp_cleanup_stream(ev_tcp_t* sock) } } +static void _ev_tcp_connect_callback_once_win(ev_tcp_t* sock, int stat) +{ + ev_tcp_connect_cb bak_cb = sock->backend.u.client.cb; + sock->backend.u.client.cb = NULL; + bak_cb(sock, stat); +} + static void _ev_tcp_cleanup_connect(ev_tcp_t* sock) { if (sock->backend.u.client.stat == EV_EINPROGRESS) { sock->backend.u.client.stat = EV_ECANCELED; } - sock->backend.u.client.cb(sock, sock->backend.u.client.stat); + + _ev_tcp_connect_callback_once_win(sock, sock->backend.u.client.stat); } static void _ev_tcp_on_close_win(ev_handle_t* handle) @@ -10396,10 +10449,10 @@ static void _ev_tcp_process_connect(ev_tcp_t* sock) } } - ev__handle_event_dec(&sock->base); - sock->base.data.flags &= ~EV_HANDLE_TCP_CONNECTING; - sock->backend.u.client.cb(sock, sock->backend.u.client.stat); + _ev_tcp_smart_deactive_win(sock); + + _ev_tcp_connect_callback_once_win(sock, sock->backend.u.client.stat); } static void _ev_tcp_on_task_done(ev_handle_t* handle) @@ -10667,8 +10720,8 @@ int ev_tcp_accept(ev_tcp_t* lisn, ev_tcp_t* conn, ev_tcp_accept_cb cb) } _ev_tcp_setup_accept_win(lisn, conn, cb); - ev__handle_event_add(&lisn->base); - ev__handle_event_add(&conn->base); + ev__handle_active(&lisn->base); + ev__handle_active(&conn->base); DWORD bytes = 0; ret = AcceptEx(lisn->sock, conn->sock, @@ -10686,9 +10739,9 @@ int ev_tcp_accept(ev_tcp_t* lisn, ev_tcp_t* conn, ev_tcp_accept_cb cb) if ((ret = WSAGetLastError()) != WSA_IO_PENDING) { - ev__handle_event_dec(&lisn->base); - ev__handle_event_dec(&conn->base); conn->base.data.flags &= ~EV_HANDLE_TCP_CONNECTING; + _ev_tcp_smart_deactive_win(lisn); + _ev_tcp_smart_deactive_win(conn); return ev__translate_sys_error(ret); } conn->base.data.flags |= EV_HANDLE_TCP_ACCEPTING; @@ -10763,7 +10816,7 @@ int ev_tcp_connect(ev_tcp_t* sock, struct sockaddr* addr, size_t size, ev_tcp_co { goto err; } - ev__handle_event_add(&sock->base); + ev__handle_active(&sock->base); DWORD bytes; ret = sock->backend.u.client.fn_connectex(sock->sock, addr, (int)size, @@ -10779,7 +10832,7 @@ int ev_tcp_connect(ev_tcp_t* sock, struct sockaddr* addr, size_t size, ev_tcp_co if (ret != WSA_IO_PENDING) { sock->base.data.flags &= ~EV_HANDLE_TCP_CONNECTING; - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); ret = ev__translate_sys_error(ret); goto err; } @@ -10809,7 +10862,7 @@ int ev_tcp_write(ev_tcp_t* sock, ev_tcp_write_req_t* req, ev_buf_t* bufs, } ev_list_push_back(&sock->backend.u.stream.w_queue, &req->base.node); - ev__handle_event_add(&sock->base); + ev__handle_active(&sock->base); ret = WSASend(sock->sock, (WSABUF*)req->base.bufs, (DWORD)req->base.nbuf, NULL, 0, &req->backend.io.overlapped, NULL); @@ -10824,7 +10877,7 @@ int ev_tcp_write(ev_tcp_t* sock, ev_tcp_write_req_t* req, ev_buf_t* bufs, if ((ret = WSAGetLastError()) != WSA_IO_PENDING) { - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); return ev__translate_sys_error(ret); } @@ -10847,7 +10900,7 @@ int ev_tcp_read(ev_tcp_t* sock, ev_tcp_read_req_t* req, } ev_list_push_back(&sock->backend.u.stream.r_queue, &req->base.node); - ev__handle_event_add(&sock->base); + ev__handle_active(&sock->base); DWORD flags = 0; ret = WSARecv(sock->sock, (WSABUF*)req->base.data.bufs, (DWORD)req->base.data.nbuf, @@ -10863,7 +10916,7 @@ int ev_tcp_read(ev_tcp_t* sock, ev_tcp_read_req_t* req, if ((ret = WSAGetLastError()) != WSA_IO_PENDING) { - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); return ev__translate_sys_error(ret); } diff --git a/src/win/tcp_win.c b/src/win/tcp_win.c index fbd359ba9..0b7ad4fc1 100644 --- a/src/win/tcp_win.c +++ b/src/win/tcp_win.c @@ -16,6 +16,51 @@ static void _ev_tcp_close_socket(ev_tcp_t* sock) } } +static void _ev_tcp_smart_deactive_win(ev_tcp_t* sock) +{ + size_t io_sz = 0; + if (sock->base.data.flags & EV_HANDLE_TCP_LISTING) + { + io_sz = ev_list_size(&sock->backend.u.listen.a_queue); + if (io_sz != 0) + { + return; + } + } + else if (sock->base.data.flags & EV_HANDLE_TCP_ACCEPTING) + { + if (sock->backend.u.accept.cb != NULL) + { + return; + } + } + else if (sock->base.data.flags & EV_HANDLE_TCP_CONNECTING) + { + if (sock->backend.u.client.cb != NULL) + { + return; + } + } + else if (sock->base.data.flags & EV_HANDLE_TCP_STREAMING) + { + io_sz += ev_list_size(&sock->backend.u.stream.w_queue); + io_sz += ev_list_size(&sock->backend.u.stream.r_queue); + if (io_sz != 0) + { + return; + } + } + + ev__handle_deactive(&sock->base); +} + +static void _ev_tcp_accept_callback_once(ev_tcp_t* lisn, ev_tcp_t* conn, int stat) +{ + ev_tcp_accept_cb bak_cb = conn->backend.u.accept.cb; + conn->backend.u.accept.cb = NULL; + bak_cb(lisn, conn, stat); +} + static void _ev_tcp_finialize_accept(ev_tcp_t* conn) { ev_tcp_t* lisn = conn->backend.u.accept.listen; @@ -31,23 +76,23 @@ static void _ev_tcp_finialize_accept(ev_tcp_t* conn) ev_list_erase(&lisn->backend.u.listen.a_queue_done, &conn->backend.u.accept.node); } - ev__handle_event_dec(&lisn->base); - ev__handle_event_dec(&conn->base); - conn->base.data.flags &= ~EV_HANDLE_TCP_ACCEPTING; - conn->backend.u.accept.cb(lisn, conn, conn->backend.u.accept.stat); + _ev_tcp_smart_deactive_win(lisn); + _ev_tcp_smart_deactive_win(conn); + + _ev_tcp_accept_callback_once(lisn, conn, conn->backend.u.accept.stat); } static void _ev_tcp_cleanup_connection_in_listen(ev_tcp_t* conn) { ev_tcp_t* lisn = conn->backend.u.accept.listen; - ev__handle_event_dec(&lisn->base); - ev__handle_event_dec(&conn->base); + conn->base.data.flags &= ~EV_HANDLE_TCP_ACCEPTING; + _ev_tcp_smart_deactive_win(lisn); + _ev_tcp_smart_deactive_win(conn); _ev_tcp_close_socket(conn); - conn->base.data.flags &= ~EV_HANDLE_TCP_ACCEPTING; - conn->backend.u.accept.cb(lisn, conn, EV_ECANCELED); + _ev_tcp_accept_callback_once(lisn, conn, EV_ECANCELED); } static void _ev_tcp_cleanup_listen(ev_tcp_t* sock) @@ -67,14 +112,14 @@ static void _ev_tcp_cleanup_listen(ev_tcp_t* sock) static void _ev_tcp_w_user_callback_win(ev_tcp_t* sock, ev_tcp_write_req_t* req, ssize_t size) { - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); ev__write_exit(&req->base); req->user_callback(req, size); } static void _ev_tcp_r_user_callbak_win(ev_tcp_t* sock, ev_tcp_read_req_t* req, ssize_t size) { - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); ev__read_exit(&req->base); req->user_callback(req, size); } @@ -104,13 +149,21 @@ static void _ev_tcp_cleanup_stream(ev_tcp_t* sock) } } +static void _ev_tcp_connect_callback_once_win(ev_tcp_t* sock, int stat) +{ + ev_tcp_connect_cb bak_cb = sock->backend.u.client.cb; + sock->backend.u.client.cb = NULL; + bak_cb(sock, stat); +} + static void _ev_tcp_cleanup_connect(ev_tcp_t* sock) { if (sock->backend.u.client.stat == EV_EINPROGRESS) { sock->backend.u.client.stat = EV_ECANCELED; } - sock->backend.u.client.cb(sock, sock->backend.u.client.stat); + + _ev_tcp_connect_callback_once_win(sock, sock->backend.u.client.stat); } static void _ev_tcp_on_close_win(ev_handle_t* handle) @@ -277,10 +330,10 @@ static void _ev_tcp_process_connect(ev_tcp_t* sock) } } - ev__handle_event_dec(&sock->base); - sock->base.data.flags &= ~EV_HANDLE_TCP_CONNECTING; - sock->backend.u.client.cb(sock, sock->backend.u.client.stat); + _ev_tcp_smart_deactive_win(sock); + + _ev_tcp_connect_callback_once_win(sock, sock->backend.u.client.stat); } static void _ev_tcp_on_task_done(ev_handle_t* handle) @@ -548,8 +601,8 @@ int ev_tcp_accept(ev_tcp_t* lisn, ev_tcp_t* conn, ev_tcp_accept_cb cb) } _ev_tcp_setup_accept_win(lisn, conn, cb); - ev__handle_event_add(&lisn->base); - ev__handle_event_add(&conn->base); + ev__handle_active(&lisn->base); + ev__handle_active(&conn->base); DWORD bytes = 0; ret = AcceptEx(lisn->sock, conn->sock, @@ -567,9 +620,9 @@ int ev_tcp_accept(ev_tcp_t* lisn, ev_tcp_t* conn, ev_tcp_accept_cb cb) if ((ret = WSAGetLastError()) != WSA_IO_PENDING) { - ev__handle_event_dec(&lisn->base); - ev__handle_event_dec(&conn->base); conn->base.data.flags &= ~EV_HANDLE_TCP_CONNECTING; + _ev_tcp_smart_deactive_win(lisn); + _ev_tcp_smart_deactive_win(conn); return ev__translate_sys_error(ret); } conn->base.data.flags |= EV_HANDLE_TCP_ACCEPTING; @@ -644,7 +697,7 @@ int ev_tcp_connect(ev_tcp_t* sock, struct sockaddr* addr, size_t size, ev_tcp_co { goto err; } - ev__handle_event_add(&sock->base); + ev__handle_active(&sock->base); DWORD bytes; ret = sock->backend.u.client.fn_connectex(sock->sock, addr, (int)size, @@ -660,7 +713,7 @@ int ev_tcp_connect(ev_tcp_t* sock, struct sockaddr* addr, size_t size, ev_tcp_co if (ret != WSA_IO_PENDING) { sock->base.data.flags &= ~EV_HANDLE_TCP_CONNECTING; - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); ret = ev__translate_sys_error(ret); goto err; } @@ -690,7 +743,7 @@ int ev_tcp_write(ev_tcp_t* sock, ev_tcp_write_req_t* req, ev_buf_t* bufs, } ev_list_push_back(&sock->backend.u.stream.w_queue, &req->base.node); - ev__handle_event_add(&sock->base); + ev__handle_active(&sock->base); ret = WSASend(sock->sock, (WSABUF*)req->base.bufs, (DWORD)req->base.nbuf, NULL, 0, &req->backend.io.overlapped, NULL); @@ -705,7 +758,7 @@ int ev_tcp_write(ev_tcp_t* sock, ev_tcp_write_req_t* req, ev_buf_t* bufs, if ((ret = WSAGetLastError()) != WSA_IO_PENDING) { - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); return ev__translate_sys_error(ret); } @@ -728,7 +781,7 @@ int ev_tcp_read(ev_tcp_t* sock, ev_tcp_read_req_t* req, } ev_list_push_back(&sock->backend.u.stream.r_queue, &req->base.node); - ev__handle_event_add(&sock->base); + ev__handle_active(&sock->base); DWORD flags = 0; ret = WSARecv(sock->sock, (WSABUF*)req->base.data.bufs, (DWORD)req->base.data.nbuf, @@ -744,7 +797,7 @@ int ev_tcp_read(ev_tcp_t* sock, ev_tcp_read_req_t* req, if ((ret = WSAGetLastError()) != WSA_IO_PENDING) { - ev__handle_event_dec(&sock->base); + _ev_tcp_smart_deactive_win(sock); return ev__translate_sys_error(ret); }