Skip to content

Commit

Permalink
refactor calc recom score cron job
Browse files Browse the repository at this point in the history
  • Loading branch information
Da.Lei committed Jan 22, 2025
1 parent a8d401b commit 02cce9e
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 38 deletions.
34 changes: 24 additions & 10 deletions _mocks/opencsg.com/csghub-server/component/mock_RecomComponent.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions api/workflow/activity/calc_recom_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ import (
)

func (a *Activities) CalcRecomScore(ctx context.Context) error {
a.recom.CalculateRecomScore(context.Background())
return nil
return a.recom.CalculateRecomScore(ctx, 0)
}
3 changes: 2 additions & 1 deletion api/workflow/cron_calc_recom_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func CalcRecomScoreWorkflow(ctx workflow.Context) error {
}

options := workflow.ActivityOptions{
StartToCloseTimeout: time.Hour * 1,
StartToCloseTimeout: time.Hour * 2,
RetryPolicy: retryPolicy,
}

Expand All @@ -26,5 +26,6 @@ func CalcRecomScoreWorkflow(ctx workflow.Context) error {
logger.Error("failed to calc recom score", "error", err)
return err
}
logger.Info("calc recom score workflow completed")
return nil
}
27 changes: 20 additions & 7 deletions api/workflow/schedule_ce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package workflow_test

import (
"errors"
"testing"

"github.com/stretchr/testify/mock"
Expand All @@ -12,13 +13,25 @@ import (
)

func TestSchedule_CalcRecomScoreWorkflow(t *testing.T) {
tester, err := newWorkflowTester(t)
require.NoError(t, err)

tester.mocks.recom.EXPECT().CalculateRecomScore(mock.Anything).Return()
tester.scheduler.Execute("calc-recom-score-schedule", tester.cronEnv)
require.True(t, tester.cronEnv.IsWorkflowCompleted())
require.NoError(t, tester.cronEnv.GetWorkflowError())
t.Run("normal", func(t *testing.T) {
tester, err := newWorkflowTester(t)
require.NoError(t, err)

tester.mocks.recom.EXPECT().CalculateRecomScore(mock.Anything, 0).Return(nil)
tester.scheduler.Execute("calc-recom-score-schedule", tester.cronEnv)
require.True(t, tester.cronEnv.IsWorkflowCompleted())
require.NoError(t, tester.cronEnv.GetWorkflowError())
})

t.Run("error", func(t *testing.T) {
tester, err := newWorkflowTester(t)
require.NoError(t, err)

tester.mocks.recom.EXPECT().CalculateRecomScore(mock.Anything, 0).Return(errors.New("error"))
tester.scheduler.Execute("calc-recom-score-schedule", tester.cronEnv)
require.True(t, tester.cronEnv.IsWorkflowCompleted())
require.Error(t, tester.cronEnv.GetWorkflowError())
})
}

func TestSchedule_SyncAsClient(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion cmd/csghub-server/cmd/cron/calc_recom_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var cmdCalcRecomScore = &cobra.Command{
slog.Error("failed to create recom component", "err", err)
return
}
c.CalculateRecomScore(cmd.Context())
err = c.CalculateRecomScore(cmd.Context(), 0)
if err != nil {
slog.Error("failed to calculate recom score", "err", err)
}
},
}
43 changes: 28 additions & 15 deletions component/recom.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package component

import (
"context"
"errors"
"fmt"
"log/slog"
"time"
Expand All @@ -22,7 +23,7 @@ type recomComponentImpl struct {
type RecomComponent interface {
SetOpWeight(ctx context.Context, repoID, weight int64) error
// loop through repositories and calculate the recom score of the repository
CalculateRecomScore(ctx context.Context)
CalculateRecomScore(ctx context.Context, batchSize int) error
CalcTotalScore(ctx context.Context, repo *database.Repository, weights map[string]string) float64
}

Expand All @@ -48,26 +49,38 @@ func (rc *recomComponentImpl) SetOpWeight(ctx context.Context, repoID, weight in
}

// loop through repositories and calculate the recom score of the repository
func (rc *recomComponentImpl) CalculateRecomScore(ctx context.Context) {
func (rc *recomComponentImpl) CalculateRecomScore(ctx context.Context, batchSize int) error {
weights, err := rc.loadWeights()
if err != nil {
slog.Error("Error loading weights", "error", err)
return
return errors.New("error loading weights")
}
repos, err := rc.repoStore.All(ctx)
if err != nil {
slog.Error("Error fetching repositories", "error", err)
return
if batchSize <= 0 {
batchSize = 1000
}
for _, repo := range repos {
repoID := repo.ID
score := rc.CalcTotalScore(ctx, repo, weights)
err := rc.recomStore.UpsertScore(ctx, repoID, score)
batch := 0
for {
repos, err := rc.repoStore.FindWithBatch(ctx, batchSize, batch)
if err != nil {
slog.Error("Error updating recom score", slog.Int64("repo_id", repoID), slog.Float64("score", score),
slog.String("error", err.Error()))
return errors.New("error fetching repositories")
}
for _, repo := range repos {
repoID := repo.ID
score := rc.CalcTotalScore(ctx, &repo, weights)
err := rc.recomStore.UpsertScore(ctx, repoID, score)
if err != nil {
slog.Error("Error updating recom score", slog.Int64("repo_id", repoID), slog.Float64("score", score),
slog.String("error", err.Error()))
}
}

if len(repos) < batchSize {
break
}

batch++
}

return nil
}

func (rc *recomComponentImpl) CalcTotalScore(ctx context.Context, repo *database.Repository, weights map[string]string) float64 {
Expand All @@ -83,7 +96,7 @@ func (rc *recomComponentImpl) CalcTotalScore(ctx context.Context, repo *database

qualityScore, err := rc.calcQualityScore(ctx, repo)
if err != nil {
slog.Error("failed to calculate quality score", slog.Any("error", err))
slog.Error("failed to calculate quality score", slog.Any("error", err), slog.Int64("repo_id", repo.ID), slog.String("repo_path", repo.Path))
} else {
score += qualityScore
}
Expand Down
9 changes: 7 additions & 2 deletions component/recom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,21 @@ func TestRecomComponent_CalculateRecomScore(t *testing.T) {
rc.mocks.stores.RecomMock().EXPECT().LoadWeights(mock.Anything).Return(
[]*database.RecomWeight{{Name: "freshness", WeightExp: "score = 12.34"}}, nil,
)
rc.mocks.stores.RepoMock().EXPECT().All(ctx).Return([]*database.Repository{
// loop 1
rc.mocks.stores.RepoMock().EXPECT().FindWithBatch(ctx, 1, 0).Return([]database.Repository{
{ID: 1, Path: "foo/bar"},
}, nil)
// loop 2
rc.mocks.stores.RepoMock().EXPECT().FindWithBatch(ctx, 1, 1).Return([]database.Repository{}, nil)

rc.mocks.stores.RecomMock().EXPECT().UpsertScore(ctx, int64(1), 12.34).Return(nil)
rc.mocks.gitServer.EXPECT().GetRepoFileTree(mock.Anything, gitserver.GetRepoInfoByPathReq{
Namespace: "foo",
Name: "bar",
}).Return(nil, nil)

rc.CalculateRecomScore(ctx)
err := rc.CalculateRecomScore(ctx, 1)
require.NoError(t, err)

Check failure on line 48 in component/recom_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: require (typecheck)

Check failure on line 48 in component/recom_test.go

View workflow job for this annotation

GitHub Actions / test (1.23)

undefined: require
}

func TestRecomComponent_CalculateTotalScore(t *testing.T) {
Expand Down

0 comments on commit 02cce9e

Please sign in to comment.