Skip to content

Commit

Permalink
Upgrade Python extension to use the new C++ mapping (#1727)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Jan 29, 2024
1 parent f1e1bac commit c1e512e
Show file tree
Hide file tree
Showing 39 changed files with 1,003 additions and 1,621 deletions.
2 changes: 0 additions & 2 deletions cpp/include/Ice/AsyncResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ class ICE_API AsyncResult : private IceUtil::noncopyable, public Ice::LocalObjec
virtual void run() = 0;
};
typedef IceUtil::Handle<Callback> CallbackPtr;

virtual void _scheduleCallback(const CallbackPtr&) = 0;
/// \endcond

protected:
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/Ice/Communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ class ICE_CLASS(ICE_API) Communicator
*/
virtual dispatch_queue_t getServerDispatchQueue() const = 0;
#endif

virtual void postToClientThreadPool(::std::function<void()> call) = 0;
};

}
Expand Down
2 changes: 0 additions & 2 deletions cpp/include/Ice/OutgoingAsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ class ICE_API OutgoingAsyncBase : public virtual OutgoingAsyncCompletionCallback
virtual void _readEmptyParams();
virtual void _readParamEncaps(const ::Ice::Byte*&, ::Ice::Int&);
virtual void _throwUserException();

virtual void _scheduleCallback(const CallbackPtr&);
#endif

void attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, Ice::Int requestId)
Expand Down
6 changes: 4 additions & 2 deletions cpp/include/Ice/Proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ class InvokePromiseOutgoing : public InvokeOutgoingAsyncT<R>, public PromiseInvo
{
if(this->_is.b.empty())
{
this->_promise.set_value(R { ok, { 0, 0 }});
std::vector<Ice::Byte> encaps;
this->_promise.set_value(R { ok, encaps});
}
else
{
Expand All @@ -200,7 +201,8 @@ class InvokePromiseOutgoing : public InvokeOutgoingAsyncT<R>, public PromiseInvo
{
if(done)
{
this->_promise.set_value(R { true, { 0, 0 }});
std::vector<Ice::Byte> encaps;
this->_promise.set_value(R { true, encaps});
}
return false;
}
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/Ice/CommunicatorI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
#include <Ice/LocalException.h>
#include <Ice/DefaultsAndOverrides.h>
#include <Ice/TraceLevels.h>
#include <Ice/ThreadPool.h>
#include <Ice/Router.h>
#include <Ice/OutgoingAsync.h>
#include <Ice/UUID.h>
#ifdef ICE_SWIFT
# include <Ice/ThreadPool.h>
#endif

using namespace std;
using namespace Ice;
Expand Down Expand Up @@ -377,6 +375,12 @@ Ice::CommunicatorI::getServerDispatchQueue() const

#endif

void
Ice::CommunicatorI::postToClientThreadPool(function<void()> call)
{
_instance->clientThreadPool()->dispatch(call);
}

namespace
{

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/Ice/CommunicatorI.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <Ice/CommunicatorAsync.h>
#include <Ice/OutgoingAsync.h>

#include <functional>

namespace IceInternal
{

Expand Down Expand Up @@ -98,6 +100,8 @@ class CommunicatorI : public Communicator
virtual dispatch_queue_t getServerDispatchQueue() const;
#endif

virtual void postToClientThreadPool(::std::function<void()> call);

#ifdef ICE_CPP11_MAPPING
virtual ::std::function<void()>
flushBatchRequestsAsync(CompressBatch,
Expand Down
33 changes: 0 additions & 33 deletions cpp/src/Ice/OutgoingAsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,39 +525,6 @@ OutgoingAsyncBase::_throwUserException()
}
}

void
OutgoingAsyncBase::_scheduleCallback(const CallbackPtr& cb)
{
//
// NOTE: for internal use only. This should only be called when the invocation has
// completed. Accessing _cachedConnection is not safe otherwise.
//

class WorkItem : public DispatchWorkItem
{
public:

WorkItem(const ConnectionPtr& connection, const CallbackPtr& cb) :
DispatchWorkItem(connection), _cb(cb)
{
}

virtual void run()
{
_cb->run();
}

private:

CallbackPtr _cb;
};

//
// CommunicatorDestroyedException is the only exception that can propagate directly from this method.
//
_instance->clientThreadPool()->dispatch(new WorkItem(_cachedConnection, cb));
}

#endif

void
Expand Down
27 changes: 27 additions & 0 deletions cpp/src/Ice/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,33 @@ IceInternal::ThreadPool::dispatch(const DispatchWorkItemPtr& workItem)
_workQueue->queue(workItem);
}

void
IceInternal::ThreadPool::dispatch(function<void()> call)
{
class WorkItem final : public IceInternal::DispatchWorkItem
{
public:

WorkItem(function<void()> call)
: _call(std::move(call))
{
}

void run() final
{
_call();
}

private:

function<void()> _call;

};

DispatchWorkItemPtr workItem = new WorkItem(std::move(call));
dispatch(workItem);
}

void
IceInternal::ThreadPool::joinWithAllThreads()
{
Expand Down
1 change: 1 addition & 0 deletions cpp/src/Ice/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class ThreadPool : public IceUtil::Shared, private IceUtil::Monitor<IceUtil::Mut

void dispatchFromThisThread(const DispatchWorkItemPtr&);
void dispatch(const DispatchWorkItemPtr&);
void dispatch(std::function<void()>);

void joinWithAllThreads();

Expand Down
6 changes: 3 additions & 3 deletions python/modules/IcePy/BatchRequestInterceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ batchRequestGetProxy(BatchRequestObject* self, PyObject* /*args*/)
assert(self->request);
if(!self->proxy)
{
Ice::ObjectPrx proxy;
shared_ptr<Ice::ObjectPrx> proxy;
try
{
proxy = self->request->getProxy();
Expand Down Expand Up @@ -227,7 +227,7 @@ IcePy::initBatchRequest(PyObject* module)
return true;
}

IcePy::BatchRequestInterceptor::BatchRequestInterceptor(PyObject* interceptor) : _interceptor(interceptor)
IcePy::BatchRequestInterceptorWrapper::BatchRequestInterceptorWrapper(PyObject* interceptor) : _interceptor(interceptor)
{
if(!PyCallable_Check(interceptor) && !PyObject_HasAttrString(interceptor, STRCAST("enqueue")))
{
Expand All @@ -239,7 +239,7 @@ IcePy::BatchRequestInterceptor::BatchRequestInterceptor(PyObject* interceptor) :
}

void
IcePy::BatchRequestInterceptor::enqueue(const Ice::BatchRequest& request, int queueCount, int queueSize)
IcePy::BatchRequestInterceptorWrapper::enqueue(const Ice::BatchRequest& request, int queueCount, int queueSize)
{
AdoptThread adoptThread; // Ensure the current thread is able to call into Python.

Expand Down
9 changes: 5 additions & 4 deletions python/modules/IcePy/BatchRequestInterceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,27 @@
#include <Util.h>
#include <Ice/BatchRequestInterceptor.h>

#include <memory>

namespace IcePy
{

extern PyTypeObject BatchRequestType;

bool initBatchRequest(PyObject*);

class BatchRequestInterceptor : public Ice::BatchRequestInterceptor
class BatchRequestInterceptorWrapper final
{
public:

BatchRequestInterceptor(PyObject*);
BatchRequestInterceptorWrapper(PyObject*);

virtual void enqueue(const Ice::BatchRequest&, int, int);
void enqueue(const Ice::BatchRequest&, int, int);

private:

PyObjectHandle _interceptor;
};
typedef IceUtil::Handle<BatchRequestInterceptor> BatchRequestInterceptorPtr;

}

Expand Down
Loading

0 comments on commit c1e512e

Please sign in to comment.