Skip to content

Commit

Permalink
Remove wait for thread implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
RushiT0122 committed Apr 14, 2024
1 parent b1e9138 commit 0efeb77
Showing 1 changed file with 0 additions and 18 deletions.
18 changes: 0 additions & 18 deletions tap_mambu/tap_generators/multithreaded_offset_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,6 @@ def set_last_sync_completed(self, end_time):
write_bookmark(self.state, self.stream_name,
self.sub_type, datetime_to_utc_str(end_time))

def wait_for_slibling_to_catchup(self):
if not self.sibling_sub_stream:
return
keep_waiting = True
while keep_waiting:
current_bookmark = self.get_default_start_value()
for sibling in self.sibling_sub_stream:
sibling_bookmark = get_bookmark(self.state, sibling, self.sub_type, self.get_default_start_value())
if str_to_datetime(current_bookmark) > str_to_datetime(sibling_bookmark):
keep_waiting = True
LOGGER.info(f"Waiting for sibling {sibling} thread to catch-up!")
time.sleep(5)
else:
keep_waiting = False
break

@backoff.on_exception(backoff.expo, RuntimeError, max_tries=5)
def _all_fetch_batch_steps(self):
if self.date_windowing:
Expand All @@ -177,7 +161,6 @@ def _all_fetch_batch_steps(self):
final_buffer = []
while start < end:
self.write_sub_stream_bookmark(datetime_to_utc_str(start))
self.wait_for_slibling_to_catchup()
# Limit the buffer size by holding generators from creating new batches
if len(self.buffer) > self.max_buffer_size:
while len(self.buffer):
Expand All @@ -191,7 +174,6 @@ def _all_fetch_batch_steps(self):
self.start_windows_datetime_str = start
start = temp
temp = start + timedelta(days=self.date_window_size)

else:
final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
self.preprocess_batches(final_buffer)
Expand Down

0 comments on commit 0efeb77

Please sign in to comment.