Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(audit): Remove Task data from aggregator after a response has been responded or expires. #1004

Open
wants to merge 22 commits into
base: staging
Choose a base branch
from

Conversation

PatStiles
Copy link
Contributor

@PatStiles PatStiles commented Sep 18, 2024

This PR:

Removes Task data from aggregator response maps after a response has been responded to or expired.

closes #977

#To Test:
Setup local dev net as per the guide

make anvil_start_with_block_time
 make aggregator_start
make batcher_start_local

(optional:)

make operator_register_and_start

And leave a task sender running until the end of the test

make batcher_send_burst_groth16

Everything should work as normally.

Also, every 120 seconds, you should see the log: Cleaning finalized tasks from maps , with some extra information about which tasks are being removed from the Aggregator. These are the tasks being removed, all older than 10 blocks of age.

In prod this variables change to bigger numbers, but the flow is the same.

Note: in dev, first iteration may be small because of the constraints, but the second iteration should include more tasks.

@PatStiles PatStiles self-assigned this Sep 19, 2024
Copy link
Contributor

@MauroToscano MauroToscano left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if an answer comes after the batch is closed ? It should probably be tested with more operators

@PatStiles
Copy link
Contributor Author

I ran a local testnet with 3 operators produced. The aggregator did not experience any errors and and successfully removed added task information from the aggregator after successfully responding.

@entropidelic entropidelic marked this pull request as draft September 23, 2024 19:43
@entropidelic
Copy link
Contributor

Changed this PR to draft since we should prioritize other issues first

@PatStiles PatStiles marked this pull request as ready for review September 27, 2024 14:18
@MarcosNicolau
Copy link
Collaborator

MarcosNicolau commented Sep 27, 2024

What happens if an answer comes after the batch is closed? It should probably be tested with more operators

The aggregator when receiving the response checks that the batch indeed exists here. Anyway, I tested it on my machine doing:

  1. Registered four operators
  2. Killed one
  3. Sent a bunch of proofs
  4. Wait for the aggregator to respond to the task
  5. Woke the killed operator and when it got the latest task, it sent his response and the aggregator ignored it, logging the following message:
[2024-09-27 11:54:10.904 -03] INFO (pkg/server.go:63) - Locked Resources: Starting processing of Response
[2024-09-27 11:54:10.904 -03] INFO (pkg/server.go:67) - Unlocked Resources: Task not found in the internal map

Copy link
Collaborator

@MarcosNicolau MarcosNicolau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything worked as expected on my machine!

@JuArce
Copy link
Collaborator

JuArce commented Sep 27, 2024

Imo the deletion of the task in the maps should be delayed, so it will accept responses from operators after reaching the quorum

It can be done with a go routine like

go func() {
		time.Sleep(10 * time.Second)
		// Deletion logic
	}()

The deletion is done once the task was verified in Ethereum (so there is a time between the quorum is reached and the transaction is accepted in Ethereum). So I'm not sure if we should add an extra delay in the deletion

@Oppen
Copy link
Collaborator

Oppen commented Oct 3, 2024

The deletion is done once the task was verified in Ethereum (so there is a time between the quorum is reached and the transaction is accepted in Ethereum). So I'm not sure if we should add an extra delay in the deletion

That sounds to me like what we really want is a kind of queue (jargon for "an array") and a periodic task that checks if we're done with stuff.

EDIT: actually, I think it's simpler than that. We already have indices for each batch. Every once in a while (the appropriate time is probably around the time Ethereum takes to confirm our transactions) we check the oldest (lowest) index, find its Merkle root, and use that and the index to clean the dictionaries. This could be a long-lived goroutine started by main.

Copy link
Contributor

@MauroToscano MauroToscano left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no docs on why a garbage collector is needed. This seems overengineered.

@uri-99 uri-99 marked this pull request as draft October 8, 2024 20:39
@uri-99 uri-99 marked this pull request as ready for review October 14, 2024 15:41
@@ -178,6 +178,8 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
telemetry: aggregatorTelemetry,
}

go aggregator.clearTasksFromMaps(garbageCollectorPeriod, garbageCollectorTasksAge)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look like the right place to instantiate the task. I would do it at the call-site. Here we're just asking for a struct to be initialized and returned.

Copy link
Contributor

@uri-99 uri-99 Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, with supervisor

Copy link
Contributor

@uri-99 uri-99 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

biased self-approve

@@ -38,6 +39,9 @@ func main() {
}
}

const garbageCollectorPeriod = time.Second * 150 //TODO change to time.Day * 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When should we change it? If we have to change it, it may be a config variable, so we do not need to change code

@@ -38,6 +39,9 @@ func main() {
}
}

const garbageCollectorPeriod = time.Second * 150 //TODO change to time.Day * 1
const garbageCollectorTasksAge = uint64(10) //TODO change to 2592000, 1 month of blocks
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When should we change it? If we have to change it, it may be a config variable, so we do not need to change code

@@ -38,6 +39,9 @@ func main() {
}
}

const garbageCollectorPeriod = time.Second * 150 //TODO change to time.Day * 1
const garbageCollectorTasksAge = uint64(10) //TODO change to 2592000, 1 month of blocks
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also Age does not tell much more. What does it mean?


// long-lived gorouting that periodically checks and removes old Tasks from stored Maps
func (agg *Aggregator) ClearTasksFromMaps(period time.Duration, blocksOld uint64) {
defer func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to document it. At least a brief explanation of how it works and its motivation

lastIdxDeleted := uint32(0)

for {
time.Sleep(period)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally we sleep the function after execute the main task, this way we can know easily if the task is being executed correctly (even if it is not deleted anything yet, at least we now it is being executed)


// This function is a helper to get a task hash of aproximately nBlocksOld blocks ago
func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64) (*[32]byte, error) {
// r.ChainReader.ethClient.CallContext(context.Background(), &blockNumber, "eth_blockNumber")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe delete this comment?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants