From 437e135dbd94eb65b45533d9ce8ee28b5bd37b6d Mon Sep 17 00:00:00 2001 From: vit-vit Date: Fri, 12 Jun 2015 13:42:37 +1000 Subject: [PATCH] fixed a bug - crash --- ctpl_stl.h | 139 ++++++++++++++++++++++++++++------------------------- 1 file changed, 73 insertions(+), 66 deletions(-) diff --git a/ctpl_stl.h b/ctpl_stl.h index 43543a4..5956cf0 100644 --- a/ctpl_stl.h +++ b/ctpl_stl.h @@ -1,21 +1,20 @@ - /********************************************************* - * - * Copyright (C) 2014 by Vitaliy Vitsentiy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - *********************************************************/ +* +* Copyright (C) 2014 by Vitaliy Vitsentiy +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*********************************************************/ #ifndef __ctpl_stl_thread_pool_H__ @@ -41,6 +40,34 @@ namespace ctpl { + namespace detail { + template + class Queue { + public: + bool push(T const & value) { + std::unique_lock lock(this->mutex); + this->q.push(value); + return true; + } + // deletes the retrieved element, do not use for non integral types + bool pop(T & v) { + std::unique_lock lock(this->mutex); + if (this->q.empty()) + return false; + v = this->q.front(); + this->q.pop(); + return true; + } + bool empty() { + std::unique_lock lock(this->mutex); + return this->q.empty(); + } + private: + std::queue q; + std::mutex mutex; + }; + } + class thread_pool { public: @@ -93,24 +120,24 @@ namespace ctpl { // empty the queue void clear_queue() { - std::unique_lock lock(this->mutex); - while (!this->q.empty()) - this->q.pop(); // empty the queue + std::function * _f; + while (this->q.pop(_f)) + delete _f; // empty the queue } - // pops a functional wraper to the original function + // pops a functional wrapper to the original function std::function pop() { + std::function * _f = nullptr; + this->q.pop(_f); + std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred std::function f; - std::unique_lock lock(this->mutex); - if (!this->q.empty()) { - f = this->q.front(); - this->q.pop(); - } + if (_f) + f = *_f; return f; } // wait for all computing threads to finish and stop all threads - // may be called asyncronously to not pause the calling thread while waiting + // may be called asynchronously to not pause the calling thread while waiting // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions void stop(bool isWait = false) { if (!isWait) { @@ -132,8 +159,8 @@ namespace ctpl { this->cv.notify_all(); // stop all waiting threads } for (int i = 0; i < static_cast(this->threads.size()); ++i) { // wait for the computing threads to finish - if (this->threads[i]->joinable()) - this->threads[i]->join(); + if (this->threads[i]->joinable()) + this->threads[i]->join(); } // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads // therefore delete them here @@ -146,15 +173,13 @@ namespace ctpl { auto push(F && f, Rest&&... rest) ->std::future { auto pck = std::make_shared>( std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...) - ); - - this->q.push([pck](int id) { + ); + auto _f = new std::function([pck](int id) { (*pck)(id); }); - + this->q.push(_f); std::unique_lock lock(this->mutex); this->cv.notify_one(); - return pck->get_future(); } @@ -163,14 +188,12 @@ namespace ctpl { template auto push(F && f) ->std::future { auto pck = std::make_shared>(std::forward(f)); - - this->q.push([pck](int id) { + auto _f = new std::function([pck](int id) { (*pck)(id); }); - + this->q.push(_f); std::unique_lock lock(this->mutex); this->cv.notify_one(); - return pck->get_future(); } @@ -184,52 +207,37 @@ namespace ctpl { thread_pool & operator=(thread_pool &&);// = delete; void set_thread(int i) { - std::shared_ptr> flag(this->flags[i]); // a copy of the shared ptr to the flag + std::shared_ptr> flag(this->flags[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { std::atomic & _flag = *flag; - std::function _f; - bool isPopped = false; + std::function * _f; + bool isPop = this->q.pop(_f); while (true) { - while (true) { // if there is anything in the queue - if (!isPopped) { - std::unique_lock lock(this->mutex); - if (this->q.empty()) - break; - _f = this->q.front(); - this->q.pop(); - } - _f(i); - isPopped = false; - + while (isPop) { // if there is anything in the queue + std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred + (*_f)(i); if (_flag) return; // the thread is wanted to stop, return even if the queue is not empty yet + else + isPop = this->q.pop(_f); } - // the queue is empty here, wait for the next command std::unique_lock lock(this->mutex); ++this->nWaiting; - this->cv.wait(lock, [this, &_f, &isPopped, &_flag](){ - isPopped = !this->q.empty(); - if (isPopped) { - _f = this->q.front(); - this->q.pop(); - } - return isPopped || this->isDone || _flag; - }); + this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; }); --this->nWaiting; - - if (!isPopped) + if (!isPop) return; // if the queue is empty and this->isDone == true or *flag then return } }; - this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() + this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() } void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } std::vector> threads; std::vector>> flags; - mutable std::queue> q; + detail::Queue *> q; std::atomic isDone; std::atomic isStop; std::atomic nWaiting; // how many threads are waiting @@ -241,4 +249,3 @@ namespace ctpl { } #endif // __ctpl_stl_thread_pool_H__ -