Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

Gdax adding user channel #160

Merged
merged 7 commits into from
Oct 8, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected void initServices() {
public Completable connect(ProductSubscription... args) {
if (args == null || args.length == 0)
throw new UnsupportedOperationException("The ProductSubscription must be defined!");
this.streamingService = new GDAXStreamingService(API_URI);
this.streamingService = new GDAXStreamingService(API_URI, this);
this.streamingMarketDataService = new GDAXStreamingMarketDataService(this.streamingService);
streamingService.subscribeMultipleCurrencyPairs(args);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package info.bitrich.xchangestream.gdax;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction;
import io.reactivex.Observable;
import static io.netty.util.internal.StringUtil.isNullOrEmpty;
import static org.knowm.xchange.gdax.GDAXAdapters.adaptOrderBook;
import static org.knowm.xchange.gdax.GDAXAdapters.adaptTicker;
import static org.knowm.xchange.gdax.GDAXAdapters.adaptTradeHistory;
import static org.knowm.xchange.gdax.GDAXAdapters.adaptTrades;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
Expand All @@ -13,14 +21,16 @@
import org.knowm.xchange.gdax.dto.marketdata.GDAXProductBook;
import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker;
import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade;
import org.knowm.xchange.gdax.dto.trade.GDAXFill;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.util.*;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import static io.netty.util.internal.StringUtil.isNullOrEmpty;
import static org.knowm.xchange.gdax.GDAXAdapters.*;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction;
import io.reactivex.Observable;

/**
* Created by luca on 4/3/17.
Expand Down Expand Up @@ -142,7 +152,11 @@ public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
.filter(message -> !isNullOrEmpty(message.getType()) && message.getType().equals("match") &&
message.getProductId().equals(channelName))
.map(s -> {
Trades adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair);
Trades adaptedTrades = null;
if ( s.getUserId() != null )
adaptedTrades = adaptTradeHistory(new GDAXFill[]{s.toGDAXFill()});
Copy link
Collaborator

@badgerwithagun badgerwithagun Sep 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on something similar for Binance right now, and this bit gave me pause. Are you getting duplicated trades appearing where you get both the Trade from the matches channel and the UserTrade from the user channel?

I certainly do with Binance and it itched. The only way I could think of to resolve the issue looks something like this:

// To avoid duplication of user and public trades, we delay the public trades by one second
// and skip any trade ids which have recently been streamed as user trades. This isn't
// perfect, of course...
Cache<String, ? super Trade> recentTrades = CacheBuilder.newBuilder().maximumSize(50).build();
return publicTrades
  .concatMap(t -> Observable.just(t).delay(1, TimeUnit.SECONDS))
  .skipWhile(t -> recentTrades.asMap().containsKey(t.getId()))
  .mergeWith(
    getUserTrades()
      .filter(t -> t.getCurrencyPair().equals(currencyPair))
      .doOnNext(t -> recentTrades.put(t.getId(), t))
  );

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code replaces the Trade object with a UserTrade object so there wouldn't be a duplicate.

Honestly in my usage (external to this project) I'm just looking for the fills coming in (UserTrade) so it never hurt my bot if there was or wasn't a Trade reported via the regular trade feed.

I suppose if the record came in via the trade feed with no user information there would be a duplicate.

else
adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair);
return adaptedTrades.getTrades().get(0);
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
package info.bitrich.xchangestream.gdax;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData;
import org.knowm.xchange.gdax.service.GDAXAccountServiceRaw;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketSubscriptionMessage;
import info.bitrich.xchangestream.gdax.netty.WebSocketClientCompressionAllowClientNoContextHandler;
Expand All @@ -11,12 +22,6 @@
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.reactivex.Observable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class GDAXStreamingService extends JsonNettyStreamingService {
private static final Logger LOG = LoggerFactory.getLogger(GDAXStreamingService.class);
Expand All @@ -25,11 +30,13 @@ public class GDAXStreamingService extends JsonNettyStreamingService {
private static final String SHARE_CHANNEL_NAME = "ALL";
private final Map<String, Observable<JsonNode>> subscriptions = new HashMap<>();
private ProductSubscription product = null;
GDAXStreamingExchange exchange;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use private final


private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null;

public GDAXStreamingService(String apiUrl) {
public GDAXStreamingService(String apiUrl, GDAXStreamingExchange exchange) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a big fan of cyclic class dependencies like this. There should contract "parent" <- "child" in this scenario. So if StreamingService needs something in particular, give it to it. But don't give it main object that holds everything - that leads to spaghetti.

super(apiUrl, Integer.MAX_VALUE);
this.exchange = exchange;
}

public ProductSubscription getProduct() {
Expand Down Expand Up @@ -66,15 +73,27 @@ protected String getChannelNameFromMessage(JsonNode message) {

@Override
public String getSubscribeMessage(String channelName, Object... args) throws IOException {
GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product);
ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification();
GDAXWebsocketAuthData authData = null;
if ( exchangeSpec.getApiKey() != null ) {
GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService();
authData = rawAccountService.getWebsocketAuthData();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer not to include getWebsocketAuthData call into this particual method as well xchange-stream. User can call this by her own. Just let the user to pass websocet auth data to the xchange-stream (probably in connect() method?). By this you will have easier time with solving cyclic dependency issue as well as more flexible usage of the library.

}
GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product, authData);
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(subscribeMessage);
}

@Override
public String getUnsubscribeMessage(String channelName) throws IOException {
ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification();
GDAXWebsocketAuthData authData = null;
if ( exchangeSpec.getApiKey() != null ) {
GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService();
authData = rawAccountService.getWebsocketAuthData();
}
GDAXWebSocketSubscriptionMessage subscribeMessage =
new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"});
new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}, authData);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this unsubscribe message also include "user" if authData != null? Not seen any side-effects myself in use but it seems symmetrical.

ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(subscribeMessage);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package info.bitrich.xchangestream.gdax.dto;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import info.bitrich.xchangestream.core.ProductSubscription;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -19,14 +21,20 @@ public class GDAXWebSocketSubscriptionMessage {
public static final String PRODUCT_IDS = "product_ids";
public static final String NAME = "name";

// if authenticating
public static final String SIGNATURE = "signature";
public static final String KEY = "key";
public static final String PASSPHRASE = "passphrase";
public static final String TIMESTAMP = "timestamp";

class GDAXProductSubsctiption {
@JsonProperty(NAME)
private String name;

@JsonProperty(PRODUCT_IDS)
private String[] productIds;

public GDAXProductSubsctiption(String name, String[] productIds) {
public GDAXProductSubsctiption(String name, String[] productIds, GDAXWebsocketAuthData authData) {
this.name = name;
this.productIds = productIds;
}
Expand All @@ -45,15 +53,31 @@ public String[] getProductIds() {

@JsonProperty(CHANNELS)
private GDAXProductSubsctiption[] channels;

public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product) {

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty(SIGNATURE)
String signature;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty(KEY)
String key;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty(PASSPHRASE)
String passphrase;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty(TIMESTAMP)
String timestamp;

public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product, GDAXWebsocketAuthData authData) {
this.type = type;
generateSubscriptionMessage(product);
generateSubscriptionMessage(product, authData);
}

public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames) {
public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames, GDAXWebsocketAuthData authData) {
this.type = type;
generateSubscriptionMessage(channelNames);
generateSubscriptionMessage(channelNames, authData);
}

private String[] generateProductIds(CurrencyPair[] pairs) {
Expand All @@ -65,39 +89,48 @@ private String[] generateProductIds(CurrencyPair[] pairs) {
return productIds.toArray(new String[productIds.size()]);
}

private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs) {
private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs, GDAXWebsocketAuthData authData) {
String[] productsIds;
productsIds = generateProductIds(pairs);
return new GDAXProductSubsctiption(name, productsIds);
return new GDAXProductSubsctiption(name, productsIds, authData);
}

private void generateSubscriptionMessage(String[] channelNames) {
private void generateSubscriptionMessage(String[] channelNames, GDAXWebsocketAuthData authData) {
List<GDAXProductSubsctiption> channels = new ArrayList<>(3);
for (String name : channelNames) {
channels.add(new GDAXProductSubsctiption(name, null));
channels.add(new GDAXProductSubsctiption(name, null, authData));
}

this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]);
}

private void generateSubscriptionMessage(ProductSubscription productSubscription) {
private void generateSubscriptionMessage(ProductSubscription productSubscription, GDAXWebsocketAuthData authData) {
List<GDAXProductSubsctiption> channels = new ArrayList<>(3);
Map<String, List<CurrencyPair>> pairs = new HashMap<>(3);

pairs.put("level2", productSubscription.getOrderBook());
pairs.put("ticker", productSubscription.getTicker());
pairs.put("matches", productSubscription.getTrades());
if ( authData != null )
pairs.put("user", productSubscription.getTrades());

for (Map.Entry<String, List<CurrencyPair>> product : pairs.entrySet()) {
List<CurrencyPair> currencyPairs = product.getValue();
if (currencyPairs == null || currencyPairs.size() == 0) {
continue;
}
GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()]));
GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()]), authData);
channels.add(gdaxProduct);
}

this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]);

if ( authData != null ) {
this.key = authData.getKey();
this.passphrase = authData.getPassphrase();
this.signature = authData.getSignature();
this.timestamp = String.valueOf(authData.getTimestamp());
}
}

public String getType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.knowm.xchange.gdax.dto.marketdata.GDAXProductStats;
import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker;
import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade;
import org.knowm.xchange.gdax.dto.trade.GDAXFill;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -38,7 +39,12 @@ public class GDAXWebSocketTransaction {
private final String reason;
private final long tradeId;
private final String makerOrderId;
private final String takenOrderId;
private final String takerOrderId;

private final String takerUserId;
private final String userId;
private final String takerProfileId;
private final String profileId;

public GDAXWebSocketTransaction(
@JsonProperty("type") String type,
Expand All @@ -65,13 +71,17 @@ public GDAXWebSocketTransaction(
@JsonProperty("reason") String reason,
@JsonProperty("trade_id") long tradeId,
@JsonProperty("maker_order_id") String makerOrderId,
@JsonProperty("taken_order_id") String takenOrderId) {
@JsonProperty("taker_order_id") String takerOrderId,
@JsonProperty("taker_user_id") String takerUserId,
@JsonProperty("user_id") String userId,
@JsonProperty("taker_profile_id") String takerProfileId,
@JsonProperty("profile_id") String profileId) {

this.remainingSize = remainingSize;
this.reason = reason;
this.tradeId = tradeId;
this.makerOrderId = makerOrderId;
this.takenOrderId = takenOrderId;
this.takerOrderId = takerOrderId;
this.type = type;
this.orderId = orderId;
this.orderType = orderType;
Expand All @@ -92,6 +102,10 @@ public GDAXWebSocketTransaction(
this.productId = productId;
this.sequence = sequence;
this.time = time;
this.takerUserId = takerUserId;
this.userId = userId;
this.takerProfileId = takerProfileId;
this.profileId = profileId;
}

private String[][] GDAXOrderBookChanges(String side, String[][] changes, SortedMap<BigDecimal, String> sideEntries,
Expand Down Expand Up @@ -151,6 +165,11 @@ public GDAXTrade toGDAXTrade() {
return new GDAXTrade(time, tradeId, price, size, side);
}

public GDAXFill toGDAXFill() {
boolean taker = userId != null && takerUserId != null && userId.equals(takerUserId);
return new GDAXFill(String.valueOf(tradeId), productId, price, size, taker ? takerOrderId : makerOrderId, time, null, null, true, side);
}

public String getType() {
return type;
}
Expand Down Expand Up @@ -235,8 +254,31 @@ public String getMakerOrderId() {
return makerOrderId;
}

/**
* @deprecated Use {@link #getTakerOrderId()}
*/
public String getTakenOrderId() {
return takenOrderId;
return takerOrderId;
}

public String getTakerOrderId() {
return takerOrderId;
}

public String getTakerUserId() {
return takerUserId;
}

public String getUserId() {
return userId;
}

public String getTakerProfileId() {
return takerProfileId;
}

public String getProfileId() {
return profileId;
}

@Override
Expand All @@ -259,6 +301,14 @@ public String toString() {
sb.append(", productId='").append(productId).append('\'');
sb.append(", sequence=").append(sequence);
sb.append(", time='").append(time).append('\'');
if ( userId != null )
sb.append(", userId='").append(userId).append('\'');
if ( profileId != null )
sb.append(", profileId='").append(profileId).append('\'');
if ( takerUserId != null )
sb.append(", takerUserId='").append(takerUserId).append('\'');
if ( takerProfileId != null )
sb.append(", takerProfileId='").append(takerProfileId).append('\'');
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void testWebSocketMessageSerialization() throws JsonProcessingException {

ProductSubscription productSubscription = ProductSubscription.create().addOrderbook(CurrencyPair.BTC_USD)
.addTrades(CurrencyPair.BTC_USD).addTicker(CurrencyPair.BTC_USD).build();
GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription);
GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription, null);

final ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
Expand Down