Skip to content

Commit

Permalink
fix: error count on reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Jun 17, 2024
1 parent 9419b85 commit da5f19b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 29 deletions.
34 changes: 22 additions & 12 deletions tests/upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
})

ginkgo.It("should push config items first to satisfy foreign keys for changes & analyses", func() {
count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "config_items")
count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "config_items")
Expect(err).To(BeNil())
Expect(count).To(Not(BeZero()))
Expect(fkFailed).To(BeZero())
})

ginkgo.It("should sync config_changes to upstream", func() {
Expand All @@ -84,8 +85,9 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
Expect(err).ToNot(HaveOccurred())
Expect(changes).To(BeZero())

count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "config_changes")
count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "config_changes")
Expect(err).ToNot(HaveOccurred())
Expect(fkFailed).To(BeZero())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigChange{}).Scan(&changes).Error
Expect(err).ToNot(HaveOccurred())
Expand All @@ -112,8 +114,9 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
Expect(err).ToNot(HaveOccurred())
Expect(analyses).To(BeZero())

count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "config_analysis")
count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "config_analysis")
Expect(err).ToNot(HaveOccurred())
Expect(fkFailed).To(BeZero())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigAnalysis{}).Scan(&analyses).Error
Expect(err).ToNot(HaveOccurred())
Expand All @@ -138,8 +141,9 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
Expect(err).ToNot(HaveOccurred())
Expect(artifacts).To(BeZero())

count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "artifacts")
count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "artifacts")
Expect(err).ToNot(HaveOccurred())
Expect(fkFailed).To(BeZero())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.Artifact{}).Scan(&artifacts).Error
Expect(err).ToNot(HaveOccurred())
Expand All @@ -162,8 +166,9 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
Expect(err).ToNot(HaveOccurred())
Expect(upstreamCount).To(BeZero())

count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "job_history")
count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "job_history")
Expect(err).ToNot(HaveOccurred())
Expect(fkFailed).To(BeZero())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.JobHistory{}).Scan(&upstreamCount).Error
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -257,9 +262,9 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
Where("id IN ?", []uuid.UUID{deployment.ID, pod.ID}).UpdateColumn("is_pushed", true).Error
Expect(err).To(BeNil())

count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, t)
_, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, t)
Expect(err).To(HaveOccurred())
Expect(count).To(Equal(0))
Expect(fkFailed).To(BeNumerically(">", 0))

// After reconciliation, those config items should have been marked as unpushed.
var unpushed int
Expand Down Expand Up @@ -317,8 +322,9 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
})

ginkgo.It("should reconcile the above canary & checks", func() {
_, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "canaries", "checks")
_, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "canaries", "checks")
Expect(err).To(BeNil())
Expect(fkFailed).To(BeZero())

var canaryCount int
err = DefaultContext.DB().Model(&models.Canary{}).Select("Count(*)").Where("id IN ?", []uuid.UUID{httpCanary.ID, tcpCanary.ID}).Where("is_pushed = ?", true).Scan(&canaryCount).Error
Expand All @@ -339,8 +345,9 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
err = DefaultContext.DB().Model(&models.Check{}).Where("id IN ?", []uuid.UUID{httpChecks.ID, tcpCheck.ID}).Update("is_pushed", false).Error
Expect(err).To(BeNil())

_, err = upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "checks")
_, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "checks")
Expect(err).To(Not(BeNil()))
Expect(fkFailed).To(BeNumerically(">", 0))

// We expect the http check to have been marked as pushed
// while the tcp check & its canary to have been marked as unpushed
Expand All @@ -361,17 +368,19 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
})

ginkgo.It("The next round of reconciliation should have no error", func() {
_, err := upstream.ReconcileAll(DefaultContext, upstreamConf, 100)
_, fkFailed, err := upstream.ReconcileAll(DefaultContext, upstreamConf, 100)
Expect(err).To(BeNil())
Expect(fkFailed).To(BeZero())
})
})
})
})

ginkgo.Context("should handle updates", func() {
ginkgo.It("ensure all the topologies & canaries have been pushed", func() {
_, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "topologies", "canaries")
_, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "topologies", "canaries")
Expect(err).To(BeNil())
Expect(fkFailed).To(BeZero())

var unpushedCanaries int
err = DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.Canary{}).Scan(&unpushedCanaries).Error
Expand All @@ -392,9 +401,10 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
err = DefaultContext.DB().Model(&models.Canary{}).Where("is_pushed = ?", true).Update("is_pushed", false).Error
Expect(err).To(BeNil())

count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "topologies", "canaries")
count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "topologies", "canaries")
Expect(err).To(BeNil())
Expect(count).To(Not(BeZero()))
Expect(fkFailed).To(BeZero())

var unpushedCanaries int
err = DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.Canary{}).Scan(&unpushedCanaries).Error
Expand Down
36 changes: 19 additions & 17 deletions upstream/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,53 +67,54 @@ var reconciledTables = []pushableTable{
models.JobHistory{},
}

func ReconcileAll(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
func ReconcileAll(ctx context.Context, config UpstreamConfig, batchSize int) (int, int, error) {
return ReconcileSome(ctx, config, batchSize)
}

func ReconcileSome(ctx context.Context, config UpstreamConfig, batchSize int, runOnly ...string) (int, error) {
var count int
func ReconcileSome(ctx context.Context, config UpstreamConfig, batchSize int, runOnly ...string) (int, int, error) {
var count, fkFailed int
for _, table := range reconciledTables {
if len(runOnly) > 0 && !lo.Contains(runOnly, table.TableName()) {
continue
}

if c, err := reconcileTable(ctx, config, table, batchSize); err != nil {
return count, fmt.Errorf("failed to reconcile table %s: %w", table.TableName(), err)
} else {
count += c
success, failed, err := reconcileTable(ctx, config, table, batchSize)
count += success
fkFailed += failed
if err != nil {
return count, fkFailed, fmt.Errorf("failed to reconcile table %s: %w", table.TableName(), err)
}
}

return count, nil
return count, fkFailed, nil
}

// ReconcileTable pushes all unpushed items in a table to upstream.
func reconcileTable(ctx context.Context, config UpstreamConfig, table pushableTable, batchSize int) (int, error) {
func reconcileTable(ctx context.Context, config UpstreamConfig, table pushableTable, batchSize int) (int, int, error) {
client := NewUpstreamClient(config)

var count int
var count, fkFailed int
for {
items, err := table.GetUnpushed(ctx.DB().Limit(batchSize))
if err != nil {
return 0, fmt.Errorf("failed to fetch unpushed items for table %s: %w", table, err)
return count, fkFailed, fmt.Errorf("failed to fetch unpushed items for table %s: %w", table, err)
}

if len(items) == 0 {
return count, nil
return count, fkFailed, nil
}

ctx.Tracef("pushing %s %d to upstream", table.TableName(), len(items))
pushError := client.Push(ctx, NewPushData(items))
if pushError != nil {
httpError := api.HTTPErrorFromErr(pushError)
if httpError == nil || httpError.Data == "" {
return 0, fmt.Errorf("failed to push %s to upstream: %w", table.TableName(), pushError)
return count, fkFailed, fmt.Errorf("failed to push %s to upstream: %w", table.TableName(), pushError)
}

var foreignKeyErr PushFKError
if err := json.Unmarshal([]byte(httpError.Data), &foreignKeyErr); err != nil {
return 0, fmt.Errorf("failed to push %s to upstream (could not decode api error: %w): %w", table.TableName(), err, pushError)
return count, fkFailed, fmt.Errorf("failed to push %s to upstream (could not decode api error: %w): %w", table.TableName(), err, pushError)
}

failedOnes := lo.SliceToMap(foreignKeyErr.IDs, func(item string) (string, struct{}) {
Expand All @@ -123,10 +124,11 @@ func reconcileTable(ctx context.Context, config UpstreamConfig, table pushableTa
_, ok := failedOnes[item.PK()]
return ok
})
fkFailed += len(failedItems)

if c, ok := table.(parentIsPushedUpdater); ok && len(failedItems) > 0 {
if err := c.UpdateParentsIsPushed(ctx.DB(), failedItems); err != nil {
return 0, fmt.Errorf("failed to mark parents as unpushed: %w", err)
return count, fkFailed, fmt.Errorf("failed to mark parents as unpushed: %w", err)
}
}

Expand Down Expand Up @@ -166,12 +168,12 @@ func reconcileTable(ctx context.Context, config UpstreamConfig, table pushableTa
return nil
})
if err != nil {
return count, err
return count, fkFailed, err
}
}

if pushError != nil {
return count, pushError
return count, fkFailed, pushError
}
}
}

0 comments on commit da5f19b

Please sign in to comment.