Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed Tracing for client #106

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,7 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
#.idea/

# ruff Linter / Formatter
.ruff_cache
28 changes: 28 additions & 0 deletions .vscode/extensions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=827846 to learn about workspace recommendations.
// Extension identifier format: ${publisher}.${name}. Example: vscode.csharp

// List of extensions which should be recommended for users of this workspace.
"recommendations": [
"ms-azuretools.vscode-docker",
"tamasfe.even-better-toml",
"eamodio.gitlens",
"ms-python.isort",
"ms-python.mypy-type-checker",
"ms-python.debugpy",
"ms-python.python",
"ms-python.vscode-pylance",
"charliermarsh.ruff",
"ms-python.autopep8",
"ms-vscode-remote.remote-containers",
"ms-vscode.remote-explorer",
"ms-vscode.remote-server",
"ms-vscode-remote.remote-ssh-edit",
"ms-vscode-remote.remote-ssh",
"ms-toolsai.jupyter-keymap"
],
// List of extensions recommended by VS Code that should not be recommended for users of this workspace.
"unwantedRecommendations": [

]
}
15 changes: 15 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
}
]
}
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,33 @@ The following shows how to run `hatch run lint:all` but this also works for any
# Use specific Python version (Must be a valid tag from: https://hub.docker.com/_/python)
./docker-hatch -v 3.9 run lint:all
```

## VS Code setup

### Prerequisites
Install the recommended extensions.

### Instructions

Configure hatch to create virtual env in project folder.
```
hatch config set dirs.env.virtual .env
```

Afterwards, create all the Python environments needed by running `hatch -e all run tests`.

Finally, configure VS Code to use one of the created environments:
`cmd + shift + p` -> `python: Select Interpreter` -> Pick one of the folders in `./.env`

## Support

The kagglehub library configures automatic logging, which is written to a log folder. The log destination is resolved via [os.path.expanduser](https://docs.python.org/3/library/os.path.html#os.path.expanduser)

The table below contains possible locations:
| os | log path |
|---------|--------------------------------------------------|
| osx     | /Users/$USERNAME/.kaggle/logs/kagglehub.log      |
| linux | ~/.kaggle/logs/kagglehub.log |
| windows | C:\Users\\%USERNAME%\\.kaggle\logs\kagglehub.log |

Please include the log file when reporting issues to help with troubleshooting.
10 changes: 6 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ skip-string-normalization = true
[tool.ruff]
target-version = "py37"
line-length = 120

[tool.ruff.lint]
select = [
"A",
"ARG",
Expand Down Expand Up @@ -117,7 +119,7 @@ select = [
"ANN001",
"ANN002",
"ANN003",
"ANN102",
"ANN102",
"ANN201",
"ANN202",
"ANN401",
Expand All @@ -141,13 +143,13 @@ unfixable = [
"F401",
]

[tool.ruff.isort]
[tool.ruff.lint.isort]
known-first-party = ["kagglehub"]

[tool.ruff.flake8-tidy-imports]
[tool.ruff.lint.flake8-tidy-imports]
ban-relative-imports = "all"

[tool.ruff.per-file-ignores]
[tool.ruff.lint.per-file-ignores]
# Tests can use magic values, assertions, and relative imports
"tests/**/*" = ["PLR2004", "S101", "TID252"]
# Ignore unused imports in __init__.py
Expand Down
2 changes: 1 addition & 1 deletion src/kagglehub/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__version__ = "0.2.4"

import kagglehub.logging # configures the library logger.
import kagglehub.logger # configures the library logger.
from kagglehub import colab_cache_resolver, http_resolver, kaggle_cache_resolver, registry
from kagglehub.auth import login
from kagglehub.models import model_download, model_upload
Expand Down
15 changes: 9 additions & 6 deletions src/kagglehub/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
import os
from typing import Optional, Tuple
from typing import Callable, Optional, Tuple
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -31,6 +31,7 @@
)
from kagglehub.handle import ResourceHandle
from kagglehub.integrity import get_md5_checksum_from_response, to_b64_digest, update_hash_from_file
from kagglehub.tracing import TraceContext, default_context_factory

CHUNK_SIZE = 1048576
# The `connect` timeout is the number of seconds `requests` will wait for your client to establish a connection.
Expand Down Expand Up @@ -81,9 +82,10 @@ def get_user_agent() -> str:
class KaggleApiV1Client:
BASE_PATH = "api/v1"

def __init__(self) -> None:
def __init__(self, ctx_factory: Optional[Callable[[], TraceContext]] = None) -> None:
self.credentials = get_kaggle_credentials()
self.endpoint = get_kaggle_api_endpoint()
self.ctx_factory = default_context_factory if ctx_factory is None else ctx_factory

def _check_for_version_update(self, response: requests.Response) -> None:
latest_version_str = response.headers.get("X-Kaggle-HubVersion")
Expand All @@ -100,7 +102,7 @@ def get(self, path: str, resource_handle: Optional[ResourceHandle] = None) -> di
url = self._build_url(path)
with requests.get(
url,
headers={"User-Agent": get_user_agent()},
headers={"User-Agent": get_user_agent(), "traceparent": self.ctx_factory().next()},
auth=self._get_http_basic_auth(),
timeout=(DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT),
) as response:
Expand All @@ -112,7 +114,7 @@ def post(self, path: str, data: dict) -> dict:
url = self._build_url(path)
with requests.post(
url,
headers={"User-Agent": get_user_agent()},
headers={"User-Agent": get_user_agent(), "traceparent": self.ctx_factory().next()},
json=data,
auth=self._get_http_basic_auth(),
timeout=(DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT),
Expand All @@ -126,9 +128,10 @@ def post(self, path: str, data: dict) -> dict:
def download_file(self, path: str, out_file: str, resource_handle: Optional[ResourceHandle] = None) -> None:
url = self._build_url(path)
logger.info(f"Downloading from {url}...")
ctx = self.ctx_factory()
with requests.get(
url,
headers={"User-Agent": get_user_agent()},
headers={"User-Agent": get_user_agent(), "traceparent": ctx.next()},
stream=True,
auth=self._get_http_basic_auth(),
timeout=(DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT),
Expand Down Expand Up @@ -156,7 +159,7 @@ def download_file(self, path: str, out_file: str, resource_handle: Optional[Reso
stream=True,
auth=self._get_http_basic_auth(),
timeout=(DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT),
headers={"Range": f"bytes={size_read}-"},
headers={"Range": f"bytes={size_read}-", "traceparent": ctx.next()},
) as resumed_response:
_download_file(resumed_response, out_file, size_read, total_size, hash_object)
else:
Expand Down
37 changes: 28 additions & 9 deletions src/kagglehub/gcs_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import zipfile
from datetime import datetime
from tempfile import TemporaryDirectory
from typing import Dict, List, Optional, Union
from typing import Callable, Dict, List, Optional, Union

import requests
from requests.exceptions import ConnectionError, Timeout
Expand All @@ -13,6 +13,7 @@

from kagglehub.clients import KaggleApiV1Client
from kagglehub.exceptions import BackendError
from kagglehub.tracing import TraceContext, default_context_factory

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -91,13 +92,14 @@ def _check_uploaded_size(session_uri: str, file_size: int, backoff_factor: int =
return 0 # Return 0 if all retries fail


def _upload_blob(file_path: str, model_type: str) -> str:
def _upload_blob(file_path: str, model_type: str, ctx_factory: Optional[Callable[[], TraceContext]] = None) -> str:
"""Uploads a file to a remote server as a blob and returns an upload token.

Parameters
==========
file_path: The path to the file to be uploaded.
model_type : The type of the model associated with the file.
ctx_factory: The function to initalize a trace context
"""
file_size = os.path.getsize(file_path)
data = {
Expand All @@ -106,7 +108,9 @@ def _upload_blob(file_path: str, model_type: str) -> str:
"contentLength": file_size,
"lastModifiedEpochSeconds": int(os.path.getmtime(file_path)),
}
api_client = KaggleApiV1Client()
if ctx_factory is None:
ctx_factory = default_context_factory
api_client = KaggleApiV1Client(ctx_factory)
response = api_client.post("/blobs/upload", data=data)

# Validate response content
Expand All @@ -118,7 +122,11 @@ def _upload_blob(file_path: str, model_type: str) -> str:
raise BackendError(token_exception)

session_uri = response["createUrl"]
headers = {"Content-Type": "application/octet-stream", "Content-Range": f"bytes 0-{file_size - 1}/{file_size}"}
headers = {
"Content-Type": "application/octet-stream",
"Content-Range": f"bytes 0-{file_size - 1}/{file_size}",
"traceparent": ctx_factory().next(),
}

retry_count = 0
uploaded_bytes = 0
Expand Down Expand Up @@ -150,12 +158,16 @@ def _upload_blob(file_path: str, model_type: str) -> str:
retry_count += 1
uploaded_bytes = _check_uploaded_size(session_uri, file_size)
pbar.n = uploaded_bytes # Update progress bar to reflect actual uploaded bytes
headers["traceparent"] = ctx_factory().next()

return response["token"]


def upload_files_and_directories(
folder: str, model_type: str, quiet: bool = False # noqa: FBT002, FBT001
folder: str,
model_type: str,
quiet: bool = False, # noqa: FBT002, FBT001
ctx_factory: Optional[Callable[[], TraceContext]] = None,
) -> UploadDirectoryInfo:
# Count the total number of files
file_count = 0
Expand All @@ -176,7 +188,7 @@ def upload_files_and_directories(

tokens = [
token
for token in [_upload_file_or_folder(temp_dir, TEMP_ARCHIVE_FILE, model_type, quiet)]
for token in [_upload_file_or_folder(temp_dir, TEMP_ARCHIVE_FILE, model_type, quiet, ctx_factory)]
if token is not None
]
return UploadDirectoryInfo(name="archive", files=tokens)
Expand Down Expand Up @@ -222,6 +234,7 @@ def _upload_file_or_folder(
file_or_folder_name: str,
model_type: str,
quiet: bool = False, # noqa: FBT002, FBT001
ctx_factory: Optional[Callable[[], TraceContext]] = None,
) -> Optional[str]:
"""
Uploads a file or each file inside a folder individually from a specified path to a remote service.
Expand All @@ -236,11 +249,17 @@ def _upload_file_or_folder(
"""
full_path = os.path.join(parent_path, file_or_folder_name)
if os.path.isfile(full_path):
return _upload_file(file_or_folder_name, full_path, quiet, model_type)
return _upload_file(file_or_folder_name, full_path, quiet, model_type, ctx_factory)
return None


def _upload_file(file_name: str, full_path: str, quiet: bool, model_type: str) -> Optional[str]: # noqa: FBT001
def _upload_file(
file_name: str,
full_path: str,
quiet: bool, # noqa: FBT001
model_type: str,
ctx_factory: Optional[Callable[[], TraceContext]] = None,
) -> Optional[str]:
"""Helper function to upload a single file
Parameters
==========
Expand All @@ -255,7 +274,7 @@ def _upload_file(file_name: str, full_path: str, quiet: bool, model_type: str) -
logger.info("Starting upload for file " + file_name)

content_length = os.path.getsize(full_path)
token = _upload_blob(full_path, model_type)
token = _upload_blob(full_path, model_type, ctx_factory)
if not quiet:
logger.info("Upload successful: " + file_name + " (" + File.get_size(content_length) + ")")
return token
30 changes: 30 additions & 0 deletions src/kagglehub/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path

from kagglehub.config import get_log_verbosity


def _configure_logger() -> None:
    """Attach file and console handlers to the library's top-level logger ("kagglehub")."""
    pkg_logger = logging.getLogger(__name__.split(".")[0])  # i.e. "kagglehub"

    # Drop any handlers left over from a previous configuration so re-import
    # (or re-running this module) does not duplicate output.
    pkg_logger.handlers.clear()

    log_dir = Path.home() / ".kaggle" / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)

    rotating_handler = RotatingFileHandler(
        str(log_dir / "kagglehub.log"),
        maxBytes=1024 * 1024 * 5,
        backupCount=5,
    )
    rotating_handler.setLevel(logging.DEBUG)
    rotating_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(funcName)s - %(message)s")
    )
    pkg_logger.addHandler(rotating_handler)
    pkg_logger.addHandler(logging.StreamHandler())

    # Disable propagation of the library log outputs.
    # This prevents the same message again from being printed again if a root logger is defined.
    pkg_logger.propagate = False
    pkg_logger.setLevel(get_log_verbosity())


_configure_logger()
16 changes: 0 additions & 16 deletions src/kagglehub/logging.py

This file was deleted.

Loading