Skip to content

Commit

Permalink
Replace IceUtil::Mutex with std::mutex (#1755)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Feb 5, 2024
1 parent fd85c5c commit 7415fd6
Show file tree
Hide file tree
Showing 62 changed files with 414 additions and 439 deletions.
5 changes: 3 additions & 2 deletions cpp/include/Ice/FactoryTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
#ifndef ICE_FACTORYTABLE_H
#define ICE_FACTORYTABLE_H

#include <IceUtil/Mutex.h>
#include <Ice/UserExceptionFactory.h>
#include <Ice/ValueFactory.h>

#include <mutex>

namespace Ice
{

Expand Down Expand Up @@ -53,7 +54,7 @@ class ICE_API FactoryTable : private IceUtil::noncopyable

private:

IceUtil::Mutex _m;
mutable std::mutex _mutex;

typedef ::std::pair<::Ice::UserExceptionFactory, int> EFPair;
typedef ::std::map<::std::string, EFPair> EFTable;
Expand Down
14 changes: 8 additions & 6 deletions cpp/include/Ice/MetricsObserverI.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ template<typename T> class ObserverT : public virtual ::Ice::Instrumentation::Ob
};

template<typename ObserverImplType>
class ObserverFactoryT : public Updater, private IceUtil::Mutex
class ObserverFactoryT : public Updater
{
public:

Expand All @@ -496,7 +496,7 @@ class ObserverFactoryT : public Updater, private IceUtil::Mutex
ObserverImplPtrType
getObserver(const MetricsHelperT<MetricsType>& helper)
{
IceUtil::Mutex::Lock sync(*this);
std::lock_guard lock(_mutex);
if(!_metrics)
{
return nullptr;
Expand Down Expand Up @@ -531,7 +531,8 @@ class ObserverFactoryT : public Updater, private IceUtil::Mutex
{
return getObserver(helper);
}
IceUtil::Mutex::Lock sync(*this);

std::lock_guard lock(_mutex);
if(!_metrics)
{
return nullptr;
Expand Down Expand Up @@ -573,7 +574,7 @@ class ObserverFactoryT : public Updater, private IceUtil::Mutex
{
UpdaterPtr updater;
{
IceUtil::Mutex::Lock sync(*this);
std::lock_guard lock(_mutex);
if(!_metrics)
{
return;
Expand All @@ -598,13 +599,13 @@ class ObserverFactoryT : public Updater, private IceUtil::Mutex

void setUpdater(const UpdaterPtr& updater)
{
IceUtil::Mutex::Lock sync(*this);
std::lock_guard lock(_mutex);
_updater = updater;
}

void destroy()
{
IceUtil::Mutex::Lock sync(*this);
std::lock_guard lock(_mutex);
_metrics = 0;
_maps.clear();
}
Expand All @@ -616,6 +617,7 @@ class ObserverFactoryT : public Updater, private IceUtil::Mutex
MetricsMapSeqType _maps;
std::atomic<bool> _enabled;
UpdaterPtr _updater;
std::mutex _mutex;
};

typedef ObserverT<Metrics> ObserverI;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/Glacier2/RoutingTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

#include <Ice/Ice.h>
#include <Ice/ObserverHelper.h>
#include <IceUtil/Mutex.h>

#include <Glacier2/ProxyVerifier.h>
#include <Glacier2/Instrumentation.h>

#include <list>
#include <mutex>

namespace Glacier2
{
Expand Down
36 changes: 15 additions & 21 deletions cpp/src/Ice/ACM.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,14 @@ IceInternal::FactoryACMMonitor::~FactoryACMMonitor()
void
IceInternal::FactoryACMMonitor::destroy()
{
Lock sync(*this);
unique_lock lock(_mutex);
if(!_instance)
{
//
// Ensure all the connections have been cleared, it's important to wait here
// to prevent the timer destruction in IceInternal::Instance::destroy.
//
while(!_connections.empty())
{
wait();
}
_conditionVariable.wait(lock, [this] { return _connections.empty(); });
return;
}

Expand All @@ -117,10 +114,7 @@ IceInternal::FactoryACMMonitor::destroy()
//
// Wait for the connection set to be cleared by the timer thread.
//
while(!_connections.empty())
{
wait();
}
_conditionVariable.wait(lock, [this] { return _connections.empty(); });
}

void
Expand All @@ -131,7 +125,7 @@ IceInternal::FactoryACMMonitor::add(const ConnectionIPtr& connection)
return;
}

Lock sync(*this);
lock_guard lock(_mutex);
if(_connections.empty())
{
_connections.insert(connection);
Expand All @@ -151,15 +145,15 @@ IceInternal::FactoryACMMonitor::remove(const ConnectionIPtr& connection)
return;
}

Lock sync(*this);
lock_guard lock(_mutex);
assert(_instance);
_changes.push_back(make_pair(connection, false));
}

void
IceInternal::FactoryACMMonitor::reap(const ConnectionIPtr& connection)
{
Lock sync(*this);
lock_guard lock(_mutex);
_reapedConnections.push_back(connection);
}

Expand All @@ -168,7 +162,7 @@ IceInternal::FactoryACMMonitor::acm(const optional<int>& timeout,
const optional<Ice::ACMClose>& close,
const optional<Ice::ACMHeartbeat>& heartbeat)
{
Lock sync(*this);
lock_guard lock(_mutex);
assert(_instance);

ACMConfig config(_config);
Expand Down Expand Up @@ -200,19 +194,19 @@ IceInternal::FactoryACMMonitor::getACM()
void
IceInternal::FactoryACMMonitor::swapReapedConnections(vector<ConnectionIPtr>& connections)
{
Lock sync(*this);
lock_guard lock(_mutex);
_reapedConnections.swap(connections);
}

void
IceInternal::FactoryACMMonitor::runTimerTask()
{
{
Lock sync(*this);
lock_guard lock(_mutex);
if(!_instance)
{
_connections.clear();
notifyAll();
_conditionVariable.notify_all();
return;
}

Expand Down Expand Up @@ -261,7 +255,7 @@ IceInternal::FactoryACMMonitor::runTimerTask()
void
FactoryACMMonitor::handleException(const exception& ex)
{
Lock sync(*this);
lock_guard lock(_mutex);
if(!_instance)
{
return;
Expand All @@ -274,7 +268,7 @@ FactoryACMMonitor::handleException(const exception& ex)
void
FactoryACMMonitor::handleException()
{
Lock sync(*this);
lock_guard lock(_mutex);
if(!_instance)
{
return;
Expand All @@ -299,7 +293,7 @@ IceInternal::ConnectionACMMonitor::~ConnectionACMMonitor()
void
IceInternal::ConnectionACMMonitor::add(const ConnectionIPtr& connection)
{
Lock sync(*this);
lock_guard lock(_mutex);
assert(!_connection && connection);
_connection = connection;
if(_config.timeout != IceUtil::Time())
Expand All @@ -314,7 +308,7 @@ IceInternal::ConnectionACMMonitor::remove(ICE_MAYBE_UNUSED const ConnectionIPtr&
#ifdef _MSC_VER
UNREFERENCED_PARAMETER(connection);
#endif
Lock sync(*this);
lock_guard lock(_mutex);
assert(_connection == connection);
if(_config.timeout != IceUtil::Time())
{
Expand Down Expand Up @@ -352,7 +346,7 @@ IceInternal::ConnectionACMMonitor::runTimerTask()
{
Ice::ConnectionIPtr connection;
{
Lock sync(*this);
lock_guard lock(_mutex);
if(!_connection)
{
return;
Expand Down
10 changes: 6 additions & 4 deletions cpp/src/Ice/ACM.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
#ifndef ICE_ACM_H
#define ICE_ACM_H

#include <IceUtil/Mutex.h>
#include <IceUtil/Monitor.h>
#include <IceUtil/Timer.h>
#include <Ice/ACMF.h>
#include <Ice/Connection.h>
#include <Ice/ConnectionIF.h>
#include <Ice/InstanceF.h>
#include <Ice/PropertiesF.h>
#include <Ice/LoggerF.h>

#include <condition_variable>
#include <mutex>
#include <set>

namespace IceInternal
Expand Down Expand Up @@ -46,7 +47,6 @@ class ACMMonitor : public IceUtil::TimerTask
};

class FactoryACMMonitor : public ACMMonitor,
public IceUtil::Monitor<IceUtil::Mutex>,
public std::enable_shared_from_this<FactoryACMMonitor>
{
public:
Expand Down Expand Up @@ -80,10 +80,11 @@ class FactoryACMMonitor : public ACMMonitor,
std::vector<std::pair<Ice::ConnectionIPtr, bool> > _changes;
std::set<Ice::ConnectionIPtr> _connections;
std::vector<Ice::ConnectionIPtr> _reapedConnections;
std::mutex _mutex;
std::condition_variable _conditionVariable;
};

class ConnectionACMMonitor : public ACMMonitor,
public IceUtil::Mutex,
public std::enable_shared_from_this<ConnectionACMMonitor>
{
public:
Expand All @@ -109,6 +110,7 @@ class ConnectionACMMonitor : public ACMMonitor,
const ACMConfig _config;

Ice::ConnectionIPtr _connection;
std::mutex _mutex;
};

}
Expand Down
34 changes: 13 additions & 21 deletions cpp/src/Ice/BatchRequestQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ BatchRequestQueue::BatchRequestQueue(const InstancePtr& instance, bool datagram)
void
BatchRequestQueue::prepareBatchRequest(OutputStream* os)
{
Lock sync(*this);
unique_lock lock(_mutex);
if(_exception)
{
_exception->ice_throw();
}
waitStreamInUse(false);
_conditionVariable.wait(lock, [this] { return !_batchStreamInUse; });

_batchStreamInUse = true;
_batchStream.swap(*os);
}
Expand Down Expand Up @@ -135,46 +136,46 @@ BatchRequestQueue::finishBatchRequest(OutputStream* os,
++_batchRequestNum;
}

Lock sync(*this);
lock_guard lock(_mutex);
_batchStream.resize(_batchMarker);
_batchStreamInUse = false;
_batchStreamCanFlush = false;
notifyAll();
_conditionVariable.notify_all();
}
catch(const std::exception&)
{
Lock sync(*this);
lock_guard lock(_mutex);
_batchStream.resize(_batchMarker);
_batchStreamInUse = false;
_batchStreamCanFlush = false;
notifyAll();
_conditionVariable.notify_all();
throw;
}
}

void
BatchRequestQueue::abortBatchRequest(OutputStream* os)
{
Lock sync(*this);
lock_guard lock(_mutex);
if(_batchStreamInUse)
{
_batchStream.swap(*os);
_batchStream.resize(_batchMarker);
_batchStreamInUse = false;
notifyAll();
_conditionVariable.notify_all();
}
}

int
BatchRequestQueue::swap(OutputStream* os, bool& compress)
{
Lock sync(*this);
unique_lock lock(_mutex);
if(_batchRequestNum == 0)
{
return 0;
}

waitStreamInUse(true);
_conditionVariable.wait(lock, [this] { return !_batchStreamInUse || _batchStreamCanFlush; });

vector<Ice::Byte> lastRequest;
if(_batchMarker < _batchStream.b.size())
Expand Down Expand Up @@ -204,26 +205,17 @@ BatchRequestQueue::swap(OutputStream* os, bool& compress)
void
BatchRequestQueue::destroy(const Ice::LocalException& ex)
{
Lock sync(*this);
lock_guard lock(_mutex);
_exception = ex.ice_clone();
}

bool
BatchRequestQueue::isEmpty()
{
Lock sync(*this);
lock_guard lock(_mutex);
return _batchStream.b.size() == sizeof(requestBatchHdr);
}

void
BatchRequestQueue::waitStreamInUse(bool flush)
{
while(_batchStreamInUse && !(flush && _batchStreamCanFlush))
{
wait();
}
}

void
BatchRequestQueue::enqueueBatchRequest(const Ice::ObjectPrxPtr& proxy)
{
Expand Down
Loading

0 comments on commit 7415fd6

Please sign in to comment.