Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(audit): Remove Task data from aggregator after a task has been responded to or has expired. #1004

Open
wants to merge 22 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 82 additions & 4 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/yetanotherco/aligned_layer/metrics"

"github.com/Layr-Labs/eigensdk-go/chainio/clients"
sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/services/avsregistry"
Expand Down Expand Up @@ -61,12 +60,16 @@ type Aggregator struct {
// and can start from zero
batchesIdxByIdentifierHash map[[32]byte]uint32

// Stores the taskCreatedBlock for each batch bt batch index
// Stores the taskCreatedBlock for each batch by batch index
batchCreatedBlockByIdx map[uint32]uint64

// Stores the TaskResponse for each batch by batchIdentifierHash
batchDataByIdentifierHash map[[32]byte]BatchData

// Stores if a batch has been finalized, either by response or failure to respond
batchIsFinalizedByIdx map[uint32]struct{} // Id in the key list means it is finalized, using empty struct to save memory
batchIsFinalizedChan chan uint32

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
Expand Down Expand Up @@ -106,6 +109,8 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash := make(map[[32]byte]uint32)
batchDataByIdentifierHash := make(map[[32]byte]BatchData)
batchCreatedBlockByIdx := make(map[uint32]uint64)
batchIsFinalizedByIdx := make(map[uint32]struct{})
batchIsFinalizedChan := make(chan uint32)

chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
Expand All @@ -119,7 +124,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
aggregatorPrivateKey := aggregatorConfig.EcdsaConfig.PrivateKey

logger := aggregatorConfig.BaseConfig.Logger
clients, err := clients.BuildAll(chainioConfig, aggregatorPrivateKey, logger)
clients, err := sdkclients.BuildAll(chainioConfig, aggregatorPrivateKey, logger)
if err != nil {
logger.Errorf("Cannot create sdk clients", "err", err)
return nil, err
Expand Down Expand Up @@ -161,6 +166,8 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
batchDataByIdentifierHash: batchDataByIdentifierHash,
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
batchIsFinalizedByIdx: batchIsFinalizedByIdx,
batchIsFinalizedChan: batchIsFinalizedChan,
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},
walletMutex: &sync.Mutex{},
Expand All @@ -171,6 +178,8 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
metrics: aggregatorMetrics,
}

go aggregator.clearTasksFromMaps(garbageCollectorPeriod)

return &aggregator, nil
}

Expand Down Expand Up @@ -202,17 +211,29 @@ func (agg *Aggregator) Start(ctx context.Context) error {
"taskIndex", blsAggServiceResp.TaskIndex)

go agg.handleBlsAggServiceResponse(blsAggServiceResp)
case taskIdx := <-agg.batchIsFinalizedChan:
agg.logger.Info("Batch is finalized", "taskIndex", taskIdx)
agg.finalizeBatchIdx(taskIdx)
}
}
}

const (
	// MaxSentTxRetries is the retry limit for sent transactions.
	// NOTE(review): its use site is not visible in this chunk; presumably it
	// bounds how many times a response transaction is re-sent — confirm.
	MaxSentTxRetries = 5

	// garbageCollectorPeriod is the interval handed to clearTasksFromMaps
	// when NewAggregator starts the clean-up goroutine.
	garbageCollectorPeriod = time.Minute

	// BLS_AGG_SERVICE_TIMEOUT is the duration passed as the last argument to
	// blsAggregationService.InitializeNewTask for every new task.
	BLS_AGG_SERVICE_TIMEOUT = 100 * time.Second
)

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
if blsAggServiceResp.Err != nil {
agg.taskMutex.Lock()
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
agg.logger.Error("BlsAggregationServiceResponse contains an error", "err", blsAggServiceResp.Err, "batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
// Task errored, mark as finalized
// TODO:
// Actually, if I remove this task from the maps, Agg will go and fetch it again from chain
// agg.batchIsFinalizedChan <- blsAggServiceResp.TaskIndex
uri-99 marked this conversation as resolved.
Show resolved Hide resolved

agg.taskMutex.Unlock()
return
}
Expand Down Expand Up @@ -265,6 +286,11 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

// Mark the batch as finalized
agg.batchIsFinalizedChan <- blsAggServiceResp.TaskIndex


return
}

Expand All @@ -278,6 +304,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
"merkleRoot", "0x"+hex.EncodeToString(batchData.BatchMerkleRoot[:]),
"senderAddress", "0x"+hex.EncodeToString(batchData.SenderAddress[:]),
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

// Aggregator failed to respond to the task, mark the batch as finalized
agg.batchIsFinalizedChan <- blsAggServiceResp.TaskIndex
}

// Sends response to contract and waits for transaction receipt
Expand Down Expand Up @@ -347,12 +376,17 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
"batchIdentifierHash", batchIdentifierHash,
)
agg.nextBatchIndex += 1

quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, BLS_AGG_SERVICE_TIMEOUT)
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
Expand All @@ -362,3 +396,47 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
}

// long-lived gorouting that periodically checks and removes finished Tasks from stored Maps
func (agg *Aggregator) clearTasksFromMaps(period time.Duration) {
agg.AggregatorConfig.BaseConfig.Logger.Info("- Removing finalized Task Infos from Maps every %d seconds", period)

for {
time.Sleep(period)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally we sleep the function after execute the main task, this way we can know easily if the task is being executed correctly (even if it is not deleted anything yet, at least we now it is being executed)


agg.AggregatorConfig.BaseConfig.Logger.Info("Cleaning finalized tasks from maps")

// Reading batchIsFinalizedByIdx map without using a lock because worst case scenario is we miss a newly inserted value
// in which case, the value will be catched in the next iteration

for idx := range agg.batchIsFinalizedByIdx {
agg.AggregatorConfig.BaseConfig.Logger.Info("Cleaning up finalized task", "taskIndex", idx)

// Critical section inside anonymous function to ensure defer works
func() {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Removing Task Info from Aggregator")

defer func() {
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Removed Task Info from Aggregator")
agg.taskMutex.Unlock()
}()

batchIdentifierHash := agg.batchesIdentifierHashByIdx[idx]

delete(agg.batchesIdxByIdentifierHash, batchIdentifierHash)
delete(agg.batchCreatedBlockByIdx, idx)
delete(agg.batchesIdentifierHashByIdx, idx)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)

delete(agg.batchIsFinalizedByIdx, idx)
}()
}

}
}

// finalizeBatchIdx records the batch at idx as finalized by inserting idx as
// a key of batchIsFinalizedByIdx. The map is used as a set, so the stored
// value is the zero-size empty struct.
//
// It takes no lock itself; Start calls it from its select loop when an index
// arrives on batchIsFinalizedChan.
// NOTE(review): clearTasksFromMaps ranges over and deletes from the same map
// on a separate goroutine — confirm that access is synchronized.
func (agg *Aggregator) finalizeBatchIdx(idx uint32) {
	var present struct{}
	agg.batchIsFinalizedByIdx[idx] = present
}
8 changes: 4 additions & 4 deletions batcher/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading