Skip to content

Commit

Permalink
feat: batch imports svc (#27321)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Jan 21, 2025
1 parent 2d012f7 commit 22634f0
Show file tree
Hide file tree
Showing 43 changed files with 2,665 additions and 121 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/rust-docker-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ jobs:
dockerfile: ./rust/Dockerfile
- image: cymbal
dockerfile: ./rust/Dockerfile
- image: batch-import-worker
dockerfile: ./rust/Dockerfile
runs-on: depot-ubuntu-22.04-4
permissions:
id-token: write # allow issuing OIDC tokens for this workflow run
Expand All @@ -44,6 +46,7 @@ jobs:
cyclotron-fetch_digest: ${{ steps.digest.outputs.cyclotron-fetch_digest }}
cyclotron-janitor_digest: ${{ steps.digest.outputs.cyclotron-janitor_digest }}
property-defs-rs_digest: ${{ steps.digest.outputs.property-defs-rs_digest }}
batch-import-worker_digest: ${{ steps.digest.outputs.batch-import-worker_digest }}
hook-api_digest: ${{ steps.digest.outputs.hook-api_digest }}
hook-janitor_digest: ${{ steps.digest.outputs.hook-janitor_digest }}
hook-worker_digest: ${{ steps.digest.outputs.hook-worker_digest }}
Expand Down Expand Up @@ -142,6 +145,10 @@ jobs:
values:
image:
sha: '${{ needs.build.outputs.property-defs-rs_digest }}'
- release: batch-import-worker
values:
image:
sha: '${{ needs.build.outputs.batch-import-worker_digest }}'
- release: cymbal
values:
image:
Expand Down
50 changes: 50 additions & 0 deletions posthog/migrations/0551_batchimport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Generated by Django 4.2.15 on 2024-12-16 11:55

from django.db import migrations, models
import django.db.models.deletion
import posthog.helpers.encrypted_fields
import posthog.models.utils


class Migration(migrations.Migration):
    # Auto-generated migration: creates the table backing
    # posthog.models.batch_imports.BatchImport (batch import jobs, e.g.
    # importing historical events from Mixpanel via the Rust batch-import-worker).
    dependencies = [
        ("posthog", "0550_migrate_action_webhooks_to_destinations"),
    ]

    operations = [
        migrations.CreateModel(
            name="BatchImport",
            fields=[
                (
                    "id",
                    models.UUIDField(
                        default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False
                    ),
                ),
                ("created_at", models.DateTimeField(auto_now_add=True)),
                ("updated_at", models.DateTimeField(auto_now=True)),
                # Worker lease columns: a worker holds a time-bounded lease on a job
                # while it is processing it.
                ("lease_id", models.TextField(blank=True, null=True)),
                ("leased_until", models.DateTimeField(blank=True, null=True)),
                (
                    "status",
                    models.TextField(
                        choices=[
                            ("completed", "Completed"),
                            ("failed", "Failed"),
                            ("paused", "Paused"),
                            ("running", "Running"),
                        ],
                        default="running",
                    ),
                ),
                ("status_message", models.TextField(blank=True, null=True)),
                # Opaque worker-managed progress state; null until the worker first writes it.
                ("state", models.JSONField(blank=True, null=True)),
                ("import_config", models.JSONField()),
                # Secrets (e.g. source URLs/credentials) are encrypted at rest.
                ("secrets", posthog.helpers.encrypted_fields.EncryptedJSONStringField()),
                ("team", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.team")),
            ],
            options={
                "abstract": False,
            },
        ),
    ]
2 changes: 1 addition & 1 deletion posthog/migrations/max_migration.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0550_migrate_action_webhooks_to_destinations
0551_batchimport
2 changes: 2 additions & 0 deletions posthog/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .annotation import Annotation
from .async_deletion import AsyncDeletion, DeletionType
from .async_migration import AsyncMigration, AsyncMigrationError, MigrationStatus
from .batch_imports import BatchImport
from .cohort import Cohort, CohortPeople
from .comment import Comment
from .dashboard import Dashboard
Expand Down Expand Up @@ -100,6 +101,7 @@
"BatchExportBackfill",
"BatchExportDestination",
"BatchExportRun",
"BatchImport",
"Cohort",
"CohortPeople",
"Dashboard",
Expand Down
93 changes: 93 additions & 0 deletions posthog/models/batch_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from django.db import models

from posthog.models.utils import UUIDModel
from posthog.models.team import Team

from posthog.helpers.encrypted_fields import EncryptedJSONStringField

from typing import Self
from enum import Enum


class ContentType(str, Enum):
    """Supported source-data content types for a batch import."""

    MIXPANEL = "mixpanel"
    CAPTURED = "captured"

    def serialize(self) -> dict:
        """Return this content type in the JSON shape stored in import_config."""
        return dict(type=self.value)


class BatchImport(UUIDModel):
    """A single batch import job (e.g. importing historical events from an
    external source such as Mixpanel), leased and processed by the
    batch-import-worker service.
    """

    class Status(models.TextChoices):
        COMPLETED = "completed", "Completed"
        FAILED = "failed", "Failed"
        PAUSED = "paused", "Paused"
        RUNNING = "running", "Running"

    team = models.ForeignKey(Team, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    # Worker lease fields: a worker holds a time-bounded lease while processing.
    lease_id = models.TextField(null=True, blank=True)
    leased_until = models.DateTimeField(null=True, blank=True)
    status = models.TextField(choices=Status.choices, default=Status.RUNNING)
    status_message = models.TextField(null=True, blank=True)
    # Opaque worker-managed progress state; null until the worker first writes it.
    state = models.JSONField(null=True, blank=True)
    import_config = models.JSONField()
    secrets = EncryptedJSONStringField()

    @property
    def config(self) -> "BatchImportConfigBuilder":
        """Fluent builder for populating import_config / secrets.

        Created lazily on first access. The previous implementation built it
        eagerly in __init__, and the builder's constructor reset
        import_config/secrets to {} — Django instantiates models through
        __init__ when loading rows, so every instance fetched from the
        database had its config and secrets clobbered.
        """
        if not hasattr(self, "_config_builder"):
            self._config_builder = BatchImportConfigBuilder(self)
        return self._config_builder


# Mostly used for manual job creation
class BatchImportConfigBuilder:
def __init__(self, batch_import: BatchImport):
self.batch_import = batch_import
self.batch_import.import_config = {}
self.batch_import.secrets = {}

def json_lines(self, content_type: ContentType, skip_blanks: bool = True) -> Self:
self.batch_import.import_config["data_format"] = {
"type": "json_lines",
"skip_blanks": skip_blanks,
"content": content_type.serialize(),
}
return self

def from_folder(self, path: str) -> Self:
self.batch_import.import_config["source"] = {"type": "folder", "path": path}
return self

def from_urls(
self, urls: list[str], urls_key: str = "urls", allow_internal_ips: bool = False, timeout_seconds: int = 30
) -> Self:
self.batch_import.import_config["source"] = {
"type": "url_list",
"urls_key": urls_key,
"allow_internal_ips": allow_internal_ips,
"timeout_seconds": timeout_seconds,
}
self.batch_import.secrets[urls_key] = urls
return self

def to_stdout(self, as_json: bool = True) -> Self:
self.batch_import.import_config["sink"] = {"type": "stdout", "as_json": as_json}
return self

def to_file(self, path: str, as_json: bool = True, cleanup: bool = False) -> Self:
self.batch_import.import_config["sink"] = {"type": "file", "path": path, "as_json": as_json, "cleanup": cleanup}
return self

def to_kafka(self, topic: str, send_rate: int, transaction_timeout_seconds: int) -> Self:
self.batch_import.import_config["sink"] = {
"type": "kafka",
"topic": topic,
"send_rate": send_rate,
"transaction_timeout_seconds": transaction_timeout_seconds,
}
return self

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 22634f0

Please sign in to comment.