Skip to content

Commit

Permalink
Protect against doing requests for snapshots that are over 400KB. (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
spangaer and Thejipppp authored Oct 17, 2024
1 parent 014ea8d commit c5a029e
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ trait DynamoDBSnapshotRequests extends DynamoDBRequests {

val toUnit: Any => Unit = _ => ()

private def itemSize(partitionKey: String, serializedSnapshot: Array[Byte]) =
DynamoFixedByteSize + partitionKey.length + serializedSnapshot.size

def delete(metadata: SnapshotMetadata): Future[Unit] = {
val request = new DeleteItemRequest()
.withTableName(Table)
Expand Down Expand Up @@ -151,7 +154,8 @@ trait DynamoDBSnapshotRequests extends DynamoDBRequests {
private def toSnapshotItem(persistenceId: String, sequenceNr: Long, timestamp: Long, snapshot: Any): Future[Item] = {
val item: Item = new JHMap

item.put(Key, S(messagePartitionKey(persistenceId)))
val partitionKey = messagePartitionKey(persistenceId)
item.put(Key, S(partitionKey))
item.put(SequenceNr, N(sequenceNr))
item.put(Timestamp, N(timestamp))
val snapshotData = snapshot.asInstanceOf[AnyRef]
Expand All @@ -171,6 +175,11 @@ trait DynamoDBSnapshotRequests extends DynamoDBRequests {
}

fut.map { data =>
val size = itemSize(partitionKey, data)

if (size > MaxItemSize)
throw new DynamoDBSnapshotRejection(s"MaxItemSize exceeded: $size > $MaxItemSize")

item.put(PayloadData, B(data))
if (manifest.nonEmpty) {
item.put(SerializerManifest, S(manifest))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import com.typesafe.config.Config

import scala.concurrent.Future

class DynamoDBSnapshotRejection(message: String, cause: Throwable = null) extends RuntimeException(message, cause)

class DynamoDBSnapshotStore(config: Config) extends SnapshotStore with DynamoDBSnapshotRequests with ActorLogging {
val journalSettings = new DynamoDBSnapshotConfig(config)
val dynamo = dynamoClient(context.system, journalSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,22 @@ package object snapshot {
val SerializerId = "ser_id"
val SerializerManifest = "ser_manifest"
val PayloadData = "pay_data"

/**
* Returns (a slightly overestimated) size of the fixed fields in a snapshot DynamoDB record.
*
* Assumes the maximum of 21 bytes for a number.
*
* Sources
* https://zaccharles.medium.com/calculating-a-dynamodb-items-size-and-consumed-capacity-d1728942eb7c
* https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/CapacityUnitCalculations.html
*/
val DynamoFixedByteSize =
Key.length() + // + partitionKey.size
SequenceNr.length() + 21 +
Timestamp.length() + 21 +
Payload.length() + // + payload.size
100 + // Standard 100 bytes overhead
100 // Safety factor for enabling extra features

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.dynamodb.snapshot

import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.testkit._
import org.apache.pekko.persistence._
import org.apache.pekko.persistence.SnapshotProtocol._
import org.apache.pekko.persistence.dynamodb.IntegSpec
import org.scalatest._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class SnapshotTooBigSpec extends TestKit(ActorSystem("SnapshotTooBigSpec"))
with AnyWordSpecLike
with BeforeAndAfterAll
with BeforeAndAfterEach
with Matchers
with IntegSpec
with DynamoDBUtils {

override def beforeAll(): Unit = {
super.beforeAll()
this.ensureSnapshotTableExists()
}

override protected def beforeEach(): Unit = {
super.beforeEach()
senderProbe = TestProbe()
}

override def afterAll(): Unit = {
super.afterAll()
client.shutdown()
}

private var senderProbe: TestProbe = _
val persistenceId = "SnapshotTooBigSpec"
val snapshotStore = Persistence(system).snapshotStoreFor("")

"DynamoDB snapshot too big spec" must {

"1 reject a snapshot that is over 400 KB compressed." in {
// Expect 1 MB of random data to exceed 400KB compressed.
val bytes = new Array[Byte](1 << 20)
scala.util.Random.nextBytes(bytes)
val metadata = SnapshotMetadata.apply(persistenceId, 1, 0)
snapshotStore.tell(SaveSnapshot(metadata, bytes), senderProbe.ref)
val rej = senderProbe.expectMsgType[SaveSnapshotFailure]
rej.cause shouldBe a[DynamoDBSnapshotRejection]
rej.cause.getMessage().startsWith("MaxItemSize exceeded") shouldBe true
}
}
}

0 comments on commit c5a029e

Please sign in to comment.