feat: run dbt in batches #158

Open · wants to merge 12 commits into main
128 changes: 122 additions & 6 deletions dbt/dbt-run.py
@@ -42,6 +42,18 @@ def setup():
""")
conn.commit()

with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
CREATE TABLE IF NOT EXISTS
{os.getenv('POSTGRES_SCHEMA')}.dbt_batch_status (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMP,
status TEXT
)
""")
conn.commit()

def get_package():
package_json = '{}'

@@ -107,8 +119,7 @@ def save_package_manifest(package_json, manifest_json):
)
conn.commit()


def update_models():
def update_dbt_deps():
# install the cht pipeline package
package_json = get_package()
subprocess.run(["dbt", "deps", "--profiles-dir", ".dbt", "--upgrade"])
@@ -119,6 +130,8 @@ def update_models():
# save the new manifest and package for the next run
save_package_manifest(package_json, manifest_json)

def update_models():
update_dbt_deps()
# anything that changed, run a full refresh
subprocess.run(["dbt", "run",
"--profiles-dir",
@@ -133,10 +146,113 @@ def run_incremental_models():
# update incremental models (and tables if there are any)
subprocess.run(["dbt", "run", "--profiles-dir", ".dbt", "--exclude", "config.materialized:view"])

def get_pending_doc_count():
with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
SELECT SUM(pending)
FROM {os.getenv('POSTGRES_SCHEMA')}.couchdb_progress
""")
return cur.fetchone()[0]

def get_batch_ranges():
with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
SELECT
MIN(saved_timestamp) as start_timestamp,
MAX(saved_timestamp) as end_timestamp
FROM {os.getenv('POSTGRES_SCHEMA')}.{os.getenv('POSTGRES_TABLE')}
""")
result = cur.fetchone()
if result is None or len(result) == 0:
return []

start_timestamp, end_timestamp = result
start_timestamp = int(start_timestamp.timestamp())
end_timestamp = int(end_timestamp.timestamp())
batch_size = int(os.getenv("DBT_BATCH_SIZE") or 10000)
return [(start, min(start + batch_size, end_timestamp)) for start in range(start_timestamp, end_timestamp, batch_size)]

def update_batch_status(timestamp, status):
with connection() as conn:
with conn.cursor() as cur:
# insert new entry
cur.execute(
f"INSERT INTO {os.getenv('POSTGRES_SCHEMA')}.dbt_batch_status (timestamp, status) VALUES (%s, %s);", [timestamp, status]
)
conn.commit()

def get_last_processed_timestamp():
with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
SELECT MAX(timestamp)
FROM {os.getenv('POSTGRES_SCHEMA')}.dbt_batch_status
WHERE status = 'success'
""")
result = cur.fetchone()
if result and result[0]:
return result[0]
return '1970-01-01 00:00:00'

def get_max_timestamp():
with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
SELECT MAX(saved_timestamp)
FROM {os.getenv('POSTGRES_SCHEMA')}.document_metadata
Member:

It's a little strange to reference document_metadata here, since this is part of the pipeline schema.
Is there some way we can make this independent of the pipeline schema?
What if someone wants to use cht-sync with a completely different set of models?

Contributor Author:

The idea is that anyone building their own models would still have to use our base models and build any additional models on top of them. There would be an issue if we updated the base models and renamed this table, since this query would have to be updated as well.

Member:

Exactly. I've been reading, and there's nothing dbt can return by default to batch on. So disappointing.
Can we add something that tests whether this table exists before we start batching, and logs a friendly message explaining why batching won't work? Or even throws an error saying that running in batches is not possible.

Contributor Author (@njuguna-n, Oct 8, 2024):

To clarify: you are suggesting we throw an error if the batching flag is enabled but the table does not exist?

Member:

yes.
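
A minimal sketch of the agreed guard, purely illustrative (assert_batch_table_exists is a hypothetical name, not part of this diff; it reuses this file's connection() helper and os import):

def assert_batch_table_exists():
    # Hypothetical guard: fail fast when batching is enabled but the
    # base model's table has not been created yet.
    with connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT EXISTS (
                    SELECT 1
                    FROM information_schema.tables
                    WHERE table_schema = %s
                      AND table_name = 'document_metadata'
                )
            """, [os.getenv('POSTGRES_SCHEMA')])
            if not cur.fetchone()[0]:
                raise RuntimeError(
                    "Batching is enabled but document_metadata does not exist; "
                    "run the base models first or disable batching."
                )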

""")
return cur.fetchone()[0]

def run_dbt_in_batches():
print("Running dbt in batches")
last_processed_timestamp = get_last_processed_timestamp()
batch_size = int(os.getenv("DBT_BATCH_SIZE") or 10000)
print(f"Starting new batch with timestamp: {last_processed_timestamp}")

while True:
print(f"Starting new batch with timestamp: {last_processed_timestamp}")
update_dbt_deps()
result = subprocess.run([
"dbt", "run",
"--profiles-dir", ".dbt",
"--vars", f'{{start_timestamp: "{last_processed_timestamp}", batch_size: {batch_size}}}'
])

if result.returncode != 0:
print("Error running dbt")
update_batch_status(last_processed_timestamp, "error")
time.sleep(int(os.getenv("DATAEMON_INTERVAL") or 5))
continue

update_batch_status(last_processed_timestamp, "success")
max_timestamp = get_max_timestamp()

if max_timestamp == last_processed_timestamp:
print("Finished processing all batches")
break

last_processed_timestamp = max_timestamp

run_dbt()

def run_dbt():
print("Starting regular dbt run")
while True:
update_models()
run_incremental_models()
time.sleep(int(os.getenv("DATAEMON_INTERVAL") or 5))

if __name__ == "__main__":
print("Starting dbt run")
setup()
while True:
update_models()
run_incremental_models()
time.sleep(int(os.getenv("DATAEMON_INTERVAL") or 5))
# check if we need to run in batch
pending_doc_count = get_pending_doc_count()
print(f"Pending doc count: {pending_doc_count}")
process_in_batch = pending_doc_count > int(os.getenv("DBT_BATCH_PROCESS_LIMIT") or 100000)
if process_in_batch:
print("Processing in batches")
run_dbt_in_batches()
else:
run_dbt()
Member:

This is great! I have another question though:

What happens if a cht-sync instance is started with RUN_DBT_IN_BATCHES=false and then, later, when there's a large influx of docs, cht-sync is restarted with RUN_DBT_IN_BATCHES=true? Will this make dbt process everything again, since we don't have a batch status stored?

Contributor Author:

I have removed the document_metadata table check because there was a bug where dbt would not run for new deployments where the tables and views are not yet created. The dbt ls command should be able to help identify whether the model is defined, but I didn't manage to get it to work, so we'll catch that error elsewhere in the code.

> What happens if a cht-sync instance is started with RUN_DBT_IN_BATCHES=false and then, later, when there's a large influx of docs, cht-sync is restarted with RUN_DBT_IN_BATCHES=true? Will this make dbt process everything again, since we don't have a batch status stored?

I have added a check on the document_metadata table for the latest timestamp that handles this scenario.
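
One illustrative way the described check could work (a sketch under assumptions, not this PR's exact code; get_batch_start_timestamp is a hypothetical name building on this file's helpers):

def get_batch_start_timestamp():
    # Sketch only: if dbt_batch_status records no successful batch (e.g. the
    # instance previously ran in full mode), fall back to the newest
    # saved_timestamp already present in document_metadata, so enabling
    # batching later does not reprocess everything from 1970.
    last_processed = get_last_processed_timestamp()
    if last_processed != '1970-01-01 00:00:00':
        return last_processed
    with connection() as conn:
        with conn.cursor() as cur:
            cur.execute(f"""
                SELECT MAX(saved_timestamp)
                FROM {os.getenv('POSTGRES_SCHEMA')}.document_metadata
            """)
            row = cur.fetchone()
            return row[0] if row and row[0] else last_processed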

Member:

I'm still unclear on what would happen if we run in batches, then change the config, relaunch dbt, and run in full. Will dbt know which docs it has already indexed?
And if we switch from running in full to running in batches, will everything start from zero?

3 changes: 3 additions & 0 deletions dbt/dbt_project.yml
@@ -9,3 +9,6 @@ target-path: "target"
clean-targets: ["target", "dbt_modules"]
macro-paths: ["macros"]
log-path: "logs"
vars:
start_timestamp: null
batch_size: null
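
The Python runner above overrides these vars on every batch via --vars. One detail worth noting: dbt's --vars flag takes a YAML (or JSON, which YAML subsumes) dictionary string, so serializing the payload with json.dumps sidesteps the quoting pitfalls of a hand-built f-string. A sketch, assuming the loop variables from run_dbt_in_batches:

import json
import subprocess

dbt_vars = json.dumps({
    "start_timestamp": str(last_processed_timestamp),  # from the batch loop above
    "batch_size": batch_size,
})
subprocess.run(["dbt", "run", "--profiles-dir", ".dbt", "--vars", dbt_vars])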