diff --git a/include/proxy/TransformInternal.h b/include/proxy/TransformInternal.h index 1aba6df1adc..78817073cb7 100644 --- a/include/proxy/TransformInternal.h +++ b/include/proxy/TransformInternal.h @@ -31,6 +31,7 @@ class TransformTerminus : public VConnection { public: TransformTerminus(TransformVConnection *tvc); + ~TransformTerminus() override; int handle_event(int event, void *edata); @@ -49,6 +50,12 @@ class TransformTerminus : public VConnection int m_deletable; int m_closed; int m_called_user; + +private: + Event *_read_event = nullptr; + bool _read_disabled = true; + Event *_write_event = nullptr; + bool _write_disabled = true; }; class TransformVConnection : public TransformVCChain diff --git a/src/proxy/Transform.cc b/src/proxy/Transform.cc index a19517135f4..fd74f7e6068 100644 --- a/src/proxy/Transform.cc +++ b/src/proxy/Transform.cc @@ -132,6 +132,18 @@ TransformTerminus::TransformTerminus(TransformVConnection *tvc) SET_HANDLER(&TransformTerminus::handle_event); } +TransformTerminus::~TransformTerminus() +{ + if (_read_event != nullptr) { + _read_event->cancel(); + _read_event = nullptr; + } + if (_write_event != nullptr) { + _write_event->cancel(); + _write_event = nullptr; + } +} + #define RETRY() \ if (ink_atomic_increment((int *)&m_event_count, 1) < 0) { \ ink_assert(!"not reached"); \ @@ -140,8 +152,15 @@ TransformTerminus::TransformTerminus(TransformVConnection *tvc) return 0; int -TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */) +TransformTerminus::handle_event(int event, void *edata) { + Event *event_p = reinterpret_cast(edata); + if (event_p == _read_event) { + _read_event = nullptr; + } else if (event_p == _write_event) { + _write_event = nullptr; + } + int val; m_deletable = ((m_closed != 0) && (m_tvc->m_closed != 0)); @@ -212,12 +231,14 @@ TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */) } } - if (m_write_vio.ntodo() > 0) { - if (towrite > 0) { - m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio); + if (!_write_disabled) { + if (m_write_vio.ntodo() > 0) { + if (towrite > 0) { + m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio); + } + } else { + m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio); } - } else { - m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio); } // We could have closed on the write callback @@ -225,14 +246,16 @@ TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */) return 0; } - if (m_read_vio.ntodo() > 0) { - if (m_write_vio.ntodo() <= 0) { - m_read_vio.cont->handleEvent(VC_EVENT_EOS, &m_read_vio); - } else if (towrite > 0) { - m_read_vio.cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio); + if (!_read_disabled) { + if (m_read_vio.ntodo() > 0) { + if (m_write_vio.ntodo() <= 0) { + m_read_vio.cont->handleEvent(VC_EVENT_EOS, &m_read_vio); + } else if (towrite > 0) { + m_read_vio.cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio); + } + } else { + m_read_vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio); } - } else { - m_read_vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio); } } } else { @@ -256,7 +279,7 @@ TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */) if (!m_called_user) { m_called_user = 1; m_tvc->m_cont->handleEvent(ev, &m_read_vio); - } else { + } else if (!_read_disabled) { ink_assert(m_read_vio.cont != nullptr); m_read_vio.cont->handleEvent(ev, &m_read_vio); } @@ -275,19 +298,29 @@ TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */) VIO * TransformTerminus::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) { - m_read_vio.set_writer(buf); m_read_vio.op = VIO::READ; m_read_vio.set_continuation(c); m_read_vio.nbytes = nbytes; m_read_vio.ndone = 0; m_read_vio.vc_server = this; - if (ink_atomic_increment(&m_event_count, 1) < 0) { - ink_assert(!"not reached"); - } - Dbg(dbg_ctl_transform, "[TransformTerminus::do_io_read] event_count %d", m_event_count); + if (buf != nullptr) { + _read_disabled = false; + m_read_vio.set_writer(buf); + if (ink_atomic_increment(&m_event_count, 1) < 0) { + ink_assert(!"not reached"); + } + Dbg(dbg_ctl_transform, "[TransformTerminus::do_io_read] event_count %d", m_event_count); - this_ethread()->schedule_imm_local(this); + _read_event = this_ethread()->schedule_imm_local(this); + } else { + _read_disabled = true; + if (_read_event != nullptr) { + _read_event->cancel(); + _read_event = nullptr; + } + m_read_vio.buffer.clear(); + } return &m_read_vio; } @@ -300,19 +333,29 @@ TransformTerminus::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader * { // In the process of eliminating 'owner' mode so asserting against it ink_assert(!owner); - m_write_vio.set_reader(buf); m_write_vio.op = VIO::WRITE; m_write_vio.set_continuation(c); m_write_vio.nbytes = nbytes; m_write_vio.ndone = 0; m_write_vio.vc_server = this; - if (ink_atomic_increment(&m_event_count, 1) < 0) { - ink_assert(!"not reached"); - } - Dbg(dbg_ctl_transform, "[TransformTerminus::do_io_write] event_count %d", m_event_count); + if (buf != nullptr) { + _write_disabled = false; + m_write_vio.set_reader(buf); + if (ink_atomic_increment(&m_event_count, 1) < 0) { + ink_assert(!"not reached"); + } + Dbg(dbg_ctl_transform, "[TransformTerminus::do_io_write] event_count %d", m_event_count); - this_ethread()->schedule_imm_local(this); + _write_event = this_ethread()->schedule_imm_local(this); + } else { + _write_disabled = true; + if (_write_event != nullptr) { + _write_event->cancel(); + _write_event = nullptr; + } + m_write_vio.buffer.clear(); + } return &m_write_vio; }