duckdb.duckdb.IOException: IO Error: Connection error for HTTP HEAD to ... error while reading from moto mock s3 bucket #48

Open
mayankanand007 opened this issue Jul 19, 2024 · 1 comment

mayankanand007 commented Jul 19, 2024

Thanks a lot for all the work with duckdb! I have found it to be extremely useful!

I'm running into an issue when querying files stored in a mock S3 bucket (the same code works with a real S3 bucket). Here are the details:

Error:

FAILED error.py::test_duckdb_aws - duckdb.duckdb.IOException: IO Error: Connection error for HTTP HEAD to 'https://some-random-bucket.test/_url/random_output_dir/test.parquet'

Steps to reproduce:

duckdb version: 1.0.0
s3fs: 2023.6.0
aiohttp: 3.9.0
aiobotocore: 2.5.2
moto: 4.1.0

I'm using moto for mocking the s3 bucket. Specifically, I use the fixture defined in this comment: aio-libs/aiobotocore#755 (comment)

from collections.abc import Callable
from typing import Any
from unittest.mock import MagicMock

import aiobotocore.awsrequest
import aiobotocore.endpoint
import aiohttp
import aiohttp.client_reqrep
import aiohttp.typedefs
import boto3
import botocore.awsrequest
import botocore.model
import duckdb
import pandas as pd
import pytest
from moto import mock_s3
from upath import UPath

"""
Source: https://github.com/aio-libs/aiobotocore/issues/755#issuecomment-1424945194
"""


class MockAWSResponse(aiobotocore.awsrequest.AioAWSResponse):
    """
    Mocked AWS Response.
    https://github.com/aio-libs/aiobotocore/issues/755
    https://gist.github.com/giles-betteromics/12e68b88e261402fbe31c2e918ea4168
    """

    def __init__(self, response: botocore.awsrequest.AWSResponse):
        self._moto_response = response
        self.status_code = response.status_code
        self.raw = MockHttpClientResponse(response)

    # adapt async methods to use moto's response
    async def _content_prop(self) -> bytes:
        return self._moto_response.content

    async def _text_prop(self) -> str:
        return self._moto_response.text


class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse):
    """
    Mocked HTTP Response.
    See <MockAWSResponse> Notes
    """

    read_count = 0

    _loop = None  # type: ignore

    def __init__(self, response: botocore.awsrequest.AWSResponse):
        """
        Mocked Response Init.
        """

        async def read(bytes_size: int = -1) -> bytes:
            if self.read_count == 0:
                self.read_count += 1
                return response.content
            else:
                return b""

        self.content = MagicMock(aiohttp.StreamReader)
        self.content.read = read
        self.response = response

    @property
    def raw_headers(self) -> Any:
        """
        Return the headers encoded the way that aiobotocore expects them.
        """
        return {
            k.encode("utf-8"): str(v).encode("utf-8") for k, v in self.response.headers.items()
        }.items()


@pytest.fixture(scope="session", autouse=True)
def patch_aiobotocore() -> None:
    """
    Pytest Fixture Supporting S3FS Mocks.
    See <MockAWSResponse> Notes
    """

    def factory(original: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
        """
        Response Conversion Factory.
        """

        def patched_convert_to_response_dict(
            http_response: botocore.awsrequest.AWSResponse,
            operation_model: botocore.model.OperationModel,
        ) -> Any:
            return original(MockAWSResponse(http_response), operation_model)

        return patched_convert_to_response_dict

    aiobotocore.endpoint.convert_to_response_dict = factory(
        aiobotocore.endpoint.convert_to_response_dict
    )


@pytest.fixture
def s3_fixture() -> boto3.resources.base.ServiceResource:
    with mock_s3():
        conn = boto3.resource("s3", region_name="us-east-1")
        yield conn


def test_duckdb_aws(s3_fixture: boto3.resources.base.ServiceResource):
    # Initialize data to lists.
    data = [{"a": 1, "b": 2, "c": 3}, {"a": 10, "b": 20, "c": 30}]

    # Creates DataFrame.
    df = pd.DataFrame(data)

    TEST_S3_BUCKET_NAME = "some-random-bucket"
    s3_fixture.create_bucket(Bucket=TEST_S3_BUCKET_NAME)

    output_dir = UPath(
        f"s3://{TEST_S3_BUCKET_NAME}/random_output_dir", s3_additional_kwargs={"ACL": "private"}
    )

    output_pth = output_dir / "test.parquet"

    df.to_parquet(output_pth, storage_options={"s3_additional_kwargs": {"ACL": "private"}})

    with duckdb.connect() as con:
        con.sql("INSTALL httpfs")
        con.sql("LOAD httpfs")  # load the httpfs extension installed above
        con.sql("SET s3_access_key_id='test_access_key'")
        con.sql("SET s3_secret_access_key='test_secret_key'")
        con.sql("SET s3_endpoint='test_url'")
        # con.sql() returns a relation, so convert it to a DataFrame directly
        result = con.sql(f"select * from '{output_pth}'").df()
        print(result)

Any advice for this? Thanks in advance!

@samansmink
Collaborator

Hey @mayankanand007, thanks for reporting this! I think this has to do with the fact that DuckDB uses a custom S3 implementation instead of an official SDK.

You could try switching to fsspec instead (https://duckdb.org/docs/guides/python/filesystems.html), but that may kind of defeat the purpose of your tests.
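
If the goal is to keep exercising DuckDB's own S3 client, one possible alternative is a mock that listens on a real port, since an in-process mock such as `mock_s3` patches botocore inside the Python process and, as far as I can tell, is never seen by DuckDB's HTTP requests. The sketch below only builds and applies the DuckDB settings for such an endpoint. The helper names `duckdb_s3_settings` and `configure_duckdb` are hypothetical, and the example endpoint `http://127.0.0.1:5000` is an assumption, not something taken from the report.

```python
from urllib.parse import urlparse


def _quote(value: str) -> str:
    """Quote a value as a SQL string literal (single quotes doubled)."""
    return "'" + value.replace("'", "''") + "'"


def duckdb_s3_settings(endpoint_url: str, access_key: str, secret_key: str) -> list[str]:
    """Build the SET statements that point DuckDB's httpfs at a custom S3 endpoint.

    Hypothetical helper: `endpoint_url` is assumed to be the address of a mock
    S3 server that is reachable over real HTTP, e.g. "http://127.0.0.1:5000".
    """
    parsed = urlparse(endpoint_url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"expected an http(s) URL, got {endpoint_url!r}")
    use_ssl = "true" if parsed.scheme == "https" else "false"
    return [
        # s3_endpoint takes host[:port] without the scheme
        f"SET s3_endpoint={_quote(parsed.netloc)}",
        f"SET s3_use_ssl={use_ssl}",
        # path-style URLs avoid resolving <bucket>.<host> on a local server
        "SET s3_url_style='path'",
        f"SET s3_access_key_id={_quote(access_key)}",
        f"SET s3_secret_access_key={_quote(secret_key)}",
    ]


def configure_duckdb(
    con,
    endpoint_url: str,
    access_key: str = "test_access_key",
    secret_key: str = "test_secret_key",
) -> None:
    """Load httpfs on an open DuckDB connection and apply the settings above."""
    con.execute("INSTALL httpfs")
    con.execute("LOAD httpfs")
    for statement in duckdb_s3_settings(endpoint_url, access_key, secret_key):
        con.execute(statement)
```

In a test this might be called as `configure_duckdb(con, "http://127.0.0.1:5000")` after starting moto in server mode (for example via `ThreadedMotoServer` from `moto.server`, assuming the server extras are installed). The boto3 and s3fs clients that create the bucket and write the parquet file would then need to target the same endpoint so that both sides see the same objects.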
