
Shopify<>GCS (State Cache: Postgres): Frozen After Completion #580

Open
Guilherme-B opened this issue Jan 21, 2025 · 2 comments

Comments

@Guilherme-B

I'm running PyAirbyte to sync a Shopify Source to a Google Cloud Storage Destination using a Postgres Cache as a State Cache.
The process runs smoothly and takes around 15 minutes in total, as shown in the image below:

Image

The odd thing is that the process never completes after this: it gets stuck, and the Python process never exits.

Below are the code and the logs:

Code:

from __future__ import annotations

import datetime

import airbyte as ab
from airbyte.caches import PostgresCache

def get_shopify_source() -> ab.Source:
    return ab.get_source(
        "source-shopify",
        config={
            "shop": "",
            "credentials": {
                "auth_method": "api_password",
                "api_password": "",
            },
        },
        # docker_image=True,
        streams=[
            "blogs",
            "collections",
            "collects",
            "countries",
            "custom_collections",
            "customer_address",
            "customers",
            "discount_codes",
            "draft_orders",
            "fulfillment_orders",
            "inventory_items",
            "inventory_levels",
            "locations",
            "metafield_collections",
            "metafield_customers",
            "metafield_products",
            "metafield_shops",
            "metafield_smart_collections",
            "order_risks",
            "orders",
            "pages",
            "price_rules",
            "product_images",
            "product_variants",
            "products",
            # "products_graph_ql",
            "shop",
            "smart_collections",
            "tender_transactions",
            "transactions",
        ],
    )

def get_gcs_destination() -> ab.Destination:
    # Destination configuration
    gcs_config = {
        "gcs_bucket_name": "",
        "gcs_bucket_path": "",
        "gcs_bucket_region": "",
        "credential": {
            "credential_type": "HMAC_KEY",
            "hmac_key_access_id": "",
            "hmac_key_secret": "",
        },
        "format": {
            "format_type": "Parquet",
            "compression_codec": "UNCOMPRESSED",
            "block_size_mb": 128,
        },
    }

    destination = ab.get_destination(
        name="destination-gcs",
        config=gcs_config,
        install_if_missing=True,
        docker_image=False,
    )

    return destination


def main() -> None:
    """Test writing from the source to the destination."""
    source = get_shopify_source()
    source.check()
    destination = get_gcs_destination()
    destination.check()

    # Define a Postgres Cache and pass the necessary configuration
    pg_state_cache = PostgresCache(
        host="localhost",
        port=5432,
        username="X",
        password="X",
        database="pyairbyte_demo",
        schema_name="airbyte_internal",
    )

    destination._name = "client_" + destination.name
    source._name = "client_" + source.name

    write_result: ab.WriteResult = destination.write(
        source_data=source,
        state_cache=pg_state_cache,
        cache=False,
    )

    print(
        f"Completed writing {write_result.processed_records:,} records "
        f"to destination at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.",
    )


if __name__ == "__main__":
    main()
@aaronsteers
Contributor

@Guilherme-B - Thanks for raising this. I don't see anything wrong with your setup. Are you still running into this or did you find a workaround?

I see there are a lot of log messages without descriptions in the destination log. That seems to be internal to the destination, though, and not something we can necessarily debug from the PyAirbyte side.

The only theories I can suggest would be:

(1) GCS is somehow locked during finalization. If so, you would see the Docker image for GCS still running.
(2) The GCS destination finishes, but PyAirbyte for some reason isn't properly shutting down its threads.
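For theory (2), one quick check is to dump the live threads once the sync reports completion and the process hangs. A minimal diagnostic sketch, pure stdlib and nothing PyAirbyte-specific:

```python
import faulthandler
import sys
import threading


def dump_live_threads() -> None:
    """List every live thread; a lingering non-daemon thread would
    explain an interpreter that never exits after the sync finishes."""
    for t in threading.enumerate():
        print(f"{t.name}: daemon={t.daemon}, alive={t.is_alive()}")
    # Full Python tracebacks for all threads, written to stderr.
    faulthandler.dump_traceback(file=sys.stderr)


if __name__ == "__main__":
    dump_live_threads()
```

Calling this (e.g. from a `signal.SIGUSR1` handler, or just after `destination.write()` returns, if it ever does) shows whether a non-daemon worker thread is keeping the process alive.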

It might be worthwhile to tail the destination logs via Docker, and/or to try another destination to see whether the issue is connector-specific or PyAirbyte-specific.
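A sketch of what that log tailing could look like from Python. The `destination-gcs` name filter is a guess at how the connector container is named; adjust it to whatever `docker ps` actually shows on your machine:

```python
import shutil
import subprocess


def tail_destination_logs(name_filter: str = "destination-gcs") -> None:
    """Find a running connector container by name and follow its logs.

    If no matching container is running after the sync stalls, theory (1)
    (the destination container hanging on finalization) is unlikely.
    """
    if shutil.which("docker") is None:
        print("docker CLI not found; is the connector running as a Docker image?")
        return
    ps_cmd = [
        "docker", "ps",
        "--filter", f"name={name_filter}",
        "--format", "{{.ID}} {{.Image}} {{.Status}}",
    ]
    print("Running:", " ".join(ps_cmd))
    out = subprocess.run(ps_cmd, capture_output=True, text=True, check=False).stdout.strip()
    if not out:
        print("No matching container is running.")
        return
    container_id = out.split()[0]
    # Follow the container's logs; Ctrl-C to stop.
    subprocess.run(["docker", "logs", "-f", container_id], check=False)
```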

Let me know if any of this helps or if you have any other clues/observations that might point us to a diagnosis. Thanks!

@Guilherme-B
Author

Guilherme-B commented Feb 4, 2025

Hey @aaronsteers !

I found a temporary workaround: syncing each stream separately. It mostly solves the issue, although it still occurs on occasion. It might be related to the high number of streams, since a single Faker stream works fine.
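For reference, the per-stream workaround can be sketched like this. `sync_one_stream_at_a_time` is a hypothetical helper built around PyAirbyte's `select_streams()`; the source, destination, and cache objects come from the snippet above:

```python
def sync_one_stream_at_a_time(source, destination, state_cache, streams) -> None:
    """Write each stream in its own destination.write() call instead of
    handing the whole catalog over at once."""
    for stream in streams:
        # Narrow the source's selection to a single stream.
        source.select_streams([stream])
        result = destination.write(
            source_data=source,
            state_cache=state_cache,
            cache=False,
        )
        print(f"{stream}: {result.processed_records:,} records")
```

Each call carries its own state handoff, so a hang (if it recurs) is at least isolated to one stream rather than the whole 28-stream catalog.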

I will keep trying to debug the issue; it may well be unclosed threads, given that the destination logs report the write as successful!
