Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dev: draft for specifying a local ip address when sending a request #450

Merged
merged 9 commits into from
Oct 28, 2023
Merged
3 changes: 0 additions & 3 deletions app/src/single_order_execution/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ int main(int argc, char** argv) {
eventHandler.promisePtr = promisePtr;
#ifndef CCAPI_APP_IS_BACKTEST
SessionOptions sessionOptions;
sessionOptions.httpConnectionPoolIdleTimeoutMilliseconds = 1;
sessionOptions.httpMaxNumRetry = 0;
sessionOptions.httpMaxNumRedirect = 0;
SessionConfigs sessionConfigs;
Session session(sessionOptions, sessionConfigs, &eventHandler);
eventHandler.onInit(&session);
Expand Down
3 changes: 0 additions & 3 deletions app/src/spot_market_making/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ int main(int argc, char** argv) {
eventHandler.promisePtr = promisePtr;
#ifndef CCAPI_APP_IS_BACKTEST
SessionOptions sessionOptions;
sessionOptions.httpConnectionPoolIdleTimeoutMilliseconds = 1 + eventHandler.accountBalanceRefreshWaitSeconds;
sessionOptions.httpMaxNumRetry = 0;
sessionOptions.httpMaxNumRedirect = 0;
SessionConfigs sessionConfigs;
Session session(sessionOptions, sessionConfigs, &eventHandler);
eventHandler.onInit(&session);
Expand Down
2 changes: 1 addition & 1 deletion example/src/market_data_simple_subscription/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
set(NAME market_data_simple_subscription)
project(${NAME})
add_compile_definitions(CCAPI_ENABLE_SERVICE_MARKET_DATA)
add_compile_definitions(CCAPI_ENABLE_EXCHANGE_COINBASE)
add_compile_definitions(CCAPI_ENABLE_EXCHANGE_OKX)
add_executable(${NAME} main.cpp)
add_dependencies(${NAME} boost rapidjson)
2 changes: 1 addition & 1 deletion example/src/market_data_simple_subscription/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ int main(int argc, char** argv) {
SessionConfigs sessionConfigs;
MyEventHandler eventHandler;
Session session(sessionOptions, sessionConfigs, &eventHandler);
Subscription subscription("coinbase", "BTC-USD", "MARKET_DEPTH");
Subscription subscription("okx", "BTC-USDT", "MARKET_DEPTH");
session.subscribe(subscription);
std::this_thread::sleep_for(std::chrono::seconds(10));
session.stop();
Expand Down
4 changes: 3 additions & 1 deletion include/ccapi_cpp/ccapi_http_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ class HttpConnection CCAPI_FINAL {
std::string toString() const {
std::ostringstream oss;
oss << streamPtr;
std::string output = "HttpConnection [host = " + host + ", port = " + port + ", streamPtr = " + oss.str() + "]";
std::string output = "HttpConnection [host = " + host + ", port = " + port + ", streamPtr = " + oss.str() +
", lastReceiveDataTp = " + UtilTime::getISOTimestamp(lastReceiveDataTp) + "]";
return output;
}
std::string host;
std::string port;
std::shared_ptr<beast::ssl_stream<beast::tcp_stream> > streamPtr;
TimePoint lastReceiveDataTp{std::chrono::seconds{0}};
};
} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_HTTP_CONNECTION_H_
9 changes: 3 additions & 6 deletions include/ccapi_cpp/ccapi_macro.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@
#ifndef CCAPI_ORDER_PRICE_TIMES_QUANTITY_MIN
#define CCAPI_ORDER_PRICE_TIMES_QUANTITY_MIN "PRICE_TIMES_QUANTITY_MIN"
#endif
#ifndef CCAPI_INSTRUMENT_STATUS
#define CCAPI_INSTRUMENT_STATUS "INSTRUMENT_STATUS"
#endif
#ifndef CCAPI_CONTRACT_SIZE
#define CCAPI_CONTRACT_SIZE "CONTRACT_SIZE"
#endif
Expand Down Expand Up @@ -1004,12 +1007,6 @@
#ifndef CCAPI_BINANCE_API_SECRET
#define CCAPI_BINANCE_API_SECRET "BINANCE_API_SECRET"
#endif
// #ifndef CCAPI_BINANCE_MARGIN_API_KEY
// #define CCAPI_BINANCE_MARGIN_API_KEY "BINANCE_MARGIN_API_KEY"
// #endif
// #ifndef CCAPI_BINANCE_MARGIN_API_SECRET
// #define CCAPI_BINANCE_MARGIN_API_SECRET "BINANCE_MARGIN_API_SECRET"
// #endif
#ifndef CCAPI_BINANCE_USDS_FUTURES_API_KEY
#define CCAPI_BINANCE_USDS_FUTURES_API_KEY "BINANCE_USDS_FUTURES_API_KEY"
#endif
Expand Down
32 changes: 31 additions & 1 deletion include/ccapi_cpp/ccapi_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class Request CCAPI_FINAL {
", correlationId = " + correlationId + ", secondaryCorrelationId = " + secondaryCorrelationId +
(this->serviceName == CCAPI_FIX ? ", paramListFix = " + ccapi::toString(paramListFix) : ", paramList = " + ccapi::toString(paramList)) +
", credential = " + ccapi::toString(shortCredential) + ", operation = " + operationToString(operation) +
", timeSent = " + UtilTime::getISOTimestamp(timeSent) + "]";
", timeSent = " + UtilTime::getISOTimestamp(timeSent) + ", index = " + ccapi::toString(index) + ", localIpAddress = " + localIpAddress +
", baseUrl = " + baseUrl + "]";
return output;
}
const std::string& getCorrelationId() const { return correlationId; }
Expand Down Expand Up @@ -181,11 +182,36 @@ class Request CCAPI_FINAL {
std::pair<long long, long long> getTimeSentPair() const { return UtilTime::divide(timeSent); }
void setTimeSent(TimePoint timeSent) { this->timeSent = timeSent; }
int getIndex() const { return index; }
const std::string& getLocalIpAddress() const { return localIpAddress; }
const std::string& getBaseUrl() const { return baseUrl; }
const std::string& getHost() const { return host; }
const std::string& getPort() const { return port; }
void setIndex(int index) { this->index = index; }
void setCredential(const std::map<std::string, std::string>& credential) { this->credential = credential; }
void setCorrelationId(const std::string& correlationId) { this->correlationId = correlationId; }
void setSecondaryCorrelationId(const std::string& secondaryCorrelationId) { this->secondaryCorrelationId = secondaryCorrelationId; }
void setMarginType(const std::string& marginType) { this->marginType = marginType; }
void setLocalIpAddress(const std::string& localIpAddress) { this->localIpAddress = localIpAddress; }
void setBaseUrl(const std::string& baseUrl) {
this->baseUrl = baseUrl;
this->setBaseUrlParts();
}
void setBaseUrlParts() {
auto splitted1 = UtilString::split(this->baseUrl, "://");
if (splitted1.size() >= 2) {
auto splitted2 = UtilString::split(UtilString::split(splitted1.at(1), "/").at(0), ":");
this->host = splitted2.at(0);
if (splitted2.size() == 2) {
this->port = splitted2.at(1);
} else {
if (splitted1.at(0) == "https" || splitted1.at(0) == "wss") {
this->port = CCAPI_HTTPS_PORT_DEFAULT;
} else {
this->port = CCAPI_HTTP_PORT_DEFAULT;
}
}
}
}
#ifndef CCAPI_EXPOSE_INTERNAL

private:
Expand All @@ -202,6 +228,10 @@ class Request CCAPI_FINAL {
std::vector<std::vector<std::pair<int, std::string> > > paramListFix;
TimePoint timeSent{std::chrono::seconds{0}};
int index{};
std::string localIpAddress;
std::string baseUrl;
std::string host;
std::string port;
};
} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_REQUEST_H_
54 changes: 34 additions & 20 deletions include/ccapi_cpp/ccapi_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,27 @@ class Session {
Session(const Session&) = delete;
Session& operator=(const Session&) = delete;
Session(const SessionOptions& sessionOptions = SessionOptions(), const SessionConfigs& sessionConfigs = SessionConfigs(),
EventHandler* eventHandler = nullptr, EventDispatcher* eventDispatcher = nullptr)
EventHandler* eventHandler = nullptr, EventDispatcher* eventDispatcher = nullptr
#ifndef SWIG
,
ServiceContext* serviceContextPtr = nullptr
#endif
)
: sessionOptions(sessionOptions),
sessionConfigs(sessionConfigs),
eventHandler(eventHandler),
#ifndef CCAPI_USE_SINGLE_THREAD
eventDispatcher(eventDispatcher),
#endif
eventQueue(sessionOptions.maxEventQueueSize),
serviceContextPtr(new ServiceContext()) {
eventQueue(sessionOptions.maxEventQueueSize)
#ifndef SWIG
,
serviceContextPtr(serviceContextPtr)
#endif
{
if (!this->serviceContextPtr) {
this->serviceContextPtr = new ServiceContext();
}
CCAPI_LOGGER_FUNCTION_ENTER;
#ifndef CCAPI_USE_SINGLE_THREAD
if (this->eventHandler) {
Expand All @@ -291,6 +303,7 @@ class Session {
delete this->eventDispatcher;
}
#endif
delete this->serviceContextPtr;
CCAPI_LOGGER_FUNCTION_EXIT;
}
virtual void start() {
Expand Down Expand Up @@ -637,19 +650,19 @@ class Session {
return;
}
if (serviceName == CCAPI_MARKET_DATA) {
std::set<std::string> correlationIdSet;
std::set<std::string> duplicateCorrelationIdSet;
// std::set<std::string> correlationIdSet;
// std::set<std::string> duplicateCorrelationIdSet;
std::unordered_set<std::string> unsupportedExchangeFieldSet;
std::map<std::string, std::vector<Subscription> > subscriptionListByExchangeMap;
auto exchangeFieldMap = this->sessionConfigs.getExchangeFieldMap();
CCAPI_LOGGER_DEBUG("exchangeFieldMap = " + toString(exchangeFieldMap));
for (const auto& subscription : subscriptionList) {
auto correlationId = subscription.getCorrelationId();
if (correlationIdSet.find(correlationId) != correlationIdSet.end()) {
duplicateCorrelationIdSet.insert(correlationId);
} else {
correlationIdSet.insert(correlationId);
}
// auto correlationId = subscription.getCorrelationId();
// if (correlationIdSet.find(correlationId) != correlationIdSet.end()) {
// duplicateCorrelationIdSet.insert(correlationId);
// } else {
// correlationIdSet.insert(correlationId);
// }
auto exchange = subscription.getExchange();
CCAPI_LOGGER_DEBUG("exchange = " + exchange);
auto field = subscription.getField();
Expand All @@ -663,11 +676,11 @@ class Session {
}
subscriptionListByExchangeMap[exchange].push_back(subscription);
}
if (!duplicateCorrelationIdSet.empty()) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE,
"duplicated correlation ids: " + toString(duplicateCorrelationIdSet));
return;
}
// if (!duplicateCorrelationIdSet.empty()) {
// this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE,
// "duplicated correlation ids: " + toString(duplicateCorrelationIdSet));
// return;
// }
if (!unsupportedExchangeFieldSet.empty()) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE,
"unsupported exchange fields: " + toString(unsupportedExchangeFieldSet));
Expand Down Expand Up @@ -902,7 +915,8 @@ class Session {
virtual void setTimer(const std::string& id, long delayMilliseconds, std::function<void(const boost::system::error_code&)> errorHandler,
std::function<void()> successHandler) {
boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, id, delayMilliseconds, errorHandler, successHandler]() {
std::shared_ptr<steady_timer> timerPtr(new steady_timer(*this->serviceContextPtr->ioContextPtr, boost::asio::chrono::milliseconds(delayMilliseconds)));
std::shared_ptr<boost::asio::steady_timer> timerPtr(
new boost::asio::steady_timer(*this->serviceContextPtr->ioContextPtr, boost::asio::chrono::milliseconds(delayMilliseconds)));
timerPtr->async_wait([this, id, errorHandler, successHandler](const boost::system::error_code& ec) {
if (this->eventHandler) {
#ifdef CCAPI_USE_SINGLE_THREAD
Expand Down Expand Up @@ -963,12 +977,12 @@ class Session {
#endif
SessionOptions sessionOptions;
SessionConfigs sessionConfigs;
EventHandler* eventHandler;
EventHandler* eventHandler{nullptr};
#ifndef CCAPI_USE_SINGLE_THREAD
EventDispatcher* eventDispatcher;
EventDispatcher* eventDispatcher{nullptr};
bool useInternalEventDispatcher{};
#endif
std::shared_ptr<ServiceContext> serviceContextPtr;
ServiceContext* serviceContextPtr{nullptr};
std::map<std::string, std::map<std::string, std::shared_ptr<Service> > > serviceByServiceNameExchangeMap;
std::thread t;
Queue<Event> eventQueue;
Expand Down
2 changes: 0 additions & 2 deletions include/ccapi_cpp/ccapi_session_configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ class SessionConfigs CCAPI_FINAL {
{CCAPI_EXCHANGE_NAME_BITMEX, CCAPI_BITMEX_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_US, CCAPI_BINANCE_US_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE, CCAPI_BINANCE_URL_WS_BASE},
// {CCAPI_EXCHANGE_NAME_BINANCE_MARGIN, CCAPI_BINANCE_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_USDS_FUTURES, CCAPI_BINANCE_USDS_FUTURES_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_COIN_FUTURES, CCAPI_BINANCE_COIN_FUTURES_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_HUOBI, CCAPI_HUOBI_URL_WS_BASE},
Expand Down Expand Up @@ -374,7 +373,6 @@ class SessionConfigs CCAPI_FINAL {
{CCAPI_EXCHANGE_NAME_BITMEX, CCAPI_BITMEX_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_US, CCAPI_BINANCE_US_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE, CCAPI_BINANCE_URL_REST_BASE},
// {CCAPI_EXCHANGE_NAME_BINANCE_MARGIN, CCAPI_BINANCE_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_USDS_FUTURES, CCAPI_BINANCE_USDS_FUTURES_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_COIN_FUTURES, CCAPI_BINANCE_COIN_FUTURES_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_HUOBI, CCAPI_HUOBI_URL_REST_BASE},
Expand Down
8 changes: 4 additions & 4 deletions include/ccapi_cpp/ccapi_session_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SessionOptions CCAPI_FINAL {
", httpMaxNumRedirect = " + ccapi::toString(httpMaxNumRedirect) +
", httpRequestTimeoutMilliseconds = " + ccapi::toString(httpRequestTimeoutMilliseconds) +
", httpConnectionPoolMaxSize = " + ccapi::toString(httpConnectionPoolMaxSize) +
", httpConnectionPoolIdleTimeoutMilliseconds = " + ccapi::toString(httpConnectionPoolIdleTimeoutMilliseconds) +
", httpConnectionKeepAliveTimeoutSeconds = " + ccapi::toString(httpConnectionKeepAliveTimeoutSeconds) +
", enableOneHttpConnectionPerRequest = " + ccapi::toString(enableOneHttpConnectionPerRequest) + "]";
return output;
}
Expand All @@ -50,9 +50,9 @@ class SessionOptions CCAPI_FINAL {
int httpMaxNumRedirect{1};
long httpRequestTimeoutMilliseconds{10000};
int httpConnectionPoolMaxSize{1}; // used to set the maximal number of http connections to be kept in the pool (connections in the pool are idle)
long httpConnectionPoolIdleTimeoutMilliseconds{0}; // used to purge the http connection pool if all connections in the
// pool have stayed idle for at least this amount of time
bool enableOneHttpConnectionPerRequest{}; // create a new http connection for each request
long httpConnectionKeepAliveTimeoutSeconds{
10}; // used to remove a http connection from the http connection pool if it has stayed idle for at least this amount of time
bool enableOneHttpConnectionPerRequest{}; // create a new http connection for each request
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
#else
long websocketConnectTimeoutMilliseconds{10000};
Expand Down
2 changes: 1 addition & 1 deletion include/ccapi_cpp/ccapi_subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Subscription CCAPI_FINAL {
}
return output;
}
const std::string getSerializedCredential() const { return ::ccapi::toString(this->credential); }
const std::string getSerializedCredential() const { return ccapi::toString(this->credential); }
// 'getTimeSent' only works in C++. For other languages, please use 'getTimeSentISO'.
TimePoint getTimeSent() const { return timeSent; }
std::string getTimeSentISO() const { return UtilTime::getISOTimestamp(timeSent); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ class ExecutionManagementServiceAscendex : public ExecutionManagementService {
this->baseUrlRest = sessionConfigs.getUrlRestBase().at(this->exchangeName);
this->setHostRestFromUrlRest(this->baseUrlRest);
this->setHostWsFromUrlWs(this->baseUrlWs);
try {
this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
#else
try {
this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#endif
// try {
// this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
// #else
// try {
// this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #endif
this->apiKeyName = CCAPI_ASCENDEX_API_KEY;
this->apiSecretName = CCAPI_ASCENDEX_API_SECRET;
this->apiAccountGroupName = CCAPI_ASCENDEX_API_ACCOUNT_GROUP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ class ExecutionManagementServiceBinance : public ExecutionManagementServiceBinan
this->baseUrlRest = sessionConfigs.getUrlRestBase().at(this->exchangeName);
this->setHostRestFromUrlRest(this->baseUrlRest);
this->setHostWsFromUrlWs(this->baseUrlWs);
try {
this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
#else
try {
this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#endif
// try {
// this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
// #else
// try {
// this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #endif
this->apiKeyName = CCAPI_BINANCE_API_KEY;
this->apiSecretName = CCAPI_BINANCE_API_SECRET;
this->setupCredential({this->apiKeyName, this->apiSecretName});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ class ExecutionManagementServiceBinanceCoinFutures : public ExecutionManagementS
this->baseUrlRest = sessionConfigs.getUrlRestBase().at(this->exchangeName);
this->setHostRestFromUrlRest(this->baseUrlRest);
this->setHostWsFromUrlWs(this->baseUrlWs);
try {
this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
#else
try {
this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#endif
// try {
// this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
// #else
// try {
// this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #endif
this->apiKeyName = CCAPI_BINANCE_COIN_FUTURES_API_KEY;
this->apiSecretName = CCAPI_BINANCE_COIN_FUTURES_API_SECRET;
this->setupCredential({this->apiKeyName, this->apiSecretName});
Expand Down
Loading