Skip to content

Commit

Permalink
Add HistoricalFeedPool and bug fixes in AbstractFeed
Browse files Browse the repository at this point in the history
  • Loading branch information
Petersoj committed Dec 31, 2021
1 parent fb2feec commit 32747a6
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 58 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ try {
}
```

#### [`HistoricalFeedPool`](src/main/java/net/jacobpeterson/iqfeed4j/feed/lookup/historical/pool/HistoricalFeedPool.java)
[`HistoricalFeedPool`](src/main/java/net/jacobpeterson/iqfeed4j/feed/lookup/historical/pool/HistoricalFeedPool.java) contains a thread-safe pool of [`HistoricalFeeds`](src/main/java/net/jacobpeterson/iqfeed4j/feed/lookup/historical/HistoricalFeed.java) and has IQFeed's history request rate limit built in. This class is designed to allow a user to make as many simultaneous [`HistoricalFeeds`](src/main/java/net/jacobpeterson/iqfeed4j/feed/lookup/historical/HistoricalFeed.java) requests that IQFeed allows.

### [`MarketSummaryFeed`](src/main/java/net/jacobpeterson/iqfeed4j/feed/lookup/marketsummary/MarketSummaryFeed.java)
This feed allows you to retrieve a current summary/snapshot of various market data, both historically and currently.

Expand Down
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ dependencies {
implementation group: 'org.codehaus.woodstox', name: 'woodstox-core-asl', version: '4.4.1'
// For JDK 8 common Jackson (de)serializers
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.3'

// Apache Commons object pooling
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.11.1'
}

sourceCompatibility = 1.8
Expand Down
138 changes: 89 additions & 49 deletions src/main/java/net/jacobpeterson/iqfeed4j/IQFeed4j.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import net.jacobpeterson.iqfeed4j.feed.AbstractFeed;
import net.jacobpeterson.iqfeed4j.feed.lookup.AbstractLookupFeed;
import net.jacobpeterson.iqfeed4j.feed.lookup.historical.HistoricalFeed;
import net.jacobpeterson.iqfeed4j.feed.lookup.historical.pool.HistoricalFeedPool;
import net.jacobpeterson.iqfeed4j.feed.lookup.marketsummary.MarketSummaryFeed;
import net.jacobpeterson.iqfeed4j.feed.lookup.news.NewsFeed;
import net.jacobpeterson.iqfeed4j.feed.lookup.optionchains.OptionChainsFeed;
Expand All @@ -13,6 +14,8 @@
import net.jacobpeterson.iqfeed4j.feed.streaming.level1.Level1Feed;
import net.jacobpeterson.iqfeed4j.feed.streaming.marketdepth.MarketDepthFeed;
import net.jacobpeterson.iqfeed4j.properties.IQFeed4jProperties;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -47,6 +50,7 @@ public class IQFeed4j {
private DerivativeFeed derivativeFeed;
private AdminFeed adminFeed;
private HistoricalFeed historicalFeed;
private HistoricalFeedPool historicalFeedPool;
private MarketSummaryFeed marketSummaryFeed;
private NewsFeed newsFeed;
private OptionChainsFeed optionChainsFeed;
Expand Down Expand Up @@ -129,7 +133,7 @@ public IQFeed4j(String iqConnectCommand, String productID, String applicationVer
this.adminFeedPort = adminFeedPort;
this.lookupFeedPort = lookupFeedPort;

LOGGER.debug("{}", this);
LOGGER.trace("{}", this);
}

/**
Expand All @@ -151,171 +155,198 @@ public void stopIQConnect() {
/**
* Starts the {@link Level1Feed} instance, or does nothing if it's already started.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void startLevel1Feed() throws IOException, InterruptedException {
public void startLevel1Feed() throws IOException {
level1Feed = startFeed(level1Feed, () -> new Level1Feed(feedName, feedHostname, level1FeedPort));
}

/**
* Stops the {@link Level1Feed} instance, or does nothing if it's already stopped.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void stopLevel1Feed() throws IOException, InterruptedException {
public void stopLevel1Feed() throws IOException {
stopFeed(level1Feed);
level1Feed = null;
}

/**
* Starts the {@link DerivativeFeed} instance, or does nothing if it's already started.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void startDerivativeFeed() throws IOException, InterruptedException {
public void startDerivativeFeed() throws IOException {
derivativeFeed = startFeed(derivativeFeed,
() -> new DerivativeFeed(feedName, feedHostname, derivativeFeedPort));
}

/**
* Stops the {@link DerivativeFeed} instance, or does nothing if it's already stopped.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void stopDerivativeFeed() throws IOException, InterruptedException {
public void stopDerivativeFeed() throws IOException {
stopFeed(derivativeFeed);
derivativeFeed = null;
}

/**
* Starts the {@link AdminFeed} instance, or does nothing if it's already started.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void startAdminFeed() throws IOException, InterruptedException {
public void startAdminFeed() throws IOException {
adminFeed = startFeed(adminFeed, () -> new AdminFeed(feedName, feedHostname, adminFeedPort));
}

/**
* Stops the {@link AdminFeed} instance, or does nothing if it's already stopped.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void stopAdminFeed() throws IOException, InterruptedException {
public void stopAdminFeed() throws IOException {
stopFeed(adminFeed);
adminFeed = null;
}

/**
* Starts the {@link HistoricalFeed} instance, or does nothing if it's already started.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void startHistoricalFeed() throws IOException, InterruptedException {
public void startHistoricalFeed() throws IOException {
historicalFeed = startFeed(historicalFeed, () -> new HistoricalFeed(feedName, feedHostname, lookupFeedPort));
}

/**
* Stops the {@link HistoricalFeed} instance, or does nothing if it's already stopped.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void stopHistoricalFeed() throws IOException, InterruptedException {
public void stopHistoricalFeed() throws IOException {
stopFeed(historicalFeed);
historicalFeed = null;
}

/**
* Starts the {@link HistoricalFeedPool} instance, or does nothing if it's already started.
*/
public void startHistoricalFeedPool() {
if (historicalFeedPool == null) {
historicalFeedPool = new HistoricalFeedPool(feedName, feedHostname, lookupFeedPort);
}
}

/**
* Starts the {@link HistoricalFeedPool} instance with a {@link GenericObjectPoolConfig}, or does nothing if it's
* already started.
*
* @param feedPoolConfig the {@link HistoricalFeed} {@link GenericObjectPoolConfig}
*/
public void startHistoricalFeedPool(GenericObjectPoolConfig<HistoricalFeed> feedPoolConfig) {
if (historicalFeedPool == null) {
historicalFeedPool = new HistoricalFeedPool(feedName, feedHostname, lookupFeedPort, feedPoolConfig);
}
}

/**
* Starts the {@link HistoricalFeedPool} instance with a {@link GenericObjectPoolConfig}, or does nothing if it's
* already started.
*
* @param objectPool the {@link ObjectPool} of {@link HistoricalFeed}s to use
*/
public void startHistoricalFeedPool(ObjectPool<HistoricalFeed> objectPool) {
if (historicalFeedPool == null) {
historicalFeedPool = new HistoricalFeedPool(objectPool);
}
}

/**
* Stops the {@link HistoricalFeed} instance, or does nothing if it's already stopped.
*/
public void stopHistoricalFeedPool() {
if (historicalFeedPool != null) {
historicalFeedPool.stop();
}
historicalFeedPool = null;
}

/**
* Starts the {@link MarketSummaryFeed} instance, or does nothing if it's already started.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void startMarketSummaryFeed() throws IOException, InterruptedException {
public void startMarketSummaryFeed() throws IOException {
marketSummaryFeed = startFeed(marketSummaryFeed,
() -> new MarketSummaryFeed(feedName, feedHostname, lookupFeedPort));
}

/**
* Stops the {@link MarketSummaryFeed} instance, or does nothing if it's already stopped.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void stopMarketSummaryFeed() throws IOException, InterruptedException {
public void stopMarketSummaryFeed() throws IOException {
stopFeed(marketSummaryFeed);
marketSummaryFeed = null;
}

/**
* Starts the {@link NewsFeed} instance, or does nothing if it's already started.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void startNewsFeed() throws IOException, InterruptedException {
public void startNewsFeed() throws IOException {
newsFeed = startFeed(newsFeed, () -> new NewsFeed(feedName, feedHostname, lookupFeedPort));
}

/**
* Stops the {@link NewsFeed} instance, or does nothing if it's already stopped.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void stopNewsFeed() throws IOException, InterruptedException {
public void stopNewsFeed() throws IOException {
stopFeed(newsFeed);
newsFeed = null;
}

/**
* Starts the {@link OptionChainsFeed} instance, or does nothing if it's already started.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void startOptionChainsFeed() throws IOException, InterruptedException {
public void startOptionChainsFeed() throws IOException {
optionChainsFeed = startFeed(optionChainsFeed,
() -> new OptionChainsFeed(feedName, feedHostname, lookupFeedPort));
}

/**
* Stops the {@link OptionChainsFeed} instance, or does nothing if it's already stopped.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void stopOptionChainsFeed() throws IOException, InterruptedException {
public void stopOptionChainsFeed() throws IOException {
stopFeed(optionChainsFeed);
optionChainsFeed = null;
}

/**
* Starts the {@link SymbolMarketInfoFeed} instance, or does nothing if it's already started.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void startSymbolMarketInfoFeed() throws IOException, InterruptedException {
public void startSymbolMarketInfoFeed() throws IOException {
symbolMarketInfoFeed = startFeed(symbolMarketInfoFeed,
() -> new SymbolMarketInfoFeed(feedName, feedHostname, lookupFeedPort));
}

/**
* Stops the {@link SymbolMarketInfoFeed} instance, or does nothing if it's already stopped.
*
* @throws IOException thrown for {@link IOException}s
* @throws InterruptedException thrown for {@link InterruptedException}s
* @throws IOException thrown for {@link IOException}s
*/
public void stopSymbolMarketInfoFeed() throws IOException, InterruptedException {
public void stopSymbolMarketInfoFeed() throws IOException {
stopFeed(symbolMarketInfoFeed);
symbolMarketInfoFeed = null;
}
Expand Down Expand Up @@ -403,6 +434,15 @@ public HistoricalFeed historical() {
return historicalFeed;
}

/**
* Gets {@link #historicalFeedPool}
*
* @return the {@link HistoricalFeedPool}
*/
public HistoricalFeedPool historicalPool() {
return historicalFeedPool;
}

/**
* Gets {@link #marketSummaryFeed}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public IQConnectExecutable(String iqConnectCommand, String productID, String app

disableInternalProcessLogging = false;

LOGGER.debug("{}", this);
LOGGER.trace("{}", this);
}

/**
Expand Down Expand Up @@ -152,7 +152,7 @@ private void createProcessReader() {
LOGGER.debug("IQConnect.exe process output: {}", line);
}
} catch (IOException exception) {
LOGGER.debug("Ignored IQConnect.exe process output reading error", exception);
LOGGER.error("IQConnect.exe process output reading error!", exception);
return;
}
}
Expand All @@ -174,12 +174,12 @@ public void stop() {

/**
* Calls {@link #waitForConnection(String, int, int, long)} with {@link IQFeed4jProperties#FEED_HOSTNAME} and {@link
* IQFeed4jProperties#ADMIN_FEED_PORT} and a <code>pollingInterval</code> of 250ms.
* IQFeed4jProperties#LOOKUP_FEED_PORT} and a <code>pollingInterval</code> of 250ms.
*
* @see #waitForConnection(String, int, int, long)
*/
public int waitForConnection(long timeoutMillis) throws TimeoutException {
return waitForConnection(IQFeed4jProperties.FEED_HOSTNAME, IQFeed4jProperties.ADMIN_FEED_PORT,
return waitForConnection(IQFeed4jProperties.FEED_HOSTNAME, IQFeed4jProperties.LOOKUP_FEED_PORT,
250, timeoutMillis);
}

Expand Down Expand Up @@ -214,6 +214,8 @@ public int waitForConnection(String hostname, int port, int pollingInterval, lon
executablePollingFeed.start();
executablePollingFeed.stop(); // This will execute upon successful 'start()'

Thread.sleep(500); // Sleep for a bit longer to ensure that IQConnect ready

return attempts;
} catch (InterruptedException interruptedException) {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public void start() throws IOException {
}

socketThreadRunning = true;

socketThread = new Thread(this, feedName);
socketThread.start();
}
Expand Down Expand Up @@ -161,10 +160,8 @@ public void run() {
try {
String line = feedReader.readLine(); // Uses any line ending: CR, LF, or CRLF

if (line == null) { // End of stream has been reached (EOF was sent)
closeSocket();
onFeedSocketClose();
return;
if (line == null) {
throw new IOException("EOF was sent by server!");
} else {
logger.trace("Received message line: {}", line);

Expand Down
Loading

0 comments on commit 32747a6

Please sign in to comment.