Refactor Data Storage #125

Open · wants to merge 7 commits into base: main

Changes from all commits
1 change: 1 addition & 0 deletions .gitignore
@@ -175,6 +175,7 @@ logs
# local data
postgres-data
parquet-data
parquet-data-indexers
data

# streamlit
14 changes: 14 additions & 0 deletions clickhouse/Dockerfile
@@ -0,0 +1,14 @@
FROM clickhouse/clickhouse-server:24.1.2.1

# Set environment variables
ENV CLICKHOUSE_CONFIG=/etc/clickhouse-server/config.xml
ENV CLICKHOUSE_USER_CONFIG=/etc/clickhouse-server/users.xml

# Copy custom configuration file
COPY config.yaml /etc/clickhouse-server/config.yaml

# Expose ClickHouse ports
EXPOSE 8123 9000 9009

# Set the default command
CMD ["clickhouse-server"]
83 changes: 83 additions & 0 deletions clickhouse/clickhouse_import.py
@@ -0,0 +1,83 @@
import argparse
import asyncio
import os
import time

import clickhouse_connect
from clickhouse_connect.driver.asyncclient import AsyncClient

# Path to the parquet files as seen from inside the ClickHouse server
# container (mounted under user_files in docker-compose).
BASE_PATH = "/var/lib/clickhouse/user_files/parquet-data-indexers"


def get_event_list(network: str) -> set[str]:
    # /parquet-data-indexers is the mount point inside this container;
    # BASE_PATH is the same directory as seen by the ClickHouse server.
    event_list = set()
    for _root, _dirs, files in os.walk(f"/parquet-data-indexers/{network}"):
        for file in files:
            if file.endswith(".parquet"):
                event_name = file.split(".")[0]
                event_list.add(event_name)
    print(f"Found {len(event_list)} events for {network}")
    return event_list


async def create_table(
    client: AsyncClient,
    event_name: str,
    network: str,
):
    print(f"Creating table {event_name}_{network}")
    # "limit 0" creates an empty table with the schema inferred from the
    # parquet files; the rows themselves are inserted by semaphore_wrapper,
    # so loading them here as well would duplicate the data.
    query = (
        f"create table if not exists {event_name}_{network} "
        f"engine = MergeTree order by tuple() as "
        f"select * from file('{BASE_PATH}/{network}/*/{event_name}.parquet') "
        f"limit 0"
    )
    await client.command(query)


async def semaphore_wrapper(
    client: AsyncClient,
    sm: asyncio.Semaphore,
    event_name: str,
    network: str,
):
    # The shared semaphore caps how many file imports run concurrently.
    print(f"Loading {BASE_PATH}/{network}/*/{event_name}.parquet into {event_name}_{network}")
    async with sm:
        data = await client.query(
            f"select * from file('{BASE_PATH}/{network}/*/{event_name}.parquet', 'Parquet')"
        )
        await client.insert(
            f"{event_name}_{network}",
            data.result_rows,
            settings={"async_insert": 1, "wait_for_async_insert": 1},
        )


async def main(network: str):
    client = await clickhouse_connect.get_async_client(
        host="clickhouse", port=8123, user="default"
    )

    event_list = get_event_list(network)

    # Create all tables up front, then load them concurrently.
    for event_name in event_list:
        await create_table(client, event_name, network)

    semaphore = asyncio.Semaphore(4)

    start_time = time.time()
    await asyncio.gather(
        *[
            semaphore_wrapper(client, semaphore, event_name, network)
            for event_name in event_list
        ]
    )
    end_time = time.time()
    print(f"Elapsed time: {end_time - start_time:.2f} seconds")


if __name__ == "__main__":
arg = argparse.ArgumentParser()
arg.add_argument("--network", type=str, required=True)
args = arg.parse_args()
asyncio.run(main(args.network))
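
After an import finishes, a quick row-count pass over the new tables confirms the load. A minimal verification sketch, connecting the same way clickhouse_import.py does (the table names are whatever the import created):

import clickhouse_connect

# Same connection parameters as clickhouse_import.py: compose service name
# "clickhouse", HTTP port 8123, default user.
client = clickhouse_connect.get_client(host="clickhouse", port=8123, user="default")
for (table,) in client.query("SHOW TABLES").result_rows:
    rows = client.command(f"SELECT count() FROM {table}")
    print(f"{table}: {rows} rows")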
Empty file added clickhouse/config.yaml
Empty file.
1 change: 1 addition & 0 deletions clickhouse/requirements.txt
@@ -0,0 +1 @@
clickhouse-connect
32 changes: 26 additions & 6 deletions docker-compose.yml
@@ -1,6 +1,26 @@
version: "2.4"

services:
clickhouse:
image: clickhouse/clickhouse-server:latest
container_name: clickhouse
networks:
- data
ulimits:
nofile:
soft: 262144
hard: 262144
volumes:
- ./parquet-data-indexers:/var/lib/clickhouse/user_files/parquet-data-indexers
ports:
- 8123:8123
- 9000:9000
- 9009:9009
deploy:
resources:
limits:
cpus: "4.0"
memory: 8192M
db:
build:
context: ./postgres
Expand All @@ -19,6 +39,7 @@ services:
- ./postgres/initdb:/docker-entrypoint-initdb.d
- ./postgres-data:/var/lib/postgresql/data
- ./parquet-data:/parquet-data
- ./parquet-data-indexers:/parquet-data-indexers
ports:
- "${DB_PORT}:5432"
deploy:
Expand Down Expand Up @@ -80,16 +101,12 @@ services:
context: ./indexers/arbitrum-sepolia
networks:
- data
depends_on:
- db
restart: always
environment:
DB_HOST: db
DB_PORT: 5432
DB_NAME: arbitrum_sepolia
DB_PASS: $PG_PASSWORD
GQL_PORT: 4350
RPC_ENDPOINT: https://sepolia-rollup.arbitrum.io/rpc
volumes:
- ./parquet-data-indexers:/parquet-data-indexers

arbitrum-mainnet-processor:
build:
Expand Down Expand Up @@ -167,6 +184,7 @@ services:
volumes:
- ./parquet-data:/parquet-data
- ./transformers/synthetix:/app/synthetix
- ./parquet-data-indexers:/parquet-data-indexers
networks:
- data

Expand Down Expand Up @@ -218,6 +236,8 @@ services:
- ./scheduler/plugins:/opt/airflow/plugins
- /var/run/docker.sock:/var/run/docker.sock
- ./parquet-data:/parquet-data
- ./parquet-data-indexers:/parquet-data-indexers
- ./clickhouse:/clickhouse
ports:
- "${AIRFLOW_PORT}:8080"

Expand Down
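
The scheduler service now mounts both ./parquet-data-indexers and the ./clickhouse folder, which suggests an Airflow task is meant to trigger the import. A hypothetical task sketch, not part of this PR (the DAG id, schedule, and network name are illustrative assumptions, and clickhouse-connect is assumed to be installed in the scheduler environment):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical DAG that runs the import script mounted at /clickhouse.
# "arbitrum_sepolia" is an assumed network directory name.
with DAG(
    dag_id="clickhouse_import",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # "schedule" on Airflow 2.4+; older versions use schedule_interval
    catchup=False,
):
    BashOperator(
        task_id="import_parquet",
        bash_command="python /clickhouse/clickhouse_import.py --network arbitrum_sepolia",
    )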
4 changes: 2 additions & 2 deletions indexers/arbitrum-sepolia/Dockerfile
@@ -4,11 +4,11 @@ WORKDIR /app

COPY package*.json ./

RUN npm install
RUN npm ci

COPY . .

RUN npm run generate:processor
RUN npm run build

CMD npm run generate:migration ; npm run start
CMD npm run start
58 changes: 1 addition & 57 deletions indexers/arbitrum-sepolia/commands.json
@@ -5,77 +5,25 @@
      "description": "delete all build artifacts",
      "cmd": ["npx", "--yes", "rimraf", "lib"]
    },
    "generate": {
      "description": "Generate a squid from an ABI file",
      "cmd": ["squid-gen-abi"]
    },
    "squid-gen-abi": {
      "description": "Generate a squid from an ABI file",
      "cmd": ["squid-gen-abi"],
      "hidden": true
    },
    "build": {
      "description": "Build the squid project",
      "deps": ["clean"],
      "cmd": ["tsc"]
    },
    "up": {
      "description": "Start a PG database",
      "cmd": ["docker-compose", "up", "-d"]
    },
    "down": {
      "description": "Drop a PG database",
      "cmd": ["docker-compose", "down"]
    },
    "migration:apply": {
      "description": "Apply the DB migrations",
      "cmd": ["squid-typeorm-migration", "apply"]
    },
    "migration:generate": {
      "description": "Generate a DB migration matching the TypeORM entities",
      "deps": ["build", "migration:clean"],
      "cmd": ["squid-typeorm-migration", "generate"]
    },
    "migration:clean": {
      "description": "Clean the migrations folder",
      "cmd": ["npx", "--yes", "rimraf", "./db/migrations"]
    },
    "migration": {
      "deps": ["build"],
      "cmd": ["squid-typeorm-migration", "generate"],
      "hidden": true
    },
    "codegen": {
      "description": "Generate TypeORM entities from the schema file",
      "cmd": ["squid-typeorm-codegen"]
    },
    "typegen": {
      "description": "Generate data access classes for an ABI file(s) in the ./abi folder",
      "cmd": ["squid-evm-typegen", "./src/abi", {"glob": "./abi/*.json"}, "--multicall"]
    },
    "process": {
      "description": "Load .env and start the squid processor",
      "deps": ["build", "migration:apply"],
      "deps": ["build"],
      "cmd": ["node", "--require=dotenv/config", "lib/main.js"]
    },
    "process:prod": {
      "description": "Start the squid processor",
      "deps": ["migration:apply"],
      "cmd": ["node", "lib/main.js"],
      "hidden": true
    },
    "serve": {
      "description": "Start the GraphQL API server",
      "cmd": ["squid-graphql-server"]
    },
    "serve:prod": {
      "description": "Start the GraphQL API server with caching and limits",
      "cmd": ["squid-graphql-server",
        "--dumb-cache", "in-memory",
        "--dumb-cache-ttl", "1000",
        "--dumb-cache-size", "100",
        "--dumb-cache-max-age", "1000" ]
    },
    "check-updates": {
      "cmd": ["npx", "--yes", "npm-check-updates", "--filter=/subsquid/", "--upgrade"],
      "hidden": true
@@ -84,10 +32,6 @@
      "description": "Bump @subsquid packages to the latest versions",
      "deps": ["check-updates"],
      "cmd": ["npm", "i", "-f"]
    },
    "open": {
      "description": "Open a local browser window",
      "cmd": ["npx", "--yes", "opener"]
    }
  }
}