-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice.ts
99 lines (79 loc) · 2.52 KB
/
service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import type { Etl, EtlAttributes, SourceEtl, DestinationEtl, LazyImport, AsyncIterator, AsyncWithData } from './src/types.js'
import BaseSource from './src/base_source.js'
import BaseTransform from './src/base_transform.js'
import BaseDestination from './src/base_destination.js'
import { isLazyImport, isAsyncIterableIterator } from './src/utils.js'
/**
 * Runs extract-transform-load pipelines: every item produced by the source is
 * optionally passed through a transform and then handed to the destination.
 *
 * NOTE(review): the class name shadows the global `Object` inside this module.
 * It is kept as-is because the module's default export instantiates the class
 * by this name; avoid using `Object` as a type annotation in this file.
 */
class Object implements Etl {
  /**
   * Executes the pipeline described by `attributes`.
   *
   * Items are handled strictly one at a time, in the order the source yields
   * them: each destination write is awaited before the next item is pulled.
   * Every item is processed even when `single` is true.
   *
   * @param attributes - the `source`, optional `transform` and `destination` to wire together
   * @param single - when true, return only the first write result instead of the full list
   * @returns all values returned by the destination's `write`, or only the first
   *   of them (`undefined` when the source yielded nothing) if `single` is true
   */
  async run(attributes: EtlAttributes, single: boolean = false) {
    const results = []
    const { src, trans, dest } = await this.#construct(attributes)

    for await (let item of src.each()) {
      if (trans) {
        item = await trans(item)
      }
      results.push(await dest.write(item))
    }

    return single ? results.shift() : results
  }

  // Private Methods

  /**
   * Resolves the three pipeline stages. `trans` is `null` when no transform
   * was supplied, which `run` treats as "pass items through unchanged".
   */
  async #construct({ source, transform, destination }: EtlAttributes) {
    const src = await this.#source(source)
    const dest = await this.#destination(destination)
    const trans = transform ? await this.#transform(transform) : null

    return { src, trans, dest }
  }

  /**
   * Builds the source stage, an object exposing `each()`.
   *
   * `attr` is either a function, or a `[lazy import, options]` pair. If calling
   * the function yields an async iterable iterator, the function itself is used
   * as `each`. Otherwise the call result is awaited as a module whose `default`
   * export is constructed with `options`.
   */
  async #source(attr: SourceEtl) {
    let source = attr
    let options = {}
    if (Array.isArray(source)) {
      source = (attr as [LazyImport, options: object])[0]
      options = (attr as [LazyImport, options: object])[1]
    }

    // Call the factory exactly once and reuse what it returned. Calling it a
    // second time for the import would run a lazy import twice and leave the
    // first promise floating (never awaited).
    const produced = source()
    if (isAsyncIterableIterator(produced)) {
      // The probe iterator is discarded; `run` obtains a fresh one via `each()`.
      return { each: source as AsyncIterator }
    }

    const { default: SourceUnknown } = await (produced as ReturnType<LazyImport>)
    return new (SourceUnknown as typeof BaseSource)(options)
  }

  /**
   * Builds the destination stage, an object exposing `write()`.
   *
   * `attr` is either a plain function (used directly as `write`), a lazy
   * import, or a `[lazy import, options]` pair. For lazy imports the module's
   * `default` export is constructed with `options`.
   */
  async #destination(attr: DestinationEtl) {
    let destination = attr
    let options = {}
    if (Array.isArray(destination)) {
      destination = (attr as [LazyImport, options: object])[0]
      options = (attr as [LazyImport, options: object])[1]
    }

    if (await isLazyImport(destination)) {
      const { default: DestinationUnknown } = await (destination as LazyImport)()
      return new (DestinationUnknown as typeof BaseDestination)(options)
    }

    return { write: destination as AsyncWithData }
  }

  /**
   * Builds the transform stage as a plain callable.
   *
   * For a lazy import, the module's `default` export is instantiated and its
   * `process` method is returned bound to that instance, so implementations
   * that read `this` keep working when `run` invokes it as a bare function.
   * Any other value is returned unchanged and used as the transform function.
   */
  async #transform(transform: LazyImport | AsyncWithData) {
    if (await isLazyImport(transform)) {
      const { default: TransformUnknown } = await (transform as LazyImport)()
      const instance = new (TransformUnknown as typeof BaseTransform)()
      // Bind: a detached method reference would be called with `this === undefined`.
      return instance.process.bind(instance)
    }

    return transform as AsyncWithData
  }
}
// The default export is a single instance of the class declared in this module
// (not the built-in global `Object`), created once when the module is evaluated.
export default new Object()