Skip to content

Commit

Permalink
channel.close时不再清空缓存, 以便于还可以继续读
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyangzi committed Mar 10, 2023
1 parent 093285d commit f4a3d15
Show file tree
Hide file tree
Showing 2 changed files with 334 additions and 59 deletions.
140 changes: 84 additions & 56 deletions libgo/routine_sync/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
RS_DBG(dbg_channel, "channel=%ld | %s | ptr(t)=0x%p | isWait=%d | abstime=%d | closed=%d | popWaiting_=%p | pushWaiting_=%p",
id(), __func__, (void*)&t, isWait, !!abstime, closed_, (void*)popWaiting_, (void*)pushWaiting_);

if (closed_)
return false;

if (pushWaiting_) { // 有push协程在等待, 读出来 & 清理
if (pushWaiting_ && pushWaiting_ != (ConditionVariable*)kClosedWaiting) { // 有push协程在等待, 读出来 & 清理
t = *pushQ_;
pushQ_ = nullptr;

Expand All @@ -51,6 +48,9 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
return true;
}

if (closed_)
return false;

if (!isWait) {
RS_DBG(dbg_channel, "channel=%ld | %s | not match && not wait | return false",
id(), __func__);
Expand All @@ -68,15 +68,16 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
id(), __func__);

(void)waiting.wait_until_p(lock, abstime, [&]{ return popWaiting_ != &waiting; });
bool ok = popWaiting_ != &waiting;
bool changed = (popWaiting_ != &waiting);
bool ok = changed && popWaiting_ != (ConditionVariable*)kClosedWaiting;

RS_DBG(dbg_channel, "channel=%ld | %s | waked | matched=%d",
id(), __func__, ok);
RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | changed=%d | ok=%d",
id(), __func__, closed_, changed, ok);

if (ok) {
// 成功
t = std::move(temp); // 对外部T的写操作放到本线程来做, 降低使用难度
} else {
} else if (!changed) {
// 超时,清理
popQ_ = nullptr;
popWaiting_ = nullptr;
Expand All @@ -93,13 +94,13 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
RS_DBG(dbg_channel, "channel=%ld | %s | ptr(t)=0x%p | isWait=%d | abstime=%d | closed=%d | popWaiting_=%p | pushWaiting_=%p",
id(), __func__, (void*)&t, isWait, !!abstime, closed_, (void*)popWaiting_, (void*)pushWaiting_);

if (closed_)
return false;

if (!popWaiting_) {
return pop_impl_with_signal_noqueued(t, isWait, abstime, lock);
}

if (closed_)
return false;

if (!isWait) {
RS_DBG(dbg_channel, "channel=%ld | %s | pop contended && not wait | return false",
id(), __func__);
Expand All @@ -119,8 +120,12 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
}
}

RS_DBG(dbg_channel, "channel=%ld | %s | waked | pop idle",
id(), __func__);
RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | pop idle",
id(), __func__, closed_);

if (closed_)
return false;

return pop_impl_with_signal_noqueued(t, isWait, abstime, lock);
}

Expand All @@ -132,10 +137,7 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
RS_DBG(dbg_channel, "channel=%ld | %s | ptr(t)=0x%p | isWait=%d | abstime=%d | closed=%d | popWaiting_=%p | pushWaiting_=%p",
id(), __func__, (void*)&t, isWait, !!abstime, closed_, (void*)popWaiting_, (void*)pushWaiting_);

if (closed_)
return false;

if (popWaiting_) { // 有pop协程在等待, 写入 & 清理
if (popWaiting_ && popWaiting_ != (ConditionVariable*)kClosedWaiting) { // 有pop协程在等待, 写入 & 清理
*popQ_ = t;
popQ_ = nullptr;

Expand All @@ -150,6 +152,9 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
return true;
}

if (closed_)
return false;

if (!isWait) {
RS_DBG(dbg_channel, "channel=%ld | %s | not match && not wait | return false",
id(), __func__);
Expand All @@ -166,18 +171,20 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
id(), __func__);

(void)waiting.wait_until_p(lock, abstime, [&]{ return pushWaiting_ != &waiting; });
bool ok = pushWaiting_ != &waiting;
bool changed = (pushWaiting_ != &waiting);
bool ok = changed && pushWaiting_ != (ConditionVariable*)kClosedWaiting;

RS_DBG(dbg_channel, "channel=%ld | %s | waked | matched=%d",
id(), __func__, ok);
RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | changed=%d | ok=%d",
id(), __func__, closed_, changed, ok);

if (ok) {
// 成功
} else {
} else if (!changed) {
// 超时,清理
pushQ_ = nullptr;
pushWaiting_ = nullptr;
}

return ok;
}

Expand All @@ -194,6 +201,9 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
return push_impl_with_signal_noqueued(t, isWait, abstime, lock);
}

if (closed_)
return false;

if (!isWait) {
RS_DBG(dbg_channel, "channel=%ld | %s | push contended && not wait | return false",
id(), __func__);
Expand All @@ -213,11 +223,36 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
}
}

RS_DBG(dbg_channel, "channel=%ld | %s | waked | push idle",
id(), __func__);
RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | push idle",
id(), __func__, closed_);

if (closed_)
return false;

return push_impl_with_signal_noqueued(t, isWait, abstime, lock);
}

void impl_with_signal_close(std::unique_lock<Mutex> & lock)
{
long push_wakeup = 0;
long pop_wakeup = 0;
pushQ_ = nullptr;
popQ_ = nullptr;
if (pushWaiting_) {
pushWaiting_->fast_notify_all(lock);
pushWaiting_ = (ConditionVariable*)kClosedWaiting;
push_wakeup = 1;
}
if (popWaiting_) {
popWaiting_->fast_notify_all(lock);
popWaiting_ = (ConditionVariable*)kClosedWaiting;
pop_wakeup = 1;
}

RS_DBG(dbg_channel, "channel(...)=%ld | %s | no-cap branch | push-wakeup=%ld | pop-wakeup=%ld",
id(), __func__, push_wakeup, pop_wakeup);
}

protected:
Mutex mtx_;
ConditionVariable pushCv_;
Expand All @@ -228,6 +263,8 @@ class ChannelImplWithSignal : public DebuggerId<ChannelImplWithSignal<int>>
T* popQ_ {nullptr};
ConditionVariable* pushWaiting_ {nullptr};
ConditionVariable* popWaiting_ {nullptr};

static const std::size_t kClosedWaiting = (std::size_t)-1;
};

template <
Expand All @@ -248,6 +285,7 @@ class ChannelImpl : public ChannelImplWithSignal<T>
using ChannelImplWithSignal<T>::pop_impl_with_signal;
using ChannelImplWithSignal<T>::push_impl_with_signal;
using ChannelImplWithSignal<T>::id;
using ChannelImplWithSignal<T>::impl_with_signal_close;

explicit ChannelImpl(std::size_t capacity = 0)
: cap_(capacity)
Expand Down Expand Up @@ -302,22 +340,16 @@ class ChannelImpl : public ChannelImplWithSignal<T>
std::unique_lock<Mutex> lock(mtx_);
closed_ = true;
if (!cap_) {
pushQ_ = nullptr;
popQ_ = nullptr;
if (pushWaiting_) {
pushWaiting_->fast_notify_all(lock);
pushWaiting_ = nullptr;
}
if (popWaiting_) {
popWaiting_->fast_notify_all(lock);
popWaiting_ = nullptr;
}
impl_with_signal_close(lock);
}

pushCv_.fast_notify_all(lock);
popCv_.fast_notify_all(lock);
QueueT q;
std::swap(q, q_);
long push_wakeup = pushCv_.fast_notify_all(lock);
long pop_wakeup = popCv_.fast_notify_all(lock);
(void)push_wakeup;
(void)pop_wakeup;

RS_DBG(dbg_channel, "channel(queue)=%ld | %s | cap=%lu | size=%lu | push-wakeup=%ld | pop-wakeup=%ld",
id(), __func__, cap_, q_.size(), push_wakeup, pop_wakeup);
}

private:
Expand Down Expand Up @@ -349,7 +381,7 @@ class ChannelImpl : public ChannelImplWithSignal<T>
return false;
}

auto p = [this]{ return q_.size() < cap_; };
auto p = [this]{ return q_.size() < cap_ || closed_; };

RS_DBG(dbg_channel, "channel(queue)=%ld | %s | begin wait",
id(), __func__);
Expand Down Expand Up @@ -404,7 +436,7 @@ class ChannelImpl : public ChannelImplWithSignal<T>
return false;
}

auto p = [this]{ return !q_.empty(); };
auto p = [this]{ return !q_.empty() || closed_; };

RS_DBG(dbg_channel, "channel(queue)=%ld | %s | begin wait",
id(), __func__);
Expand All @@ -418,7 +450,7 @@ class ChannelImpl : public ChannelImplWithSignal<T>
if (status == std::cv_status::timeout)
return false;

if (closed_)
if (q_.empty())
return false;

t = q_.front();
Expand Down Expand Up @@ -454,6 +486,7 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
using ChannelImplWithSignal<nullptr_t>::pop_impl_with_signal;
using ChannelImplWithSignal<nullptr_t>::push_impl_with_signal;
using ChannelImplWithSignal<nullptr_t>::id;
using ChannelImplWithSignal<nullptr_t>::impl_with_signal_close;

explicit ChannelImpl(std::size_t capacity = 0)
: cap_(capacity), count_(0)
Expand Down Expand Up @@ -508,21 +541,16 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
std::unique_lock<Mutex> lock(mtx_);
closed_ = true;
if (!cap_) {
pushQ_ = nullptr;
popQ_ = nullptr;
if (pushWaiting_) {
pushWaiting_->fast_notify_all(lock);
pushWaiting_ = nullptr;
}
if (popWaiting_) {
popWaiting_->fast_notify_all(lock);
popWaiting_ = nullptr;
}
impl_with_signal_close(lock);
}

pushCv_.fast_notify_all(lock);
popCv_.fast_notify_all(lock);
count_ = 0;
long push_wakeup = pushCv_.fast_notify_all(lock);
long pop_wakeup = popCv_.fast_notify_all(lock);
(void)push_wakeup;
(void)pop_wakeup;

RS_DBG(dbg_channel, "channel(void)=%ld | %s | cap=%lu | size=%lu | push-wakeup=%ld | pop-wakeup=%ld",
id(), __func__, cap_, count_, push_wakeup, pop_wakeup);
}

private:
Expand Down Expand Up @@ -554,7 +582,7 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
return false;
}

auto p = [this]{ return count_ < cap_; };
auto p = [this]{ return count_ < cap_ || closed_; };

RS_DBG(dbg_channel, "channel(void)=%ld | %s | begin wait",
id(), __func__);
Expand Down Expand Up @@ -607,7 +635,7 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
return false;
}

auto p = [this]{ return count_ > 0; };
auto p = [this]{ return count_ > 0 || closed_; };

RS_DBG(dbg_channel, "channel(void)=%ld | %s | begin wait",
id(), __func__);
Expand All @@ -621,7 +649,7 @@ class ChannelImpl<nullptr_t, QueueT> : public ChannelImplWithSignal<nullptr_t>
if (status == std::cv_status::timeout)
return false;

if (closed_)
if (count_ <= 0)
return false;

--count_;
Expand Down
Loading

0 comments on commit f4a3d15

Please sign in to comment.