Skip to content

Commit

Permalink
refactor: remove job repository
Browse files Browse the repository at this point in the history
  • Loading branch information
batrov committed Dec 7, 2023
1 parent e6109d3 commit 8a01b7a
Show file tree
Hide file tree
Showing 17 changed files with 204 additions and 212 deletions.
6 changes: 0 additions & 6 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,10 @@ func runServer(ctx context.Context, cfg *Config) error {
return fmt.Errorf("create new lineage repository: %w", err)
}

jobRepository, err := postgres.NewJobRepository(pgClient)
if err != nil {
return fmt.Errorf("create new job repository: %w", err)
}

wrkr, err := initAssetWorker(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: discoveryRepository,
AssetRepo: assetRepository,
JobRepo: jobRepository,
Logger: logger,
})
if err != nil {
Expand Down
6 changes: 0 additions & 6 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,11 @@ func runWorker(ctx context.Context, cfg *Config) error {
return fmt.Errorf("create new asset repository: %w", err)
}

jobRepository, err := postgres.NewJobRepository(pgClient)
if err != nil {
return fmt.Errorf("create new job repository: %w", err)
}

mgr, err := workermanager.New(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout,
strings.Split(cfg.ColSearchExclusionKeywords, ",")),
AssetRepo: assetRepository,
JobRepo: jobRepository,
Logger: logger,
})
if err != nil {
Expand Down
24 changes: 0 additions & 24 deletions core/job/job.go

This file was deleted.

31 changes: 0 additions & 31 deletions core/job/service.go

This file was deleted.

21 changes: 13 additions & 8 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -85,7 +84,7 @@ func (repo *DiscoveryRepository) SyncAssets(ctx context.Context, indexName strin
return nil, err
}

err = repo.updateAlias(ctx, backupIndexName, "universe")
err = repo.updateAlias(ctx, backupIndexName, defaultSearchIndex)
if err != nil {
return nil, err
}
Expand All @@ -101,7 +100,7 @@ func (repo *DiscoveryRepository) SyncAssets(ctx context.Context, indexName strin
}

cleanup := func() error {
err = repo.updateAlias(ctx, indexName, "universe")
err = repo.updateAlias(ctx, indexName, defaultSearchIndex)
if err != nil {
return err
}
Expand Down Expand Up @@ -237,17 +236,23 @@ func createUpsertBody(ast asset.Asset) (io.Reader, error) {
}

func (repo *DiscoveryRepository) clone(ctx context.Context, indexName, clonedIndexName string) error {
indexExistsFn := repo.cli.client.Indices.Exists
resp, _ := indexExistsFn([]string{clonedIndexName})
if resp.StatusCode == http.StatusOK {
idxExists, err := repo.cli.indexExists(ctx, "CloneIndex", clonedIndexName)
if err != nil {
return asset.DiscoveryError{
Op: "IndexExists",
Index: indexName,
Err: err,
}
}
if idxExists {
return nil // skip clone when backup already created
}

cloneFn := repo.cli.client.Indices.Clone
resp, err := cloneFn(indexName, clonedIndexName, cloneFn.WithContext(ctx))
if err != nil {
return asset.DiscoveryError{
Op: "CloneDoc",
Op: "CloneIndex",
Index: indexName,
Err: err,
}
Expand All @@ -256,7 +261,7 @@ func (repo *DiscoveryRepository) clone(ctx context.Context, indexName, clonedInd
if resp.IsError() {
code, reason := errorCodeAndReason(resp)
return asset.DiscoveryError{
Op: "CloneDoc",
Op: "CloneIndex",
Index: indexName,
ESCode: code,
Err: errors.New(reason),
Expand Down
13 changes: 12 additions & 1 deletion internal/store/elasticsearch/discovery_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,22 @@ func TestDiscoveryRepository_SyncAssets(t *testing.T) {

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

_, err = repo.SyncAssets(ctx, indexName)
cleanup, err := repo.SyncAssets(ctx, indexName)
require.NoError(t, err)

alias := cli.Indices.GetAlias
resp, _ := alias(alias.WithIndex(indexName))
require.NotEmpty(t, resp)

err = cleanup()
require.NoError(t, err)

res, err := cli.Indices.Exists([]string{"bigquery-test"})
require.Equal(t, res.StatusCode, 200)
require.NoError(t, err)

res, err = cli.Indices.Exists([]string{"bigquery-test-bak"})
require.Equal(t, res.StatusCode, 404)
require.NoError(t, err)
})
}
49 changes: 0 additions & 49 deletions internal/store/postgres/job_model.go

This file was deleted.

63 changes: 0 additions & 63 deletions internal/store/postgres/job_repository.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/workermanager/discovery_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error {
const batchSize = 1000
service := string(job.Payload)

jobs, err := m.jobRepo.GetSyncJobsByService(ctx, service)
jobs, err := m.worker.GetSyncJobsByService(ctx, service)
if err != nil {
return fmt.Errorf("sync asset: get sync jobs by service: %w", err)
}
Expand Down
23 changes: 4 additions & 19 deletions internal/workermanager/in_situ_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,20 @@ package workermanager
import (
"context"
"fmt"
"time"
"sync"

"github.com/goto/compass/core/asset"
"github.com/goto/compass/core/job"
)

type InSituWorker struct {
discoveryRepo DiscoveryRepository
jobRepo job.Repository
assetRepo asset.Repository
mutex sync.Mutex
}

func NewInSituWorker(deps Deps) *InSituWorker {
return &InSituWorker{
discoveryRepo: deps.DiscoveryRepo,
jobRepo: deps.JobRepo,
assetRepo: deps.AssetRepo,
}
}
Expand All @@ -40,22 +38,9 @@ func (m *InSituWorker) EnqueueDeleteAssetJob(ctx context.Context, urn string) er

func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) error {
const batchSize = 1000
jobs, err := m.jobRepo.GetSyncJobsByService(ctx, service)
if err != nil {
return fmt.Errorf("sync asset: get sync jobs by service: %w", err)
}

if len(jobs) > 0 {
return nil // mark job as done if there's earlier job with same service
}

jobID, err := m.jobRepo.Insert(ctx, jobSyncAsset, ([]byte)(service), time.Now().UTC())
if err != nil {
return fmt.Errorf("sync asset: insert job queue: %w", err)
}
defer func() {
_ = m.jobRepo.Delete(ctx, jobID)
}()
m.mutex.Lock()
defer m.mutex.Unlock()

cleanup, err := m.discoveryRepo.SyncAssets(ctx, service)
if err != nil {
Expand Down
Loading

0 comments on commit 8a01b7a

Please sign in to comment.