feat: run dbt in batches #158

Open
wants to merge 12 commits into main
100 changes: 94 additions & 6 deletions dbt/dbt-run.py
@@ -42,6 +42,18 @@ def setup():
""")
conn.commit()

with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
CREATE TABLE IF NOT EXISTS
{os.getenv('POSTGRES_SCHEMA')}.dbt_batch_status (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMP,
status TEXT
)
""")
conn.commit()

def get_package():
package_json = '{}'

@@ -107,8 +119,7 @@ def save_package_manifest(package_json, manifest_json):
)
conn.commit()


def update_models():
def update_dbt_deps():
# install the cht pipeline package
package_json = get_package()
subprocess.run(["dbt", "deps", "--profiles-dir", ".dbt", "--upgrade"])
@@ -119,6 +130,8 @@ def update_models():
# save the new manifest and package for the next run
save_package_manifest(package_json, manifest_json)

def update_models():
update_dbt_deps()
# anything that changed, run a full refresh
subprocess.run(["dbt", "run",
"--profiles-dir",
@@ -133,10 +146,85 @@ def run_incremental_models():
# update incremental models (and tables if there are any)
subprocess.run(["dbt", "run", "--profiles-dir", ".dbt", "--exclude", "config.materialized:view"])

def get_pending_doc_count():
with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
SELECT SUM(pending)
FROM {os.getenv('POSTGRES_SCHEMA')}.couchdb_progress
""")
return cur.fetchone()[0]

def update_batch_status(timestamp, status):
with connection() as conn:
with conn.cursor() as cur:
# insert new entry
cur.execute(
f"INSERT INTO {os.getenv('POSTGRES_SCHEMA')}.dbt_batch_status (timestamp, status) VALUES (%s, %s);", [timestamp, status]
)
conn.commit()

def get_last_processed_timestamp():
with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
SELECT MAX(timestamp)
FROM {os.getenv('POSTGRES_SCHEMA')}.dbt_batch_status
WHERE status = 'success'
""")
result = cur.fetchone()
if result and result[0]:
return result[0]
return '1970-01-01 00:00:00'

def get_max_timestamp():
with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
SELECT MAX(saved_timestamp)
FROM {os.getenv('POSTGRES_SCHEMA')}.document_metadata
Member:

It's a little strange to reference document_metadata here, since this is part of the pipeline schema.
Is there some way we can make this independent of the pipeline schema?
What if someone wants to use cht-sync with a completely different set of models?
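One way to decouple this would be to make the watermark source configurable instead of hard-coding document_metadata. A minimal sketch, assuming the existing connection() helper and POSTGRES_SCHEMA env var; the DBT_TIMESTAMP_TABLE and DBT_TIMESTAMP_COLUMN env var names are illustrative, not part of this PR:

def get_max_timestamp():
    # Hypothetical: let deployments point the batch watermark at any table/column
    # that records when a document was last saved; defaults match the pipeline base models.
    table = os.getenv("DBT_TIMESTAMP_TABLE", "document_metadata")
    column = os.getenv("DBT_TIMESTAMP_COLUMN", "saved_timestamp")
    with connection() as conn:
        with conn.cursor() as cur:
            cur.execute(f"""
                SELECT MAX({column})
                FROM {os.getenv('POSTGRES_SCHEMA')}.{table}
            """)
            return cur.fetchone()[0]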

Contributor Author:

The idea is that anyone building their own models would still have to use our base models and build any additional models on top of them. There would only be an issue if we updated the base models and renamed this table, in which case this reference would need to be updated as well.

Member:

Exactly. I've been reading and there's nothing dbt can return by default to support batching. So disappointing.
Can we add something that tests whether this table exists before we start batching, and log a friendly message explaining why batching won't work? Or even throw an error saying that running in batches is not possible.
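A minimal sketch of such a guard, assuming the existing connection() helper and POSTGRES_SCHEMA env var; the function names and error message are illustrative, not part of this PR:

def batch_source_table_exists():
    # to_regclass() returns NULL when the relation does not exist
    with connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT to_regclass(%s)",
                [f"{os.getenv('POSTGRES_SCHEMA')}.document_metadata"],
            )
            return cur.fetchone()[0] is not None

def check_batching_possible():
    if not batch_source_table_exists():
        raise RuntimeError(
            "RUN_DBT_IN_BATCHES is set but document_metadata does not exist; "
            "running dbt in batches is not possible without the base models."
        )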

Contributor Author (@njuguna-n, Oct 8, 2024):

To clarify: you are suggesting we throw an error if the batching flag is enabled but the table does not exist?

Member:

yes.

""")
return cur.fetchone()[0]

def run_dbt_in_batches():
last_processed_timestamp = get_last_processed_timestamp()
batch_size = int(os.getenv("DBT_BATCH_SIZE") or 10000)
dataemon_interval = int(os.getenv("DATAEMON_INTERVAL") or 5)

while True:
update_dbt_deps()
result = subprocess.run([
"dbt", "run",
"--profiles-dir", ".dbt",
"--vars", f'{{start_timestamp: "{last_processed_timestamp}", batch_size: {batch_size}}}',
"--exclude", "config.materialized:view"
])

if result.returncode != 0:
print("Error running dbt")
update_batch_status(last_processed_timestamp, "error")
time.sleep(dataemon_interval)
continue

update_batch_status(last_processed_timestamp, "success")
max_timestamp = get_max_timestamp()

if max_timestamp == last_processed_timestamp:
time.sleep(dataemon_interval)
continue

last_processed_timestamp = max_timestamp

def run_dbt():
while True:
update_models()
run_incremental_models()
time.sleep(int(os.getenv("DATAEMON_INTERVAL") or 5))

if __name__ == "__main__":
print("Starting dbt run")
setup()
while True:
update_models()
run_incremental_models()
time.sleep(int(os.getenv("DATAEMON_INTERVAL") or 5))
if os.getenv("RUN_DBT_IN_BATCHES"):
run_dbt_in_batches()
else:
run_dbt()
Member:

This is great! I have another question though:

what happens if a cht-sync instance is started with RUN_DBT_IN_BATCHES=false, and then, later, when there's a large influx of docs, the cht-sync is restarted with RUN_DBT_IN_BATCHES=true. Will this make DBT sync process everything again because we don't have a batch status stored?

Contributor Author:

I have removed the document_metadata table check because there was a bug where dbt would not run for new deployments where the tables and views are not yet created. The dbt ls command should be able to help identify whether the model is defined, but I didn't manage to get it to work, so we'll catch that error elsewhere in the code.

> what happens if a cht-sync instance is started with RUN_DBT_IN_BATCHES=false, and then, later, when there's a large influx of docs, the cht-sync is restarted with RUN_DBT_IN_BATCHES=true. Will this make DBT sync process everything again because we don't have a batch status stored?

I have added a check on the document_metadata table for the latest timestamp that handles this scenario.
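A sketch of how that fallback could work, assuming the existing helpers; the actual implementation lives in this PR, so treat this as illustrative of the described behaviour rather than the exact code:

def get_last_processed_timestamp():
    with connection() as conn:
        with conn.cursor() as cur:
            # Prefer the last successful batch, if any
            cur.execute(f"""
                SELECT MAX(timestamp)
                FROM {os.getenv('POSTGRES_SCHEMA')}.dbt_batch_status
                WHERE status = 'success'
            """)
            result = cur.fetchone()
            if result and result[0]:
                return result[0]
            # No batch history yet (e.g. previously run with RUN_DBT_IN_BATCHES=false):
            # fall back to the latest timestamp already processed so that switching
            # to batch mode does not reprocess everything from epoch.
            cur.execute(f"""
                SELECT MAX(saved_timestamp)
                FROM {os.getenv('POSTGRES_SCHEMA')}.document_metadata
            """)
            result = cur.fetchone()
            if result and result[0]:
                return result[0]
            return '1970-01-01 00:00:00'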

Member:

I'm still unclear what would happen if we run in batches and then change the config, relaunch dbt and run in full. What would happen then? Will dbt know which docs it has already indexed?
What if we switch from running in full to running in batches? Will everything start from 0?

3 changes: 3 additions & 0 deletions dbt/dbt_project.yml
@@ -9,3 +9,6 @@ target-path: "target"
clean-targets: ["target", "dbt_modules"]
macro-paths: ["macros"]
log-path: "logs"
vars:
start_timestamp: null
batch_size: null
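These project-level defaults are overridden per run by the --vars flag that run_dbt_in_batches builds; dbt parses that string as a YAML dict. A quick way to check the shape it produces, with PyYAML used here purely for illustration:

import yaml  # illustration only; not a dependency of this PR

last_processed_timestamp = "1970-01-01 00:00:00"
batch_size = 500

# Same shape as the --vars argument built in run_dbt_in_batches
vars_arg = f'{{start_timestamp: "{last_processed_timestamp}", batch_size: {batch_size}}}'
print(yaml.safe_load(vars_arg))
# {'start_timestamp': '1970-01-01 00:00:00', 'batch_size': 500}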
3 changes: 3 additions & 0 deletions docker-compose.yml
@@ -37,3 +37,6 @@ services:
- CHT_PIPELINE_BRANCH_URL=${CHT_PIPELINE_BRANCH_URL}
- DATAEMON_INTERVAL=${DATAEMON_INTERVAL}
- DBT_PACKAGE_TARBALL_URL=${DBT_PACKAGE_TARBALL_URL}
- DBT_BATCH_SIZE=${DBT_BATCH_SIZE}
- RUN_DBT_IN_BATCHES=${RUN_DBT_IN_BATCHES}
- PYTHONUNBUFFERED=1
5 changes: 4 additions & 1 deletion package.json
@@ -5,10 +5,13 @@
"main": "",
"scripts": {
"postinstall": "cd couch2pg && npm ci",
"test:e2e": "npm run test:e2e-data && npm run test:e2e-containers && mocha tests/**/*.spec.js --timeout 50000; npm run test:e2e-stop-containers ",
"test:e2e": "npm run test:e2e-regular && npm run test:e2e-batch",
"test:e2e-regular": "npm run test:e2e-data && npm run test:e2e-containers && mocha tests/e2e-test.spec.js --timeout 50000; npm run test:e2e-stop-containers",
"test:e2e-batch": "npm run test:e2e-data && npm run test:e2e-batch-containers && mocha tests/**/*.spec.js --timeout 50000; npm run test:e2e-stop-containers",
"lint": "eslint --color --cache .",
"test:e2e-stop-containers": "docker compose --env-file ./tests/.e2e-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml down -v",
"test:e2e-containers": "docker compose --env-file ./tests/.e2e-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml -f tests/dbt/docker-compose.yml up -d --build --force-recreate",
"test:e2e-batch-containers": "docker compose --env-file ./tests/.e2e-batch-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml -f tests/dbt/docker-compose.yml up -d --build --force-recreate",
"test:e2e-data": "cd tests/data && rm -rf ./json_docs && cht csv-to-docs",
"test": "cd couch2pg && npm run test"
},
20 changes: 20 additions & 0 deletions tests/.e2e-batch-env
@@ -0,0 +1,20 @@
POSTGRES_USER="postgres"
POSTGRES_PASSWORD="postgres"
POSTGRES_DB="data"
POSTGRES_TABLE="medic"
POSTGRES_SCHEMA="v1"
DBT_POSTGRES_USER="postgres"
DBT_POSTGRES_PASSWORD="postgres"
DBT_POSTGRES_SCHEMA="dbt"
DBT_POSTGRES_HOST="postgres"
DBT_PACKAGE_TARBALL_URL="http://dbt-package/dbt/package.tar.gz"
DATAEMON_INTERVAL=0
COUCHDB_USER="medic"
COUCHDB_PASSWORD="password"
COUCHDB_DBS="medic,medic-sentinel"
COUCHDB_HOST="host.docker.internal"
COUCHDB_PORT=5984
COUCHDB_SECURE=false
POSTGRES_HOST=postgres
DBT_BATCH_SIZE=500
RUN_DBT_IN_BATCHES=true
108 changes: 108 additions & 0 deletions tests/e2e-batch-test.spec.js
@@ -0,0 +1,108 @@
import chai from 'chai';
import chaiExclude from 'chai-exclude';
chai.use(chaiExclude);
const { expect } = chai;
import { rootConnect } from './utils/postgres-utils.js';
import {
importAllDocs,
docs,
reports,
contacts,
persons,
} from './utils/couchdb-utils.js';

const {
POSTGRES_SCHEMA,
DBT_POSTGRES_SCHEMA: pgSchema,
POSTGRES_TABLE,
} = process.env;

const PGTABLE = `${POSTGRES_SCHEMA}.${POSTGRES_TABLE}`;

const delay = (seconds) => new Promise(resolve => setTimeout(resolve, seconds * 1000));

const waitForDbt = async (pgClient, retry = 30) => {
if (retry <= 0) {
throw new Error('DBT models missing records after 30s');
}

try {
const dbtReports = await pgClient.query(`SELECT * FROM ${pgSchema}.reports`);
const dbtContacts = await pgClient.query(`SELECT * FROM ${pgSchema}.contacts`);
if (dbtReports.rows.length === reports().length && dbtContacts.rows.length === contacts().length) {
return;
}
} catch {
// not done yet
}

await delay(1);
return waitForDbt(pgClient, --retry);
};

describe('Main workflow Test Suite', () => {
let client;

before(async () => {
console.log('Importing docs');
await importAllDocs();
client = await rootConnect();
console.log('Waiting for DBT');
await waitForDbt(client);
});

after(async () => await client?.end());

describe('Initial Sync', () => {
it('should have data in postgres medic table', async () => {
const couchdbTableResult = await client.query(`SELECT * FROM ${PGTABLE}`);
expect(couchdbTableResult.rows.length).to.equal(docs.length);
});

it('should have data in postgres contacts table', async () => {
const contactsTableResult = await client.query(`SELECT * FROM ${pgSchema}.contacts`);
expect(contactsTableResult.rows.length).to.equal(contacts().length);
});

it('should have data in postgres reports table', async () => {
const reportsTableResult = await client.query(`SELECT * FROM ${pgSchema}.reports`);
expect(reportsTableResult.rows.length).to.equal(reports().length);
});

it('should have data in postgres persons table', async () => {
const personsTableResult = await client.query(`SELECT * FROM ${pgSchema}.persons`);
expect(personsTableResult.rows.length).to.equal(persons().length);
});

it('should have the expected data in a record in contact table', async () => {
const contact = contacts().at(0);
const contactTableResult = await client.query(`SELECT * FROM ${pgSchema}.contacts where uuid=$1`, [contact._id]);
expect(contactTableResult.rows.length).to.equal(1);
expect(contactTableResult.rows[0]).to.deep.include({
parent_uuid: contact.parent._id,
name: contact.name,
contact_type: contact.type,
phone: contact.phone
});
});

it('should have the expected data in a record in person table', async () => {
const person = persons().at(0);
const personTableResult = await client.query(`SELECT * FROM ${pgSchema}.persons where uuid=$1`, [person._id]);
expect(personTableResult.rows.length).to.equal(1);
expect(personTableResult.rows[0].date_of_birth).to.equal(person.date_of_birth);
expect(personTableResult.rows[0].sex).to.equal(person.sex);
});

it('should have the expected data in a record in reports table', async () => {
const report = reports().at(0);
const reportTableResult = await client.query(`SELECT * FROM ${pgSchema}.reports where uuid=$1`, [report._id]);
expect(reportTableResult.rows.length).to.equal(1);
expect(reportTableResult.rows[0].doc).excluding(['_rev', '_id']).to.deep.equal(report);
expect(reportTableResult.rows[0].form).to.equal(report.form);
expect(reportTableResult.rows[0].patient_id).to.equal(report.patient_id);
expect(reportTableResult.rows[0].contact_id).to.equal(report.contact._id);
expect(reportTableResult.rows[0].fields).to.deep.equal(report.fields);
});
});
});