openalex: remove ci deployment, add job spec, better logging, add contact email to requests
m0ar committed Jan 28, 2025
1 parent 5c2e932 commit 65d406a
Showing 9 changed files with 108 additions and 146 deletions.
118 changes: 0 additions & 118 deletions .github/workflows/build-openalex-importer.yaml

This file was deleted.

9 changes: 9 additions & 0 deletions openalex-importer/Dockerfile
@@ -1,5 +1,14 @@
FROM node:20.18.1-bullseye-slim

RUN apt-get update && apt-get install -y \
jq \
postgresql-client \
socat \
curl \
vim \
bash \
procps

RUN mkdir /app
WORKDIR /app

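Note: the added packages are debugging conveniences for poking around a live importer pod. A possible session, assuming the `App` label from `kubernetes/deployment.yaml` and a reachable cluster:

```bash
# Find the importer pod via its App label, then open a shell in it
POD=$(kubectl get pods -l App=OpenAlexImporter -o jsonpath='{.items[0].metadata.name}')
kubectl exec -it "$POD" -- bash

# Inside the pod: psql comes from postgresql-client, jq parses API payloads
psql "$DATABASE_URL" -c 'SELECT count(*) FROM works;'
```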
40 changes: 40 additions & 0 deletions openalex-importer/README.md
@@ -19,6 +19,18 @@ ensure atomic delta imports.

Start the container. Without env configuration, it will import the previous day's data every day at noon UTC.

#### Deployment
There is no CI build/deploy pipeline for this service, since there is no dev environment for the openalex database.
So be careful 🙏

```bash
# This overwrites openalex-importer:latest ⚠
./build-and-push-to-ecr.sh

# This is a Job spec: edit the env vars in kubernetes/deployment.yaml before starting it in the cluster
kubectl apply -f kubernetes/deployment.yaml
```
```
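Note: Job specs are immutable once created, so re-running an import means deleting the finished Job first. A possible sequence, assuming kubectl already points at the right cluster:

```bash
kubectl delete job openalex-importer --ignore-not-found
kubectl apply -f kubernetes/deployment.yaml
kubectl logs -f job/openalex-importer
```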

### Script Arguments

```
…
```

@@ -188,3 +200,31 @@ Solution: use `text`

1. Use Job+CronJob to schedule execution without having to provision 24/7 (see the sketch below)
2. Add pkeys and indices to other tables if we want to use them too
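A minimal CronJob wrapper for point 1 might look like the sketch below (hypothetical and untested, reusing names from `kubernetes/deployment.yaml`; vault annotations, env, and resources omitted):

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: openalex-importer
spec:
  schedule: '0 12 * * *' # daily at noon UTC, matching the default import window
  concurrencyPolicy: Forbid # never overlap two imports
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          serviceAccountName: 'vault-auth'
          containers:
            - name: openalex-importer
              image: 523044037273.dkr.ecr.us-east-2.amazonaws.com/openalex-importer:latest
```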

## Supported datatypes
Not all OA datatypes are fully supported; the table below shows the status of each.

| Table | Support | Note |
|-------------------------|---------|---------------------------------------|
| authors | 🌓 | only: id, display_name, orcid (1) (2) |
| authors_counts_by_year | ❌ | (2) |
| authors_ids | 🌓 | only: id + orcid (1) (2) |
| topics | ❌ | (2) |
| concepts | ❌ | (2) |
| institutions | ❌ | (2) |
| sources | ❌ | (2) |
| works | ✅ | |
| works_primary_locations | ✅ | |
| works_locations | ✅ | no unique constraint available |
| works_best_oa_locations | ✅ | |
| works_authorships | ❌ | |
| works_topics | ✅ | maps work -> topic ID |
| works_concepts | ✅ | maps work -> concept ID |
| works_ids | ✅ | |
| works_open_access | ✅ | |
| works_referenced_works | ✅ | |
| works_related_works | ✅ | |
Footnotes:
1. Populated from the dehydrated `author` field in `work.authorship`, which lacks the remaining author fields
2. Needs support for separate API route/format
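For footnote 1, a dehydrated author object as it appears in a work's authorship list carries only these fields (illustrative values, matching the example in the OpenAlex docs):

```json
{
  "id": "https://openalex.org/A5023888391",
  "display_name": "Jason Priem",
  "orcid": "https://orcid.org/0000-0001-6187-6610"
}
```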
13 changes: 13 additions & 0 deletions openalex-importer/build-and-push-to-ecr.sh
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

# build docker image and push to ECR
set -euxo pipefail

TAGNAME=openalex-importer

docker build -t $TAGNAME .

docker tag $TAGNAME:latest 523044037273.dkr.ecr.us-east-2.amazonaws.com/$TAGNAME:latest
docker push 523044037273.dkr.ecr.us-east-2.amazonaws.com/$TAGNAME:latest

# aws ecr get-login-password --region us-east-2 | docker login --username AWS --password-stdin 523044037273.dkr.ecr.us-east-2.amazonaws.com
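Usage note: the commented-out login has to run once per session before pushing, since ECR auth tokens expire after 12 hours:

```bash
aws ecr get-login-password --region us-east-2 \
  | docker login --username AWS --password-stdin 523044037273.dkr.ecr.us-east-2.amazonaws.com
./build-and-push-to-ecr.sh
```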
1 change: 1 addition & 0 deletions openalex-importer/index.ts
@@ -138,6 +138,7 @@ const getRuntimeArgs = (): RuntimeArgs => {
     query_to: getParam('query_to', false),
     query_schedule: getParam('query_schedule', false),
   };
+  console.log()
 
   const args: Partial<RuntimeArgs> = {};
   if (raw.query_type === 'created' || raw.query_type === 'updated') {
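`getParam` is defined outside this hunk. Judging by the QUERY_TYPE/QUERY_FROM/QUERY_TO env vars in the Job spec below, it plausibly maps lowercase argument names to uppercase env vars. A guess at the shape, not the actual source:

```ts
// Hypothetical reconstruction of getParam: reads e.g. 'query_type'
// from process.env.QUERY_TYPE, optionally enforcing presence.
const getParam = (name: string, required: boolean): string | undefined => {
  const value = process.env[name.toUpperCase()];
  if (required && value === undefined) {
    throw new Error(`Missing required parameter: ${name.toUpperCase()}`);
  }
  return value;
};
```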
56 changes: 32 additions & 24 deletions openalex-importer/kubernetes/deployment.yaml
@@ -1,48 +1,56 @@
-apiVersion: v1
-kind: Service
+apiVersion: batch/v1
+kind: Job
 metadata:
-  name: openalex-importer-service
+  name: openalex-importer
   labels:
     App: OpenAlexImporter
 spec:
-  replicas: 1
-  revisionHistoryLimit: 2
-  selector:
-    matchLabels:
-      App: OpenAlexImporter
-  strategy:
-    rollingUpdate:
-      maxSurge: 25%
-      maxUnavailable: 25%
-    type: RollingUpdate
+  # replicas: 1
+  # revisionHistoryLimit: 2
+  # selector:
+  #   matchLabels:
+  #     App: OpenAlexImporter
+  # strategy:
+  #   type: Recreate
   template:
     metadata:
       annotations:
         vault.hashicorp.com/agent-inject: 'true'
         vault.hashicorp.com/agent-inject-status: 'update'
         vault.hashicorp.com/role: app-vault-reader
-        vault.hashicorp.com/agent-inject-secret-config: secrets/openalex/db
+        vault.hashicorp.com/agent-inject-secret-config: secrets/openalex-importer
         vault.hashicorp.com/agent-inject-template-config: |
-          {{- with secret "secrets/desci-server/dev/repo" -}}
-          echo "sourcing";
-          export OPENALEX_API_KEY={{ .Data.OPENALEX_API_KEY }}
-          export NODE_ENV=production
-          export DATABASE_URL={{ .Data.DATABASE_URL }}
-          {{- end -}}
+          {{- with secret "secrets/openalex-importer" -}}
+          {{- range $k, $v := .Data }}
+          export {{ $k }}={{ $v }}
+          {{- end }}
+          {{- end }}
       labels:
         App: OpenAlexImporter
     spec:
+      # TODO: change before ultimately deploying
+      restartPolicy: Never
       containers:
         - image: 523044037273.dkr.ecr.us-east-2.amazonaws.com/openalex-importer:latest
           name: openalex-importer
+          # command: [ 'tail', '-f', '/dev/null' ]
           command: ['/bin/bash', '-c']
           args:
-            - echo "SOURCING ENV"; source /vault/secrets/config; NODE_PATH=./dist node ./dist/index.js;
+            - echo "SOURCING ENV"; source /vault/secrets/config; node ./dist/index.js;
+          env:
+            - name: QUERY_TYPE
+              value: "updated"
+            - name: QUERY_FROM
+              value: "2024-12-06"
+            - name: QUERY_TO
+              value: "2024-12-31"
+            - name: NODE_OPTIONS
+              value: "--enable-source-maps --max-semi-space-size=256"
           resources:
             limits:
               cpu: '2'
-              memory: 16Gi
+              memory: 4Gi
            requests:
-              cpu: '1'
-              memory: 8Gi
+              cpu: '2'
+              memory: 2Gi
      serviceAccountName: 'vault-auth'
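Note: even with `restartPolicy: Never`, the Job controller retries failures in fresh pods until `spec.backoffLimit` (default 6) is exhausted. If a failed delta import should not re-run automatically, the spec could pin this down. A sketch, not part of this commit:

```yaml
spec:
  backoffLimit: 0 # fail fast: one attempt per manually-applied Job
```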
4 changes: 3 additions & 1 deletion openalex-importer/src/fetch.ts
Expand Up @@ -22,6 +22,7 @@ type Query = {
filter?: FilterParam;
'per-page'?: number;
cursor: string | undefined;
mailto?: string | undefined;
};

export type FilterParam = {
@@ -52,6 +53,8 @@ export const getInitialWorksQuery = (filter: FilterParam): Query => ({
   filter,
   'per-page': 200,
   cursor: '*',
+  // https://docs.openalex.org/how-to-use-the-api/rate-limits-and-authentication#the-polite-pool
+  mailto: '[email protected]',
 });
 
 export const fetchWorksPage = (searchQuery: Query) => fetchPage<Work>(WORKS_URL, searchQuery);
@@ -61,7 +64,6 @@ export async function fetchPage<T>(
   searchQuery: Query,
 ): Promise<{ data: T[]; next_cursor: string | undefined }> {
   const query = buildQueryString(searchQuery);
-  logger.info({ searchQuery }, 'Fetching page...');
   const request = new Request(`${url}?${query}`, {
     headers: { 'API-KEY': process.env.OPENALEX_API_KEY as string },
   });
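The new `mailto` parameter opts requests into OpenAlex's polite pool. `buildQueryString` isn't shown in this diff; a hypothetical stand-in to illustrate how `mailto` lands on the wire:

```ts
// Hypothetical stand-in for buildQueryString: serializes query params,
// dropping undefined values so mailto only appears when configured.
const buildQueryString = (query: Record<string, unknown>): string =>
  new URLSearchParams(
    Object.entries(query)
      .filter(([, v]) => v !== undefined)
      .map(([k, v]) => [k, String(v)]),
  ).toString();

// 'dev@example.org' is a placeholder address, not the project's real one
console.log(buildQueryString({ 'per-page': 200, cursor: '*', mailto: 'dev@example.org' }));
// => per-page=200&cursor=*&mailto=dev%40example.org
```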
7 changes: 6 additions & 1 deletion openalex-importer/src/logger.ts
@@ -14,14 +14,19 @@ const devTransport = {
   },
 };
 
+const stdoutTransport = {
+  target: 'pino/file',
+  options: { destination: 1 }, // this writes to STDOUT
+};
+
 export const logger = pino({
   level: logLevel,
   serializers: {
     files: omitBuffer,
   },
   transport:
     process.env.NODE_ENV === 'production'
-      ? { targets: [] }
+      ? { targets: [stdoutTransport] }
       : {
           targets: [devTransport],
         },
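A self-contained check of the production branch, assuming pino's file transport accepts a numeric file descriptor (1 = stdout):

```ts
import pino from 'pino';

// Minimal repro of the production transport: pino/file with
// destination 1 writes NDJSON log lines straight to stdout.
const logger = pino({
  level: 'info',
  transport: {
    targets: [{ target: 'pino/file', options: { destination: 1 } }],
  },
});

logger.info({ batchId: 42 }, 'hello from the stdout transport');
```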
6 changes: 4 additions & 2 deletions openalex-importer/src/pipeline.ts
@@ -13,7 +13,7 @@ import { appendToLogs, logger, nukeOldLogs } from './logger.js';
 import { errWithCause } from 'pino-std-serializers';
 import type { Work } from './types/index.js';
 import { type DataModels, transformDataModel } from './transformers.js';
-import { countArrayLengths } from './util.js';
+import { countArrayLengths, getDuration } from './util.js';
 import { Writable } from 'node:stream';
 import { IS_DEV, MAX_PAGES_TO_FETCH, SKIP_LOG_WRITE } from '../index.js';

@@ -130,6 +130,7 @@ const createSaveStream = (tx: PgTransactionType, batchId: number): Writable => {
 
 export const runImportPipeline = async (queryInfo: QueryInfo): Promise<void> => {
   logger.info(queryInfo, 'Starting import pipeline');
+  const startTime = Date.now();
   await nukeOldLogs();
   const filter = filterFromQueryInfo(queryInfo);
 
@@ -145,5 +146,6 @@ export const runImportPipeline = async (queryInfo: QueryInfo): Promise<void> =>
 
     await finalizeBatch(tx, batchId);
   });
-  logger.info('Import pipeline finished!');
+  const duration = getDuration(startTime, Date.now());
+  logger.info({ duration: `${duration} s`, queryInfo }, 'Import pipeline finished!');
 };
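`getDuration` lives in `./util.js`, which this diff doesn't touch beyond the import. Given the `${duration} s` log format, a plausible implementation (an assumption, not the actual source):

```ts
// Hypothetical sketch of util.js getDuration: elapsed wall-clock time
// between two Date.now() timestamps, rounded to whole seconds.
export const getDuration = (startMs: number, endMs: number): number =>
  Math.round((endMs - startMs) / 1000);
```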
