diff --git a/.circleci/config.yml b/.circleci/config.yml index 84a7fc7695..aa9ff697d9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,6 +8,13 @@ jobs: working_directory: /tmp/licode steps: + - run: + name: Install Git client + command: | + set -x + sudo apt-get update + sudo apt-get install -y git + - checkout - setup_remote_docker diff --git a/erizo/src/erizo/DtlsTransport.cpp b/erizo/src/erizo/DtlsTransport.cpp index efdd631998..53e31e9822 100644 --- a/erizo/src/erizo/DtlsTransport.cpp +++ b/erizo/src/erizo/DtlsTransport.cpp @@ -26,7 +26,8 @@ static std::mutex dtls_mutex; Resender::Resender(DtlsTransport* transport, dtls::DtlsSocketContext* ctx) : transport_(transport), socket_context_(ctx), - resend_seconds_(kInitialSecsPerResend), max_resends_(kMaxResends) { + resend_seconds_(kInitialSecsPerResend), max_resends_(kMaxResends), + scheduled_task_{std::make_shared()} { } Resender::~Resender() { diff --git a/erizo/src/erizo/DtlsTransport.h b/erizo/src/erizo/DtlsTransport.h index 6dd8ce7060..1a2f089de9 100644 --- a/erizo/src/erizo/DtlsTransport.h +++ b/erizo/src/erizo/DtlsTransport.h @@ -78,7 +78,7 @@ class Resender { packetPtr packet_; unsigned int resend_seconds_; unsigned int max_resends_; - int scheduled_task_ = -1; + std::shared_ptr scheduled_task_; }; } // namespace erizo #endif // ERIZO_SRC_ERIZO_DTLSTRANSPORT_H_ diff --git a/erizo/src/erizo/rtp/PliPacerHandler.cpp b/erizo/src/erizo/rtp/PliPacerHandler.cpp index 9afaef7a34..86d0409c5b 100644 --- a/erizo/src/erizo/rtp/PliPacerHandler.cpp +++ b/erizo/src/erizo/rtp/PliPacerHandler.cpp @@ -13,7 +13,7 @@ constexpr duration PliPacerHandler::kKeyframeTimeout; PliPacerHandler::PliPacerHandler(std::shared_ptr the_clock) : enabled_{true}, connection_{nullptr}, clock_{the_clock}, time_last_keyframe_{clock_->now()}, - waiting_for_keyframe_{false}, scheduled_pli_{-1}, + waiting_for_keyframe_{false}, scheduled_pli_{std::make_shared()}, video_sink_ssrc_{0}, video_source_ssrc_{0}, fir_seq_number_{0} {} void PliPacerHandler::enable() { @@ -38,7 +38,7 @@ void PliPacerHandler::read(Context *ctx, std::shared_ptr packet) { time_last_keyframe_ = clock_->now(); waiting_for_keyframe_ = false; connection_->getWorker()->unschedule(scheduled_pli_); - scheduled_pli_ = -1; + scheduled_pli_ = std::make_shared(); } ctx->fireRead(std::move(packet)); } @@ -54,7 +54,7 @@ void PliPacerHandler::sendFIR() { getContext()->fireWrite(RtpUtils::createFIR(video_source_ssrc_, video_sink_ssrc_, fir_seq_number_++)); getContext()->fireWrite(RtpUtils::createFIR(video_source_ssrc_, video_sink_ssrc_, fir_seq_number_++)); waiting_for_keyframe_ = false; - scheduled_pli_ = -1; + scheduled_pli_ = std::make_shared(); } void PliPacerHandler::scheduleNextPLI() { diff --git a/erizo/src/erizo/rtp/PliPacerHandler.h b/erizo/src/erizo/rtp/PliPacerHandler.h index 6ecb57405a..7407421a55 100644 --- a/erizo/src/erizo/rtp/PliPacerHandler.h +++ b/erizo/src/erizo/rtp/PliPacerHandler.h @@ -5,6 +5,7 @@ #include "./logger.h" #include "pipeline/Handler.h" +#include "thread/Worker.h" #include "lib/Clock.h" namespace erizo { @@ -43,7 +44,7 @@ class PliPacerHandler: public Handler, public std::enable_shared_from_this clock_; time_point time_last_keyframe_; bool waiting_for_keyframe_; - int scheduled_pli_; + std::shared_ptr scheduled_pli_; uint32_t video_sink_ssrc_; uint32_t video_source_ssrc_; uint8_t fir_seq_number_; diff --git a/erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.cpp b/erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.cpp index b018210fa8..ea5d571c17 100644 --- a/erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.cpp +++ b/erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.cpp @@ -26,7 +26,7 @@ RtpPaddingGeneratorHandler::RtpPaddingGeneratorHandler(std::shared_ptr()} {} diff --git a/erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.h b/erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.h index 46269e3da0..1cd138259b 100644 --- a/erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.h +++ b/erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.h @@ -7,6 +7,7 @@ #include "pipeline/Handler.h" #include "lib/Clock.h" #include "lib/TokenBucket.h" +#include "thread/Worker.h" #include "rtp/SequenceNumberTranslator.h" #include "./Stats.h" @@ -64,7 +65,7 @@ class RtpPaddingGeneratorHandler: public Handler, public std::enable_shared_from MovingIntervalRateStat marker_rate_; uint32_t rtp_header_length_; TokenBucket bucket_; - int scheduled_task_; + std::shared_ptr scheduled_task_; }; } // namespace erizo diff --git a/erizo/src/erizo/thread/Worker.cpp b/erizo/src/erizo/thread/Worker.cpp index 86ea13d325..e9de8a4a26 100644 --- a/erizo/src/erizo/thread/Worker.cpp +++ b/erizo/src/erizo/thread/Worker.cpp @@ -10,6 +10,17 @@ using erizo::Worker; using erizo::SimulatedWorker; +using erizo::ScheduledTaskReference; + +ScheduledTaskReference::ScheduledTaskReference() : cancelled{false} { +} + +bool ScheduledTaskReference::isCancelled() { + return cancelled; +} +void ScheduledTaskReference::cancel() { + cancelled = true; +} Worker::Worker(std::weak_ptr scheduler, std::shared_ptr the_clock) : scheduler_{scheduler}, @@ -50,21 +61,22 @@ void Worker::close() { service_.stop(); } -int Worker::scheduleFromNow(Task f, duration delta) { +std::shared_ptr Worker::scheduleFromNow(Task f, duration delta) { auto delta_ms = std::chrono::duration_cast(delta); - int uuid = next_scheduled_++; + auto id = std::make_shared(); if (auto scheduler = scheduler_.lock()) { - scheduler->scheduleFromNow(safeTask([f, uuid](std::shared_ptr this_ptr) { - this_ptr->task(this_ptr->safeTask([f, uuid](std::shared_ptr this_ptr) { - std::unique_lock lock(this_ptr->cancel_mutex_); - if (this_ptr->isCancelled(uuid)) { - return; + scheduler->scheduleFromNow(safeTask([f, id](std::shared_ptr this_ptr) { + this_ptr->task(this_ptr->safeTask([f, id](std::shared_ptr this_ptr) { + { + if (id->isCancelled()) { + return; + } } f(); })); }), delta_ms); } - return uuid; + return id; } void Worker::scheduleEvery(ScheduledTask f, duration period) { @@ -84,20 +96,8 @@ void Worker::scheduleEvery(ScheduledTask f, duration period, duration next_delay }), next_delay); } -void Worker::unschedule(int uuid) { - if (uuid < 0) { - return; - } - std::unique_lock lock(cancel_mutex_); - cancelled_.push_back(uuid); -} - -bool Worker::isCancelled(int uuid) { - if (std::find(cancelled_.begin(), cancelled_.end(), uuid) != cancelled_.end()) { - cancelled_.erase(std::remove(cancelled_.begin(), cancelled_.end(), uuid), cancelled_.end()); - return true; - } - return false; +void Worker::unschedule(std::shared_ptr id) { + id->cancel(); } std::function Worker::safeTask(std::function)> f) { @@ -128,15 +128,15 @@ void SimulatedWorker::close() { tasks_.clear(); } -int SimulatedWorker::scheduleFromNow(Task f, duration delta) { - int uuid = next_scheduled_++; - scheduled_tasks_[clock_->now() + delta] = [this, f, uuid] { - if (isCancelled(uuid)) { +std::shared_ptr SimulatedWorker::scheduleFromNow(Task f, duration delta) { + auto id = std::make_shared(); + scheduled_tasks_[clock_->now() + delta] = [this, f, id] { + if (id->isCancelled()) { return; } f(); }; - return uuid; + return id; } void SimulatedWorker::executeTasks() { diff --git a/erizo/src/erizo/thread/Worker.h b/erizo/src/erizo/thread/Worker.h index 95bb2bff0c..58ca33673f 100644 --- a/erizo/src/erizo/thread/Worker.h +++ b/erizo/src/erizo/thread/Worker.h @@ -17,6 +17,15 @@ namespace erizo { +class ScheduledTaskReference { + public: + ScheduledTaskReference(); + bool isCancelled(); + void cancel(); + private: + std::atomic cancelled; +}; + class Worker : public std::enable_shared_from_this { public: typedef std::unique_ptr asio_worker; @@ -33,14 +42,11 @@ class Worker : public std::enable_shared_from_this { virtual void start(std::shared_ptr> start_promise); virtual void close(); - virtual int scheduleFromNow(Task f, duration delta); - virtual void unschedule(int uuid); + virtual std::shared_ptr scheduleFromNow(Task f, duration delta); + virtual void unschedule(std::shared_ptr id); virtual void scheduleEvery(ScheduledTask f, duration period); - protected: - bool isCancelled(int uuid); - private: void scheduleEvery(ScheduledTask f, duration period, duration next_delay); std::function safeTask(std::function)> f); @@ -55,8 +61,6 @@ class Worker : public std::enable_shared_from_this { asio_worker service_worker_; boost::thread_group group_; std::atomic closed_; - std::vector cancelled_; - mutable std::mutex cancel_mutex_; }; class SimulatedWorker : public Worker { @@ -66,7 +70,7 @@ class SimulatedWorker : public Worker { void start() override; void start(std::shared_ptr> start_promise) override; void close() override; - int scheduleFromNow(Task f, duration delta) override; + std::shared_ptr scheduleFromNow(Task f, duration delta) override; void executeTasks(); void executePastScheduledTasks();