Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

[Bitmex] There is a bug on orderChanges endpoint #497

Open
makarid opened this issue Jan 9, 2020 · 11 comments
Open

[Bitmex] There is a bug on orderChanges endpoint #497

makarid opened this issue Jan 9, 2020 · 11 comments

Comments

@makarid
Copy link
Contributor

makarid commented Jan 9, 2020

Hello, the implementation of orderChanges in BitmexStreamingTradeService has a bug when an order has been filled. When Bitmex sends a FILLED order message, it doesn't include many required fields in order to create a new LimitOrder and send it to the stream. These data are null from the response:
side,price,ordType and orderQty have null values and because of that a NullPointException occures.
Here is the output which i have encounter:

2020-01-09 16:54:37 INFO BitmexStreamingMarketDataService:37 - Bitmex connection succeeded. Clearing orderbooks.
2020-01-09 16:54:37 INFO BitmexStreamingService:312 - Subscribing to channel order
BitmexOrder{orderID='9f2f8774-0573-9d3c-a45c-216ed7eabadd', account=4644, side='Buy', price=7907.5, avgPx=null, ordType='Limit', ordStatus=NEW, clOrdID='', orderQty=1, cumQty=0, workingIndicator=false, timestamp='2020-01-09T14:55:10.125Z', symbol='XBTUSD'}
LimitOrder [limitPrice=null, Order [type=BID, originalAmount=1, cumulativeAmount=0, averagePrice=null, fee=null, currencyPair=XBT/USD, id=9f2f8774-0573-9d3c-a45c-216ed7eabadd, timestamp=null, status=NEW, flags=[], userReference=null]]
BitmexOrder{orderID='9f2f8774-0573-9d3c-a45c-216ed7eabadd', account=4644, side='null', price=null, avgPx=7905, ordType='null', ordStatus=FILLED, clOrdID='', orderQty=null, cumQty=1, workingIndicator=false, timestamp='2020-01-09T14:55:10.125Z', symbol='XBTUSD'}
null

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.NullPointerException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onError(ObservableFlattenIterable.java:125)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:81)
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207)
at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62)
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207)
at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177)
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:395)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:358)
at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:129)
at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:33)
at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:52)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:96)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:80)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at info.bitrich.xchangestream.bitmex.dto.BitmexOrder.toOrder(BitmexOrder.java:85)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:442)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at info.bitrich.xchangestream.bitmex.BitmexStreamingTradeService.lambda$getOrderChanges$2(BitmexStreamingTradeService.java:39)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:77)
... 45 more
Exception in thread "nioEventLoopGroup-2-1" io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.NullPointerException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onError(ObservableFlattenIterable.java:125)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:81)
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207)
at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62)
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207)
at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177)
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:395)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:358)
at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:129)
at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:33)
at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:52)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:96)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:80)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at info.bitrich.xchangestream.bitmex.dto.BitmexOrder.toOrder(BitmexOrder.java:85)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:442)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at info.bitrich.xchangestream.bitmex.BitmexStreamingTradeService.lambda$getOrderChanges$2(BitmexStreamingTradeService.java:39)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:77)
... 45 more

@makarid
Copy link
Contributor Author

makarid commented Jan 9, 2020

How can we send a limitOrder/marketOrder to the stream in which we don't have the side field?

@mdvx
Copy link
Contributor

mdvx commented Jan 10, 2020

You may notice that Kraken too will send orderChanges with out a side or currencyPair, this is because the updates are deltas, and the fields did not change. Maybe this is a larger discussing, because 'fixing' this requires an order cache. @badgerwithagun

@makarid
Copy link
Contributor Author

makarid commented Jan 10, 2020

Indeed, we need to discuss about it. I am waiting for @badgerwithagun input.

@makarid
Copy link
Contributor Author

makarid commented Jan 11, 2020

I suggest to make an openOrderList cache and everytime an update occurs to send the updated list of openOrders. So instead of having Observable.Order we should have Observable.OpenOrders. What do you thing? Because openOrders updates depend to one another and they are not like userTrades which they are one time only events.

@mdvx
Copy link
Contributor

mdvx commented Jan 12, 2020

How do we handle delta orderbook updates? should we try and use the same pattern. I am not sold on the idea of caching order at the exchange level, because it requires synchronisation (however if we could do something where the Exchange holds a cache instance, then different exchanges could implement different caching strategues.

synchronisation: When we have updates with 2+ fields (without sync, the order state is temporarily invalid)

@makarid
Copy link
Contributor Author

makarid commented Jan 12, 2020

What do you mean temporarily invalid? If we save a local copy of a List of openOrders, every update that will come from the stream will update this list. So even if there are 2 updates for the same order, they will be 2 separate messages and we will handle with FIFO. Can you please give an example? Thanks

@mdvx
Copy link
Contributor

mdvx commented Jan 13, 2020

lets say you have an order, that get an update to executedQuantity, averagePrice and state: you can:
A) copy the existing order in the cache, update the copy, and replace the orignal order with the copy.

  1. This can leave hanging (previous) order objects
    B) sync(lock) the exiting order, upate all the fields, release lock
  2. lots of locking, but probably ok
    C) Other means?
    but basically, my thought is cache should operate in a maner that best matches the exchanges update cycle (Still one cache per exchange, but maybe cache for Kraken is different to BitMex)

@makarid
Copy link
Contributor Author

makarid commented Jan 13, 2020 via email

@mdvx
Copy link
Contributor

mdvx commented Jan 13, 2020

When we broadcast an OpenOrder object, we broadcast a reference to an OpenOrder object.

When another thread gets this broadcast it recieves the reference to the OpenOrder object, which might now have processed next updates, or be in the middle of processing them, or not. The OpenOrder is in an indeterminate state.

@mdvx
Copy link
Contributor

mdvx commented Jan 13, 2020

Whereas the are always going to some states where and order is expected to be in situe (PENDING,WORKING) there are other states are considered to final and non-changeable (FILLED, DONE, CANCELED). This allows the algos to make decisions abount sending the next set of orders.

@makarid
Copy link
Contributor Author

makarid commented Jan 13, 2020 via email

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants