Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(audit): Remove Task data from aggregator after a response has been responded or expires. #1004

Open
wants to merge 22 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
"time"

"github.com/urfave/cli/v2"
"github.com/yetanotherco/aligned_layer/aggregator/internal/pkg"
Expand Down Expand Up @@ -38,6 +39,9 @@ func main() {
}
}

const garbageCollectorPeriod = time.Second * 150 //TODO change to time.Day * 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When should we change it? If we have to change it, it may be a config variable, so we do not need to change code

const garbageCollectorTasksAge = uint64(10) //TODO change to 2592000, 1 month of blocks
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When should we change it? If we have to change it, it may be a config variable, so we do not need to change code

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also Age does not tell much more. What does it mean?


func aggregatorMain(ctx *cli.Context) error {

configFilePath := ctx.String(config.ConfigFileFlag.Name)
Expand All @@ -49,6 +53,13 @@ func aggregatorMain(ctx *cli.Context) error {
return err
}

// Supervisor revives garbage collector
go func() {
for {
aggregator.ClearTasksFromMaps(garbageCollectorPeriod, garbageCollectorTasksAge)
}
}()

// Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
go func() {
listenErr := aggregator.SubscribeToNewTasks()
Expand Down
55 changes: 53 additions & 2 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Aggregator struct {
// and can start from zero
batchesIdxByIdentifierHash map[[32]byte]uint32

// Stores the taskCreatedBlock for each batch bt batch index
// Stores the taskCreatedBlock for each batch by batch index
batchCreatedBlockByIdx map[uint32]uint64

// Stores the TaskResponse for each batch by batchIdentifierHash
Expand Down Expand Up @@ -178,6 +178,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
telemetry: aggregatorTelemetry,
}


return &aggregator, nil
}

Expand Down Expand Up @@ -215,6 +216,9 @@ func (agg *Aggregator) Start(ctx context.Context) error {

const MaxSentTxRetries = 5


const BLS_AGG_SERVICE_TIMEOUT = 100 * time.Second

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
Expand Down Expand Up @@ -275,6 +279,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

Expand Down Expand Up @@ -361,12 +366,17 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
"batchIdentifierHash", batchIdentifierHash,
)
agg.nextBatchIndex += 1

quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, BLS_AGG_SERVICE_TIMEOUT)
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
Expand All @@ -377,3 +387,44 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
}

// long-lived gorouting that periodically checks and removes old Tasks from stored Maps
func (agg *Aggregator) ClearTasksFromMaps(period time.Duration, blocksOld uint64) {
defer func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to document it. At least a brief explanation of how it works and its motivation

err := recover() //stops panics
if err != nil {
agg.logger.Error(err.(string))
}
}()

agg.AggregatorConfig.BaseConfig.Logger.Info(fmt.Sprintf("- Removing finalized Task Infos from Maps every %v", period))
lastIdxDeleted := uint32(0)

for {
time.Sleep(period)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally we sleep the function after execute the main task, this way we can know easily if the task is being executed correctly (even if it is not deleted anything yet, at least we now it is being executed)


agg.AggregatorConfig.BaseConfig.Logger.Info("Cleaning finalized tasks from maps")
oldTaskIdHash, err := agg.avsReader.GetOldTaskHash(blocksOld)
if err != nil {
agg.logger.Error("Error getting old task hash, skipping this garbage collect", "err", err)
continue // Retry in the next iteration
}

oldTaskIdx := agg.batchesIdxByIdentifierHash[*oldTaskIdHash]
agg.logger.Info("Old task found", "taskIndex", oldTaskIdx)
for i := lastIdxDeleted+1; i <= oldTaskIdx; i++ {
batchIdentifierHash, exists := agg.batchesIdentifierHashByIdx[i]
if exists {
agg.logger.Info("Cleaning up finalized task", "taskIndex", i)
delete(agg.batchesIdxByIdentifierHash, batchIdentifierHash)
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
}
lastIdxDeleted = oldTaskIdx
agg.AggregatorConfig.BaseConfig.Logger.Info("Done cleaning finalized tasks from maps")
}
}
Loading
Loading