Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use aiofiles to avoid blocking file functions #407

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Install dependencies
shell: bash -l {0}
run: |
conda install -c conda-forge pytest ujson requests decorator google-auth vcrpy aiohttp google-auth-oauthlib flake8 black google-cloud-core google-api-core google-api-python-client -y
conda install -c conda-forge pytest ujson requests decorator google-auth vcrpy aiofiles aiohttp google-auth-oauthlib flake8 black google-cloud-core google-api-core google-api-python-client -y
pip install git+https://github.com/intake/filesystem_spec --no-deps
conda list
conda --version
Expand Down
15 changes: 8 additions & 7 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
import asyncio
import fsspec
import aiofiles

import io
import json
Expand Down Expand Up @@ -892,15 +893,15 @@ async def _put_file(
consistency = consistency or self.consistency
checker = get_consistency_checker(consistency)
bucket, key = self.split_path(rpath)
with open(lpath, "rb") as f0:
size = f0.seek(0, 2)
f0.seek(0)
async with aiofiles.open(lpath, "rb") as f0:
size = await f0.seek(0, 2)
await f0.seek(0)
if size < 5 * 2 ** 20:
return await simple_upload(
self,
bucket,
key,
f0.read(),
await f0.read(),
consistency=consistency,
metadatain=metadata,
content_type=content_type,
Expand All @@ -911,7 +912,7 @@ async def _put_file(
)
offset = 0
while True:
bit = f0.read(chunksize)
bit = await f0.read(chunksize)
if not bit:
break
out = await upload_chunk(
Expand Down Expand Up @@ -994,12 +995,12 @@ async def _get_file_request(self, rpath, lpath, *args, headers=None, **kwargs):
checker = get_consistency_checker(consistency)

os.makedirs(os.path.dirname(lpath), exist_ok=True)
with open(lpath, "wb") as f2:
async with aiofiles.open(lpath, "wb") as f2:
while True:
data = await r.content.read(4096 * 32)
if not data:
break
f2.write(data)
await f2.write(data)
checker.update(data)

validate_response(r.status, data, rpath) # validate http request
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ google-auth-oauthlib
requests
decorator
fsspec==2021.07.0
aiofiles
aiohttp