Added timeout for Dukascopy
saeedamen committed Apr 4, 2020
1 parent 7b03fe7 commit 59845cb
Showing 4 changed files with 36 additions and 14 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -109,6 +109,8 @@ In findatapy/examples you will find several demos

# Coding log

* 04 Apr 2020
* Added timeout for Dukascopy download
* 14 Mar 2020
* Fixed bug with downloading short intervals of Dukascopy tick data
* 20 Feb 2020
41 changes: 28 additions & 13 deletions findatapy/market/datavendorweb.py
@@ -1234,6 +1234,8 @@ def _calc_period_size(freq, start_dt, finish_dt):
# for logging and constants
from findatapy.util import ConfigManager, DataConstants, LoggerManager

constants = DataConstants()

class DataVendorDukasCopy(DataVendor):
"""Class for downloading tick data from DukasCopy (note: past month of data is not available). Selecting very large
histories is not recommended as you will likely run out memory given the amount of data requested.
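The docstring's memory warning suggests splitting a long tick history into smaller requests. Below is a hedged sketch of that approach using findatapy's usual Market/MarketDataRequest pattern; the argument names follow the project's examples, but the ticker, dates, and chunk size are illustrative assumptions.

import pandas as pd
from findatapy.market import Market, MarketDataGenerator, MarketDataRequest

market = Market(market_data_generator=MarketDataGenerator())

# month-start boundaries, so each request covers roughly one month of ticks
month_starts = pd.date_range('01 Jan 2020', '01 Apr 2020', freq='MS')

df_chunks = []
for start, finish in zip(month_starts[:-1], month_starts[1:]):
    md_request = MarketDataRequest(start_date=start, finish_date=finish,
                                   category='fx', freq='tick', data_source='dukascopy',
                                   tickers=['EURUSD'], fields=['bid', 'ask'])
    df_chunks.append(market.fetch_market(md_request))

df = pd.concat(df_chunks)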
@@ -1334,16 +1336,29 @@ def download_tick(self, market_data_request):
time_list = self.hour_range(market_data_request.start_date, market_data_request.finish_date)

do_retrieve_df = True # convert inside loop?
multi_threaded = True # multithreading (can sometimes get errors but it's fine when retried, avoid using
multi_threaded = constants.dukascopy_multithreading # multithreading (can sometimes get errors but it's fine when retried, avoid using)

if multi_threaded:
    # use threading (not multiprocess interface, which has issues with dukascopy download)
    pool = SwimPool().create_pool('thread', DataConstants().market_thread_no['dukascopy'])
    results = [pool.apply_async(self.fetch_file, args=(time, symbol, do_retrieve_df,)) for time in time_list]
    tick_list = [p.get() for p in results]

    pool.close()
    pool.join()
    completed = False

    for i in range(0, 5):
        # Use threading (not the multiprocess interface, which has issues with Dukascopy downloads)
        pool = SwimPool().create_pool('thread', constants.market_thread_no['dukascopy'])
        results = [pool.apply_async(self.fetch_file, args=(time, symbol, do_retrieve_df,)) for time in time_list]

        logger.debug("Attempting Dukascopy download " + str(i) + "... ")

        try:
            # get() raises if a worker fails or exceeds the timeout; catch so the loop can retry
            tick_list = [p.get(timeout=constants.timeout_downloader['dukascopy']) for p in results]

            pool.close()
            pool.join()

            completed = True

            break
        except Exception:
            # abandon this pool before retrying (assumes the pool exposes terminate(),
            # as multiprocessing-style pools do)
            pool.terminate()

    if not completed:
        logger.warning("Failed to download from Dukascopy after several attempts")

else:
    # fully single threaded
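For context, the retry-with-timeout shape above can be reproduced with only the standard library. The sketch below uses multiprocessing.dummy's thread pool in place of findatapy's SwimPool wrapper; the worker function, pool size of 8, and 20-second timeout are illustrative assumptions rather than findatapy's actual values.

from multiprocessing import TimeoutError
from multiprocessing.dummy import Pool  # thread pool with the multiprocessing.Pool API

def fetch_file(hour):
    # placeholder for the per-hour tick download
    return "ticks-for-" + str(hour)

def download_with_retries(time_list, timeout_sec=20, max_attempts=5):
    for attempt in range(max_attempts):
        pool = Pool(8)
        results = [pool.apply_async(fetch_file, (t,)) for t in time_list]
        try:
            # get() raises TimeoutError if a worker does not finish in time
            tick_list = [r.get(timeout=timeout_sec) for r in results]
            pool.close()
            pool.join()
            return tick_list
        except TimeoutError:
            # stop waiting on this pool and retry from scratch; running
            # threads cannot be force-killed, they are simply abandoned
            pool.terminate()
    raise RuntimeError("Failed to download after several attempts")

print(download_with_retries(range(3)))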
@@ -1394,11 +1409,11 @@ def fetch_file(self, time, symbol, do_retrieve_df):
hour = str(time.hour).rjust(2, '0')
)

tick = self.fetch_tick(DataConstants().dukascopy_base_url + tick_path)
tick = self.fetch_tick(constants.dukascopy_base_url + tick_path)

#print(tick_path)
if DataConstants().dukascopy_write_temp_tick_disk:
out_path = DataConstants().temp_folder + "/dkticks/" + tick_path
if constants.dukascopy_write_temp_tick_disk:
out_path = constants.temp_folder + "/dkticks/" + tick_path

if not os.path.exists(out_path):
if not os.path.exists(os.path.dirname(out_path)):
@@ -1641,7 +1656,7 @@ def download_tick(self, market_data_request):
week_list = self.week_range(market_data_request.start_date, market_data_request.finish_date)
from findatapy.util import SwimPool

pool = SwimPool().create_pool('thread', DataConstants().market_thread_no['fxcm'])
pool = SwimPool().create_pool('thread', constants.market_thread_no['fxcm'])
results = [pool.apply_async(self.fetch_file, args=(week, symbol)) for week in week_list]
df_list = [p.get() for p in results]
pool.close()
@@ -1659,7 +1674,7 @@ def fetch_file(self, week_year, symbol):

tick_path = symbol + '/' + str(year) + '/' + str(week) + self.url_suffix

return self.retrieve_df(DataConstants().fxcm_base_url + tick_path)
return self.retrieve_df(constants.fxcm_base_url + tick_path)

def parse_datetime(self):
pass
@@ -2407,7 +2422,7 @@ def load_ticker(self, market_data_request):
def download(self, market_data_request):
trials = 0

con = fxcmpy.fxcmpy(access_token=DataConstants().fxcm_API, log_level='error')
con = fxcmpy.fxcmpy(access_token=constants.fxcm_API, log_level='error')

data_frame = None

5 changes: 5 additions & 0 deletions findatapy/util/dataconstants.py
@@ -114,6 +114,11 @@ class DataConstants(object):
'dukascopy' : 8,
'fxcm' : 4}

# timeout in seconds for downloaders
timeout_downloader = {'dukascopy' : 20}

dukascopy_multithreading = True

# We can override the thread count and drop back to a single thread for certain market data downloads, as quite
# large daily datasets from Bloomberg (and other data vendors) can have issues when multi-threaded, so we can
# use single threading for these (and also split them into several chunks)
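Because these are plain class attributes on DataConstants, downstream code can tune them before any downloader reads them. A hedged sketch follows; overriding class attributes this way is an assumption, not a documented findatapy API.

from findatapy.util import DataConstants

# Raise the Dukascopy timeout for a slow connection and force single-threaded downloads.
# Instances created later (constants = DataConstants()) still see these values,
# since instance attribute lookup falls back to the class.
DataConstants.timeout_downloader = {'dukascopy': 60}
DataConstants.dukascopy_multithreading = False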
2 changes: 1 addition & 1 deletion setup.py
@@ -5,7 +5,7 @@
tickers, using configuration files. There is also functionality which is particularly useful for those downloading FX market data."""

setup(name='findatapy',
version='0.1.7',
version='0.1.8',
description='Market data library',
author='Saeed Amen',
author_email='[email protected]',
