Skip to content

Commit

Permalink
feat: add batching logic
Browse files Browse the repository at this point in the history
  • Loading branch information
batrov committed Dec 6, 2023
1 parent fea60ac commit bb08dba
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 6 deletions.
4 changes: 3 additions & 1 deletion core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ func (s *Service) SuggestAssets(ctx context.Context, cfg SearchConfig) (suggesti

func (s *Service) SyncAssets(ctx context.Context, services []string) error {
for _, service := range services {
s.worker.EnqueueSyncAssetJob(ctx, service)
if err := s.worker.EnqueueSyncAssetJob(ctx, service); err != nil {
return err
}
}
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -224,6 +225,12 @@ func createUpsertBody(ast asset.Asset) (io.Reader, error) {
}

func (repo *DiscoveryRepository) clone(ctx context.Context, indexName, clonedIndexName string) error {
indexExistsFn := repo.cli.client.Indices.Exists
resp, _ := indexExistsFn([]string{clonedIndexName})
if resp.StatusCode == http.StatusOK {
return nil // skip clone when backup already created
}

cloneFn := repo.cli.client.Indices.Clone
resp, err := cloneFn(indexName, clonedIndexName, cloneFn.WithContext(ctx))
if err != nil {
Expand Down
48 changes: 48 additions & 0 deletions internal/store/elasticsearch/discovery_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,3 +390,51 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
assert.NoError(t, err)
})
}

func TestDiscoveryRepository_SyncAssets(t *testing.T) {
t.Run("should return success", func(t *testing.T) {
var (
ctx = context.Background()
indexName = "bigquery-test"
)

ast1 := asset.Asset{
ID: "id1",
Type: asset.TypeTable,
Service: indexName,
URN: "urn1",
}
ast2 := asset.Asset{
ID: "id2",
Type: asset.TypeTable,
Service: indexName,
URN: "urn2",
}
ast3 := asset.Asset{
ID: "id3",
Type: asset.TypeTable,
Service: indexName,
URN: "urn3",
}

assets := []asset.Asset{ast1, ast2, ast3}

cli, err := esTestServer.NewClient()
require.NoError(t, err)

_, err = cli.Indices.Create(indexName)
require.NoError(t, err)

esClient, err := store.NewClient(
log.NewNoop(),
store.Config{},
store.WithClient(cli),
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

err = repo.SyncAssets(ctx, indexName, assets)
require.NoError(t, err)
})
}
42 changes: 38 additions & 4 deletions internal/workermanager/discovery_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (m *Manager) syncAssetHandler() worker.JobHandler {
return worker.JobHandler{
Handle: m.SyncAssets,
JobOpts: worker.JobOptions{
MaxAttempts: 3,
Timeout: 5 * time.Second,
MaxAttempts: 1,
Timeout: 5 * time.Minute,
BackoffStrategy: worker.DefaultExponentialBackoff,
},
}
Expand All @@ -72,6 +72,7 @@ func (m *Manager) IndexAsset(ctx context.Context, job worker.JobSpec) error {
}

func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error {
const batchSize = 1000
service := string(job.Payload)

jobs, err := m.jobRepo.GetSyncJobsByService(ctx, service)
Expand All @@ -82,19 +83,52 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error {
if len(jobs) > 1 {
for _, job := range jobs {
if job.RunAt.Before(job.RunAt) {
return nil // mark job as done if there's earlier job with same service
return nil // mark job as done if there's earlier job with same service to prevent race conditions
}
}
}

assets, err := m.assetRepo.GetAll(ctx, asset.Filter{
Services: []string{service},
Size: batchSize,
SortBy: "name",
})
if err != nil {
return fmt.Errorf("sync asset: get assets: %w", err)
}

return m.discoveryRepo.SyncAssets(ctx, service, assets)
if err := m.discoveryRepo.SyncAssets(ctx, service, assets); err != nil {
return err
}

if len(assets) == batchSize { // do remaining upsert after first batch completed
it := 1

for {
assets, err := m.assetRepo.GetAll(ctx, asset.Filter{
Services: []string{service},
Size: batchSize,
Offset: it * batchSize,
SortBy: "name",
})
if err != nil {
return fmt.Errorf("sync asset: get assets: %w", err)
}

for _, ast := range assets {
if err := m.discoveryRepo.Upsert(ctx, ast); err != nil {
return err
}
}

if len(assets) != batchSize {
break
}
it++
}

}
return nil
}

func (m *Manager) EnqueueDeleteAssetJob(ctx context.Context, urn string) error {
Expand Down
38 changes: 37 additions & 1 deletion internal/workermanager/in_situ_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (m *InSituWorker) EnqueueDeleteAssetJob(ctx context.Context, urn string) er
}

func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) error {
const batchSize = 1000
jobs, err := m.jobRepo.GetSyncJobsByService(ctx, service)
if err != nil {
return fmt.Errorf("sync asset: get sync jobs by service: %w", err)
Expand All @@ -56,14 +57,49 @@ func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string)
_ = m.jobRepo.Delete(ctx, jobID)
}()

// need to limit this
assets, err := m.assetRepo.GetAll(ctx, asset.Filter{
Services: []string{service},
Size: batchSize,
SortBy: "name",
})
if err != nil {
return fmt.Errorf("sync asset: get assets: %w", err)
}

return m.discoveryRepo.SyncAssets(ctx, service, assets)
if err := m.discoveryRepo.SyncAssets(ctx, service, assets); err != nil {
return err
}

if len(assets) == batchSize { // do remaining upsert after first batch completed
it := 1

for {
assets, err := m.assetRepo.GetAll(ctx, asset.Filter{
Services: []string{service},
Size: batchSize,
Offset: it * batchSize,
SortBy: "name",
})
if err != nil {
return fmt.Errorf("sync asset: get assets: %w", err)
}

for _, ast := range assets {
if err := m.discoveryRepo.Upsert(ctx, ast); err != nil {
return err
}
}

if len(assets) != batchSize {
break
}
it++
}

}

return nil
}

func (*InSituWorker) Close() error { return nil }

0 comments on commit bb08dba

Please sign in to comment.