diff --git a/api/nakadi-event-bus-api.yaml b/api/nakadi-event-bus-api.yaml index d0e4586dc7..131f47018a 100644 --- a/api/nakadi-event-bus-api.yaml +++ b/api/nakadi-event-bus-api.yaml @@ -316,7 +316,10 @@ paths: incoming Events. Validation rules are evaluated in the order they are defined and the Event is **rejected** in the first case of failure. If the offending validation rule provides information about the violation it will be included in the `BatchItemResponse`. If the - `EventType` defines schema validation it will be performed at this moment. + `EventType` defines schema validation it will be performed at this moment. The size of each + Event will also be validated. The maximum size per Event is 999,000 bytes. We use the batch + input to measure the size of events, so unnecessary spaces, tabs, and carriage returns will + count towards the event size. 1. Once the validation succeeded, the content of the Event is updated according to the enrichment rules in the order the rules are defined in the `EventType`. No preexisting diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index 56a9985240..2e1cced20c 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -8,7 +8,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; -import org.json.JSONObject; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -54,6 +53,7 @@ public class KafkaRepositoryAT extends BaseAT { private static final int KAFKA_REQUEST_TIMEOUT = 30000; private static final int KAFKA_BATCH_SIZE = 1048576; private static final long KAFKA_LINGER_MS = 0; + private static final long NAKADI_EVENT_MAX_BYTES = 1000000L; private NakadiSettings nakadiSettings; private KafkaSettings kafkaSettings; @@ -73,7 +73,8 @@ public void setup() { DEFAULT_TOPIC_ROTATION, DEFAULT_COMMIT_TIMEOUT, NAKADI_POLL_TIMEOUT, - NAKADI_SEND_TIMEOUT); + NAKADI_SEND_TIMEOUT, + NAKADI_EVENT_MAX_BYTES); kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS); zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT); kafkaHelper = new KafkaTestHelper(KAFKA_URL); @@ -133,7 +134,7 @@ public void whenDeleteTopicThenTopicIsDeleted() throws Exception { @Test(timeout = 10000) public void whenBulkSendSuccessfullyThenUpdateBatchItemStatus() throws Exception { final List items = new ArrayList<>(); - final JSONObject event = new JSONObject(); + final String event = "{}"; final String topicId = TestUtils.randomValidEventTypeName(); kafkaHelper.createTopic(topicId, ZOOKEEPER_URL); diff --git a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java index 0970159cde..116e0f1a55 100644 --- a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java +++ b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java @@ -15,6 +15,7 @@ public class NakadiSettings { private final long defaultCommitTimeoutSeconds; private final long kafkaPollTimeoutMs; private final long kafkaSendTimeoutMs; + private final long eventMaxBytes; @Autowired public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTopicPartitionCount, @@ -24,7 +25,8 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo @Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs, @Value("${nakadi.stream.default.commitTimeout}") final long defaultCommitTimeoutSeconds, @Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs, - @Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs) { + @Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs, + @Value("${nakadi.event.max.bytes}") final long eventMaxBytes) { this.maxTopicPartitionCount = maxTopicPartitionCount; this.defaultTopicPartitionCount = defaultTopicPartitionCount; this.defaultTopicReplicaFactor = defaultTopicReplicaFactor; @@ -33,6 +35,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo this.defaultCommitTimeoutSeconds = defaultCommitTimeoutSeconds; this.kafkaPollTimeoutMs = kafkaPollTimeoutMs; this.kafkaSendTimeoutMs = kafkaSendTimeoutMs; + this.eventMaxBytes = eventMaxBytes; } public int getDefaultTopicPartitionCount() { @@ -67,4 +70,8 @@ public long getKafkaSendTimeoutMs() { return kafkaSendTimeoutMs; } + public long getEventMaxBytes() { + return eventMaxBytes; + } + } diff --git a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java index 541ecc7fa3..f60c4ff06c 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java @@ -1,6 +1,5 @@ package org.zalando.nakadi.controller; -import org.json.JSONArray; import org.json.JSONException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,11 +81,8 @@ private ResponseEntity postEventInternal(final String eventTypeName, final Client client) { final long startingNanos = System.nanoTime(); try { - final JSONArray eventsAsJsonObjects = new JSONArray(eventsAsString); - - final int eventCount = eventsAsJsonObjects.length(); - final EventPublishResult result = publisher.publish(eventsAsJsonObjects, eventTypeName, client); - reportMetrics(eventTypeMetrics, result, eventsAsString, eventCount); + final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, client); + reportMetrics(eventTypeMetrics, result, eventsAsString, result.getResponses().size()); final ResponseEntity response = response(result); return response; diff --git a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java index 4a64faf5d1..30e3993d95 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java @@ -1,19 +1,82 @@ package org.zalando.nakadi.domain; -import org.json.JSONArray; +import org.json.JSONException; import java.util.ArrayList; import java.util.List; public class BatchFactory { - public static List from(final JSONArray events) { - final List batch = new ArrayList<>(events.length()); - for (int i = 0; i < events.length(); i++) { - batch.add(new BatchItem(events.getJSONObject(i))); + public static List from(final String events) { + final List batch = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + int brackets = 0; + boolean insideQuote = false; + boolean escaped = false; + int start = 0; + final int length = events.length(); + int end = length - 1; + + while (isEmptyCharacter(events.charAt(start)) && start < end) { + start++; + } + while (isEmptyCharacter(events.charAt(end)) && end > start) { + end--; + } + if (!(events.charAt(start) == '[')) { + throw new JSONException(String.format("Unexpected character %s in position %d, expected '['", + events.charAt(start), start)); + } + start++; + if (!(events.charAt(end) == ']')) { + throw new JSONException(String.format("Unexpected character %s in position %d, expected ']'", + events.charAt(end), end)); + } + + for (int i = start; i < end; i++) { + if (!escaped && events.charAt(i) == '"') { + if (insideQuote) { + insideQuote = false; + } else { + insideQuote = true; + } + } + if (escaped) { + sb.append(events.charAt(i)); + escaped = false; + } else if (!escaped && events.charAt(i) == '\\') { + sb.append(events.charAt(i)); + escaped = true; + } else if (insideQuote) { + sb.append(events.charAt(i)); + } else { + if (events.charAt(i) == '{') { + brackets++; + } + if (events.charAt(i) == '}') { + brackets--; + } + if (!((brackets == 0) && ((events.charAt(i) == ',') + || isEmptyCharacter(events.charAt(i))))) { + sb.append(events.charAt(i)); + } + if (brackets == 0 && !isEmptyCharacter(events.charAt(i))) { + if (sb.length() > 0) { + batch.add(new BatchItem(sb.toString())); + } + sb = new StringBuilder(); + } + } + } + + if (sb.length() != 0) { + batch.add(new BatchItem(sb.toString())); } return batch; } + private static boolean isEmptyCharacter(final char c) { + return (c == ' ' || c == '\t' || c == '\n' || c == '\r'); + } } diff --git a/src/main/java/org/zalando/nakadi/domain/BatchItem.java b/src/main/java/org/zalando/nakadi/domain/BatchItem.java index f194b69ec3..2b55c02840 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchItem.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchItem.java @@ -3,6 +3,7 @@ import org.json.JSONObject; import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; import java.util.Optional; public class BatchItem { @@ -10,12 +11,14 @@ public class BatchItem { private final JSONObject event; private String partition; private String brokerId; + private int eventSize; - public BatchItem(final JSONObject event) { + public BatchItem(final String event) { + this.event = new JSONObject(event); + this.eventSize = event.getBytes(StandardCharsets.UTF_8).length; this.response = new BatchItemResponse(); - this.event = event; - Optional.ofNullable(event.optJSONObject("metadata")) + Optional.ofNullable(this.event.optJSONObject("metadata")) .map(e -> e.optString("eid", null)) .ifPresent(this.response::setEid); } @@ -58,4 +61,8 @@ public void updateStatusAndDetail(final EventPublishingStatus publishingStatus, response.setPublishingStatus(publishingStatus); response.setDetail(detail); } + + public int getEventSize() { + return eventSize; + } } \ No newline at end of file diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 23a6cfc291..b3b647bc61 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -1,11 +1,11 @@ package org.zalando.nakadi.service; -import org.json.JSONArray; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.BatchFactory; import org.zalando.nakadi.domain.BatchItem; import org.zalando.nakadi.domain.BatchItemResponse; @@ -36,6 +36,8 @@ public class EventPublisher { private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class); + private final NakadiSettings nakadiSettings; + private final TopicRepository topicRepository; private final EventTypeCache eventTypeCache; private final PartitionResolver partitionResolver; @@ -45,14 +47,16 @@ public class EventPublisher { public EventPublisher(final TopicRepository topicRepository, final EventTypeCache eventTypeCache, final PartitionResolver partitionResolver, - final Enrichment enrichment) { + final Enrichment enrichment, + final NakadiSettings nakadiSettings) { this.topicRepository = topicRepository; this.eventTypeCache = eventTypeCache; this.partitionResolver = partitionResolver; this.enrichment = enrichment; + this.nakadiSettings = nakadiSettings; } - public EventPublishResult publish(final JSONArray events, final String eventTypeName, final Client client) + public EventPublishResult publish(final String events, final String eventTypeName, final Client client) throws NoSuchEventTypeException, InternalNakadiException { final EventType eventType = eventTypeCache.getEventType(eventTypeName); final List batch = BatchFactory.from(events); @@ -118,6 +122,7 @@ private void validate(final List batch, final EventType eventType) th item.setStep(EventPublishingStep.VALIDATING); try { validateSchema(item.getEvent(), eventType); + validateEventSize(item); } catch (final EventValidationException e) { item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage()); throw e; @@ -140,6 +145,13 @@ private void validateSchema(final JSONObject event, final EventType eventType) t } } + private void validateEventSize(final BatchItem item) throws EventValidationException { + if (item.getEventSize() > nakadiSettings.getEventMaxBytes()) { + throw new EventValidationException("Event too large: " + item.getEventSize() + + " bytes, max size is " + nakadiSettings.getEventMaxBytes() + " bytes"); + } + } + private EventPublishResult failed(final List batch) { return new EventPublishResult(EventPublishingStatus.FAILED, EventPublishingStep.PUBLISHING, responses(batch)); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 5eb815be8d..4016aa989f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -78,6 +78,7 @@ nakadi: auth: plugin: factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory + event.max.bytes: 999000 --- spring: diff --git a/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java index 87415e618f..7bbfcc5fd0 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java @@ -2,7 +2,7 @@ import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; -import org.json.JSONArray; +import org.json.JSONException; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -15,6 +15,8 @@ import org.zalando.nakadi.config.SecuritySettings; import org.zalando.nakadi.domain.BatchItemResponse; import org.zalando.nakadi.domain.EventPublishResult; +import org.zalando.nakadi.domain.EventPublishingStatus; +import org.zalando.nakadi.domain.EventPublishingStep; import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.metrics.EventTypeMetricRegistry; @@ -86,12 +88,12 @@ public void setUp() throws Exception { @Test public void whenResultIsSubmittedThen200() throws Exception { - final EventPublishResult result = new EventPublishResult(SUBMITTED, null, null); + final EventPublishResult result = new EventPublishResult(SUBMITTED, null, submittedResponses(1)); Mockito .doReturn(result) .when(publisher) - .publish(any(JSONArray.class), eq(TOPIC), any(Client.class)); + .publish(any(String.class), eq(TOPIC), any(Client.class)); postBatch(TOPIC, EVENT_BATCH) .andExpect(status().isOk()) @@ -100,11 +102,12 @@ public void whenResultIsSubmittedThen200() throws Exception { @Test public void whenInvalidPostBodyThen400() throws Exception { - final String expectedPayload = "{\"type\":\"http://httpstatus.es/400\"," + - "\"title\":\"Bad Request\",\"status\":400," + - "\"detail\":\"A JSONArray text must start with '[' at 1 [character 2 line 1]\"}"; - postBatch(TOPIC, "invalid json array").andExpect(status().isBadRequest()) - .andExpect(content().string(expectedPayload)); + + Mockito.doThrow(new JSONException("Error")) + .when(publisher) + .publish(any(String.class), eq(TOPIC), any(Client.class)); + + postBatch(TOPIC, "invalid json array").andExpect(status().isBadRequest()); } @Test @@ -114,7 +117,7 @@ public void whenResultIsAbortedThen422() throws Exception { Mockito .doReturn(result) .when(publisher) - .publish(any(JSONArray.class), eq(TOPIC), any(Client.class)); + .publish(any(String.class), eq(TOPIC), any(Client.class)); postBatch(TOPIC, EVENT_BATCH) .andExpect(status().isUnprocessableEntity()) @@ -128,7 +131,7 @@ public void whenResultIsAbortedThen207() throws Exception { Mockito .doReturn(result) .when(publisher) - .publish(any(JSONArray.class), eq(TOPIC), any(Client.class)); + .publish(any(String.class), eq(TOPIC), any(Client.class)); postBatch(TOPIC, EVENT_BATCH) .andExpect(status().isMultiStatus()) @@ -140,7 +143,7 @@ public void whenEventTypeNotFoundThen404() throws Exception { Mockito .doThrow(NoSuchEventTypeException.class) .when(publisher) - .publish(any(JSONArray.class), eq(TOPIC), any(Client.class)); + .publish(any(String.class), eq(TOPIC), any(Client.class)); postBatch(TOPIC, EVENT_BATCH) .andExpect(content().contentType("application/problem+json")) @@ -149,7 +152,7 @@ public void whenEventTypeNotFoundThen404() throws Exception { @Test public void publishedEventsAreReportedPerEventType() throws Exception { - final EventPublishResult success = new EventPublishResult(SUBMITTED, null, null); + final EventPublishResult success = new EventPublishResult(SUBMITTED, null, submittedResponses(3)); Mockito .doReturn(success) .doReturn(success) @@ -178,6 +181,22 @@ private List responses() { return responses; } + private List submittedResponses(final int number) { + return responses(number, SUBMITTED, PUBLISHING); + } + + private List responses(final int number, final EventPublishingStatus status, + final EventPublishingStep step) { + final List responses = new ArrayList<>(); + for (int i = 0; i < number; i++) { + final BatchItemResponse response = new BatchItemResponse(); + response.setPublishingStatus(status); + response.setStep(step); + responses.add(response); + } + return responses; + } + private ResultActions postBatch(final String eventType, final String batch) throws Exception { final String url = "/event-types/" + eventType + "/events"; final MockHttpServletRequestBuilder requestBuilder = post(url) diff --git a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java index 48fe3561f8..6bf7dc861b 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java @@ -95,6 +95,7 @@ public class EventTypeControllerTest { private static final long TOPIC_RETENTION_TIME_MS = 150; private static final int NAKADI_SEND_TIMEOUT = 10000; private static final int NAKADI_POLL_TIMEOUT = 10000; + private static final long NAKADI_EVENT_MAX_BYTES = 1000000; private final EventTypeRepository eventTypeRepository = mock(EventTypeRepository.class); private final TopicRepository topicRepository = mock(TopicRepository.class); private final PartitionResolver partitionResolver = mock(PartitionResolver.class); @@ -124,7 +125,7 @@ public void init() throws Exception { final EventTypeOptionsValidator eventTypeOptionsValidator = new EventTypeOptionsValidator(TOPIC_RETENTION_MIN_MS, TOPIC_RETENTION_MAX_MS); final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, - NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT); + NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, NAKADI_EVENT_MAX_BYTES); final EventTypeController controller = new EventTypeController(eventTypeService, featureToggleService, eventTypeOptionsValidator, applicationService, nakadiSettings); diff --git a/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java new file mode 100644 index 0000000000..010ab63f6b --- /dev/null +++ b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java @@ -0,0 +1,117 @@ +package org.zalando.nakadi.domain; + +import org.json.JSONException; +import org.junit.Test; + +import java.util.List; + +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertEquals; + +public class BatchFactoryTest { + + @Test + public void testOneEvent() { + final String events = "[{\"name\":\"MyEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(1, batch.size()); + assertEquals(18, batch.get(0).getEventSize()); + assertEquals("{\"name\":\"MyEvent\"}", batch.get(0).getEvent().toString()); + } + + @Test + public void testNoEvent() { + final String events = "[]"; + final List batch = BatchFactory.from(events); + assertEquals(0, batch.size()); + } + + @Test + public void testNoEventAndSpace() { + final String events = "[ ]"; + final List batch = BatchFactory.from(events); + assertEquals(0, batch.size()); + } + + @Test + public void testMultipleEvents() { + final String events = "[{\"name\":\"MyEvent\"},{\"name\":\"MyOtherEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + assertEquals(18, batch.get(0).getEventSize()); + assertEquals(23, batch.get(1).getEventSize()); + assertEquals("{\"name\":\"MyEvent\"}", batch.get(0).getEvent().toString()); + assertEquals("{\"name\":\"MyOtherEvent\"}", batch.get(1).getEvent().toString()); + } + + @Test + public void testNestedArrays() { + final String events = "[{\"name\":\"MyEvent\", \"array\":[{\"developer\": \"Ricardo\"}," + + "{\"developer\": \"Sergii\"},{\"field\":[\"hello\",\"world\"]}]},{\"name\":\"MyOtherEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + assertEquals("{\"array\":[{\"developer\":\"Ricardo\"},{\"developer\":\"Sergii\"}," + + "{\"field\":[\"hello\",\"world\"]}],\"name\":\"MyEvent\"}", + batch.get(0).getEvent().toString()); + assertEquals("{\"name\":\"MyOtherEvent\"}", batch.get(1).getEvent().toString()); + } + + @Test + public void testMalformedJSON() { + final String events = "[{\"hello\":\"world\",]"; + try { + BatchFactory.from(events); + fail(); + } catch (JSONException e) {} + } + + @Test + public void testEscapedQuotation() { + final String events = "[{\"hello\":\"wor\\\"ld\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(1, batch.size()); + assertEquals("{\"hello\":\"wor\\\"ld\"}", batch.get(0).getEvent().toString()); + } + + @Test + public void testEscapedBrackets() { + final String events = "[{\"hello\":\"wor\\\\}ld\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(1, batch.size()); + assertEquals("{\"hello\":\"wor\\\\}ld\"}", batch.get(0).getEvent().toString()); + } + + @Test + public void testEmptyEvent() { + final String events = "[{\"name\":\"MyEvent\"},,,,{\"name\":\"MyOtherEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + assertEquals("{\"name\":\"MyEvent\"}", batch.get(0).getEvent().toString()); + assertEquals("{\"name\":\"MyOtherEvent\"}", batch.get(1).getEvent().toString()); + } + + @Test + public void testSpacesBetweenEvents() { + final String events = "[{\"name\":\"MyEvent\"}, {\"name\":\"MyOtherEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + assertEquals("{\"name\":\"MyEvent\"}", batch.get(0).getEvent().toString()); + assertEquals("{\"name\":\"MyOtherEvent\"}", batch.get(1).getEvent().toString()); + } + + @Test + public void testEmptyCharactersAroundArray() { + final String events = "\t [{\"name\":\"MyEvent\"},{\"name\":\"MyOtherEvent\"}]\n\n"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + } + + @Test + public void testGarbageBetweenEvents() { + final String events = "[{\"name\":\"MyEvent\"},atb#{\"name\":\"MyOtherEvent\"}]"; + try { + BatchFactory.from(events); + fail(); + } catch (JSONException e) {} + } +} diff --git a/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java b/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java new file mode 100644 index 0000000000..1a39c5fc8b --- /dev/null +++ b/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java @@ -0,0 +1,14 @@ +package org.zalando.nakadi.domain; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class BatchItemTest { + + @Test + public void testBatchItemSizeWithMultByteChar() { + final BatchItem item = new BatchItem("{ \"name\": \"香港\"} "); + assertEquals(20, item.getEventSize()); + } +} diff --git a/src/test/java/org/zalando/nakadi/enrichment/EnrichmentTest.java b/src/test/java/org/zalando/nakadi/enrichment/EnrichmentTest.java index e827a704ff..d03d687a2a 100644 --- a/src/test/java/org/zalando/nakadi/enrichment/EnrichmentTest.java +++ b/src/test/java/org/zalando/nakadi/enrichment/EnrichmentTest.java @@ -11,7 +11,7 @@ import static org.zalando.nakadi.domain.EventCategory.BUSINESS; import static org.zalando.nakadi.domain.EventCategory.DATA; import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; -import static org.zalando.nakadi.utils.TestUtils.createBatch; +import static org.zalando.nakadi.utils.TestUtils.createBatchItem; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -59,7 +59,7 @@ public void enrichAppliesStrategies() throws Exception { final EventType eventType = buildDefaultEventType(); eventType.getEnrichmentStrategies().add(EnrichmentStrategyDescriptor.METADATA_ENRICHMENT); final JSONObject event = new JSONObject(); - final BatchItem batchItem = createBatch(event); + final BatchItem batchItem = createBatchItem(event); final EnrichmentStrategy strategy = mock(EnrichmentStrategy.class); Mockito diff --git a/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java b/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java index 33f85fb71a..768b9547c2 100644 --- a/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java +++ b/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java @@ -7,10 +7,11 @@ import org.joda.time.DateTimeUtils; import org.json.JSONObject; import org.junit.Test; +import org.zalando.nakadi.utils.TestUtils; import static org.zalando.nakadi.utils.TestUtils.buildBusinessEvent; import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; -import static org.zalando.nakadi.utils.TestUtils.createBatch; +import static org.zalando.nakadi.utils.TestUtils.createBatchItem; import static org.zalando.nakadi.utils.TestUtils.randomString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.isEmptyString; @@ -23,17 +24,19 @@ public class MetadataEnrichmentStrategyTest { public void setReceivedAtWithSystemTimeInUTC() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); + final BatchItem batch = TestUtils.createBatchItem(event); assertThat(event.getJSONObject("metadata").optString("received_at"), isEmptyString()); try { DateTimeUtils.setCurrentMillisFixed(0); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(batch, eventType); } finally { DateTimeUtils.setCurrentMillisSystem(); } - assertThat(event.getJSONObject("metadata").getString("received_at"), equalTo("1970-01-01T00:00:00.000Z")); + assertThat(batch.getEvent().getJSONObject("metadata").getString("received_at"), + equalTo("1970-01-01T00:00:00.000Z")); } @Test(expected = EnrichmentException.class) @@ -43,45 +46,48 @@ public void throwsExceptionIfPathNotPresent() throws Exception { event.remove("metadata"); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(TestUtils.createBatchItem(event), eventType); } @Test public void setEventTypeName() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); + final BatchItem batch = TestUtils.createBatchItem(event); assertThat(event.getJSONObject("metadata").optString("event_type"), isEmptyString()); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(batch, eventType); - assertThat(event.getJSONObject("metadata").getString("event_type"), equalTo(eventType.getName())); + assertThat(batch.getEvent().getJSONObject("metadata").getString("event_type"), equalTo(eventType.getName())); } @Test public void setEventTypeSchemaVersion() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); + final BatchItem batchItem = createBatchItem(event); - assertThat(event.getJSONObject("metadata").optString("version"), isEmptyString()); + assertThat(batchItem.getEvent().getJSONObject("metadata").optString("version"), isEmptyString()); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(batchItem, eventType); - assertThat(event.getJSONObject("metadata").getString("version"), equalTo("1.0.0")); + assertThat(batchItem.getEvent().getJSONObject("metadata").getString("version"), equalTo("1.0.0")); } @Test public void setFlowId() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); + final BatchItem batch = createBatchItem(event); assertThat(event.getJSONObject("metadata").optString("flow_id"), isEmptyString()); final String flowId = randomString(); FlowIdUtils.push(flowId); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(batch, eventType); - assertThat(event.getJSONObject("metadata").getString("flow_id"), equalTo(flowId)); + assertThat(batch.getEvent().getJSONObject("metadata").getString("flow_id"), equalTo(flowId)); } @Test @@ -89,11 +95,11 @@ public void setPartition() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); final String partition = randomString(); - final BatchItem batch = createBatch(event); + final BatchItem batch = createBatchItem(event); batch.setPartition(partition); strategy.enrich(batch, eventType); - assertThat(event.getJSONObject("metadata").getString("partition"), equalTo(partition)); + assertThat(batch.getEvent().getJSONObject("metadata").getString("partition"), equalTo(partition)); } } \ No newline at end of file diff --git a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java index 67c9a6cbf4..eb7d0e3f74 100644 --- a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java +++ b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java @@ -13,7 +13,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.TimeoutException; -import org.json.JSONObject; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -216,7 +215,7 @@ public void invalidateInvalidCursors() throws NakadiException { @Test public void whenPostEventTimesOutThenUpdateItemStatus() throws Exception { - final BatchItem item = new BatchItem(new JSONObject()); + final BatchItem item = new BatchItem("{}"); item.setPartition("1"); final List batch = new ArrayList<>(); batch.add(item); @@ -240,7 +239,7 @@ public void whenPostEventTimesOutThenUpdateItemStatus() throws Exception { @Test public void whenPostEventOverflowsBufferThenUpdateItemStatus() throws Exception { - final BatchItem item = new BatchItem(new JSONObject()); + final BatchItem item = new BatchItem("{}"); item.setPartition("1"); final List batch = new ArrayList<>(); batch.add(item); @@ -265,9 +264,9 @@ public void whenPostEventOverflowsBufferThenUpdateItemStatus() throws Exception @Test public void whenKafkaPublishCallbackWithExceptionThenEventPublishingException() throws Exception { - final BatchItem firstItem = new BatchItem(new JSONObject()); + final BatchItem firstItem = new BatchItem("{}"); firstItem.setPartition("1"); - final BatchItem secondItem = new BatchItem(new JSONObject()); + final BatchItem secondItem = new BatchItem("{}"); secondItem.setPartition("2"); final List batch = ImmutableList.of(firstItem, secondItem); @@ -391,7 +390,7 @@ public void whenKafkaPublishTimeoutThenCircuitIsOpened() throws Exception { final List batches = new LinkedList<>(); for (int i = 0; i < 1000; i++) { try { - final BatchItem batchItem = new BatchItem(new JSONObject()); + final BatchItem batchItem = new BatchItem("{}"); batchItem.setPartition("1"); batches.add(batchItem); kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), ImmutableList.of(batchItem)); diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index 10358f398a..56c0c4d027 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -5,6 +5,8 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import org.zalando.nakadi.config.NakadiSettings; +import org.zalando.nakadi.domain.BatchItem; import org.zalando.nakadi.domain.BatchItemResponse; import org.zalando.nakadi.domain.EventPublishResult; import org.zalando.nakadi.domain.EventPublishingStatus; @@ -35,6 +37,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.isEmptyString; +import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -43,8 +46,10 @@ import static org.mockito.Mockito.verify; import static org.zalando.nakadi.utils.TestUtils.buildBusinessEvent; import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; -import static org.zalando.nakadi.utils.TestUtils.createBatch; +import static org.zalando.nakadi.utils.TestUtils.createBatchItem; import static org.zalando.nakadi.utils.TestUtils.randomString; +import static org.zalando.nakadi.utils.TestUtils.randomStringOfLength; +import static org.zalando.nakadi.utils.TestUtils.randomValidStringOfLength; public class EventPublisherTest { @@ -52,11 +57,19 @@ public class EventPublisherTest { public static final String CLIENT_ID = "clientId"; private static final Client FULL_ACCESS_CLIENT = new FullAccessClient(CLIENT_ID); + private static final int NAKADI_SEND_TIMEOUT = 10000; + private static final int NAKADI_POLL_TIMEOUT = 10000; + private static final int NAKADI_EVENT_MAX_BYTES = 900; + private static final long TOPIC_RETENTION_TIME_MS = 150; + private final TopicRepository topicRepository = mock(TopicRepository.class); private final EventTypeCache cache = mock(EventTypeCache.class); private final PartitionResolver partitionResolver = mock(PartitionResolver.class); private final Enrichment enrichment = mock(Enrichment.class); - private final EventPublisher publisher = new EventPublisher(topicRepository, cache, partitionResolver, enrichment); + private final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, + TOPIC_RETENTION_TIME_MS, 0, 60, NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, NAKADI_EVENT_MAX_BYTES); + private final EventPublisher publisher = new EventPublisher(topicRepository, cache, partitionResolver, + enrichment, nakadiSettings); @Test public void whenPublishIsSuccessfulThenResultIsSubmitted() throws Exception { @@ -64,9 +77,9 @@ public void whenPublishIsSuccessfulThenResultIsSubmitted() throws Exception { final JSONArray batch = buildDefaultBatch(1); final JSONObject event = batch.getJSONObject(0); - mockSuccessfulValidation(eventType, event); + mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); verify(topicRepository, times(1)).syncPostBatch(eq(eventType.getTopic()), any()); @@ -78,9 +91,9 @@ public void whenEventHasEidThenSetItInTheResponse() throws Exception { final JSONObject event = buildBusinessEvent(); final JSONArray batch = new JSONArray(Arrays.asList(event)); - mockSuccessfulValidation(eventType, event); + mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getResponses().get(0).getEid(), equalTo(event.getJSONObject("metadata").optString("eid"))); verify(topicRepository, times(1)).syncPostBatch(eq(eventType.getTopic()), any()); @@ -92,12 +105,12 @@ public void whenValidationFailsThenResultIsAborted() throws Exception { final JSONArray batch = buildDefaultBatch(1); final JSONObject event = batch.getJSONObject(0); - mockFaultValidation(eventType, event, "error"); + mockFaultValidation(eventType, "error"); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); - verify(enrichment, times(0)).enrich(createBatch(event), eventType); + verify(enrichment, times(0)).enrich(createBatchItem(event), eventType); verify(partitionResolver, times(0)).resolvePartition(eventType, event); verify(topicRepository, times(0)).syncPostBatch(any(), any()); } @@ -108,9 +121,9 @@ public void whenValidationFailsThenSubsequentItemsAreAborted() throws Exception final JSONArray batch = buildDefaultBatch(2); final JSONObject event = batch.getJSONObject(0); - mockFaultValidation(eventType, event, "error"); + mockFaultValidation(eventType, "error"); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -128,15 +141,158 @@ public void whenValidationFailsThenSubsequentItemsAreAborted() throws Exception } @Test - public void whenPartitionFailsThenResultIsAborted() throws Exception { + public void whenEventIsTooLargeThenResultIsAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildLargeBatch(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + verify(enrichment, times(0)).enrich(any(), any()); + verify(partitionResolver, times(0)).resolvePartition(any(), any()); + verify(topicRepository, times(0)).syncPostBatch(any(), any()); + } + + @Test + public void whenEventIsTooLargeThenAllItemsAreAborted() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONArray batch = buildDefaultBatch(1); - final JSONObject event = batch.getJSONObject(0); + final JSONObject largeEvent = new JSONObject(); + largeEvent.put("foo", randomStringOfLength(10000)); + batch.put(largeEvent); + final JSONObject smallEvent = new JSONObject(); + smallEvent.put("foo", randomString()); + batch.put(smallEvent); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + + final BatchItemResponse firstResponse = result.getResponses().get(0); + assertThat(firstResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(firstResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); + assertThat(firstResponse.getDetail(), is(isEmptyString())); + + final BatchItemResponse secondResponse = result.getResponses().get(1); + assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.FAILED)); + assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); + assertThat(secondResponse.getDetail(), startsWith("Event too large")); + + final BatchItemResponse thirdResponse = result.getResponses().get(2); + assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.NONE)); + assertThat(thirdResponse.getDetail(), is(isEmptyString())); + } + + @Test + public void whenEnrichmentMakesEventTooLargeThenAllItemsAreAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildDefaultBatch(1); + final JSONObject largeEvent = new JSONObject(); + largeEvent.put("foo", randomStringOfLength(880)); + batch.put(largeEvent); + final JSONObject smallEvent = new JSONObject(); + smallEvent.put("foo", randomString()); + batch.put(smallEvent); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + + final BatchItemResponse firstResponse = result.getResponses().get(0); + assertThat(firstResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(firstResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); + assertThat(firstResponse.getDetail(), is(isEmptyString())); + + final BatchItemResponse secondResponse = result.getResponses().get(1); + assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.FAILED)); + assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); + assertThat(secondResponse.getDetail(), startsWith("Event too large")); + + final BatchItemResponse thirdResponse = result.getResponses().get(2); + assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.NONE)); + assertThat(thirdResponse.getDetail(), is(isEmptyString())); + } + + @Test + public void whenEventIsExactlyMaxSizeThenResultIsSuccess() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildMaxSizeBatch(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); + verify(enrichment, times(1)).enrich(any(), any()); + verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(topicRepository, times(1)).syncPostBatch(any(), any()); + } + + @Test + public void whenEventIsOneByteOverMaxSizeThenResultIsAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildOneByteTooLargeBatch(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + verify(enrichment, times(0)).enrich(any(), any()); + verify(partitionResolver, times(0)).resolvePartition(any(), any()); + verify(topicRepository, times(0)).syncPostBatch(any(), any()); + } + + @Test + public void whenEventIsOneByteOverMaxSizeWithMultiByteCharsThenResultIsAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildOneByteTooLargeBatchMultiByte(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + verify(enrichment, times(0)).enrich(any(), any()); + verify(partitionResolver, times(0)).resolvePartition(any(), any()); + verify(topicRepository, times(0)).syncPostBatch(any(), any()); + } + + @Test + public void whenEventIsExactlyMaxSizeWithMultiByteCharsThenResultIsSuccess() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildMaxSizeBatchMultiByte(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); + verify(enrichment, times(1)).enrich(any(), any()); + verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(topicRepository, times(1)).syncPostBatch(any(), any()); + } + + @Test + public void whenPartitionFailsThenResultIsAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final List batch = new ArrayList<>(); + batch.add(createBatchItem(buildDefaultBatch(1).getJSONObject(0))); + final JSONObject event = batch.get(0).getEvent(); - mockSuccessfulValidation(eventType, event); - mockFaultPartition(eventType, event); + mockSuccessfulValidation(eventType); + mockFaultPartition(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(createStringFromBatchItems(batch), + eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); } @@ -144,13 +300,16 @@ public void whenPartitionFailsThenResultIsAborted() throws Exception { @Test public void whenPartitionFailsThenSubsequentItemsAreAborted() throws Exception { final EventType eventType = buildDefaultEventType(); - final JSONArray batch = buildDefaultBatch(2); - final JSONObject event = batch.getJSONObject(0); + final JSONArray array = buildDefaultBatch(2); + final List batch = new ArrayList<>(); + batch.add(createBatchItem(array.getJSONObject(0))); + batch.add(createBatchItem(array.getJSONObject(1))); mockSuccessfulValidation(eventType); - mockFaultPartition(eventType, event); + mockFaultPartition(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(createStringFromBatchItems(batch), + eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -176,7 +335,7 @@ public void whenPublishingFailsThenResultIsFailed() throws Exception { mockSuccessfulValidation(eventType); mockFailedPublishing(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.FAILED)); verify(topicRepository, times(1)).syncPostBatch(any(), any()); @@ -188,14 +347,14 @@ public void whenEnrichmentFailsThenResultIsAborted() throws Exception { final JSONArray batch = buildDefaultBatch(1); final JSONObject event = batch.getJSONObject(0); - mockSuccessfulValidation(eventType, event); + mockSuccessfulValidation(eventType); mockFaultEnrichment(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(cache, times(1)).getValidator(eventType.getName()); - verify(partitionResolver, times(1)).resolvePartition(eventType, event); + verify(partitionResolver, times(1)).resolvePartition(any(), any()); verify(enrichment, times(1)).enrich(any(), any()); verify(topicRepository, times(0)).syncPostBatch(any(), any()); } @@ -208,7 +367,7 @@ public void whenEnrichmentFailsThenSubsequentItemsAreAborted() throws Exception mockSuccessfulValidation(eventType); mockFaultEnrichment(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -230,7 +389,7 @@ public void testScopeWrite() throws Exception { final EventType eventType = EventTypeTestBuilder.builder().writeScopes(SCOPE_WRITE).build(); Mockito.when(cache.getEventType(eventType.getName())).thenReturn(eventType); mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(buildDefaultBatch(0), eventType.getName(), + final EventPublishResult result = publisher.publish(buildDefaultBatch(0).toString(), eventType.getName(), new NakadiClient(CLIENT_ID, SCOPE_WRITE)); Assert.assertEquals(result.getStatus(), EventPublishingStatus.SUBMITTED); @@ -240,7 +399,7 @@ public void testScopeWrite() throws Exception { public void testNoScopeWrite() throws Exception { final EventType eventType = EventTypeTestBuilder.builder().writeScopes(SCOPE_WRITE).build(); Mockito.when(cache.getEventType(eventType.getName())).thenReturn(eventType); - publisher.publish(buildDefaultBatch(0), eventType.getName(), + publisher.publish(buildDefaultBatch(0).toString(), eventType.getName(), new NakadiClient(CLIENT_ID, Collections.emptySet())); } @@ -251,11 +410,18 @@ private void mockFailedPublishing() throws Exception { .syncPostBatch(any(), any()); } - private void mockFaultPartition(final EventType eventType, final JSONObject event) throws PartitioningException { + private void mockFaultPartition(final EventType eventType, final BatchItem item) throws PartitioningException { Mockito .doThrow(new PartitioningException("partition error")) .when(partitionResolver) - .resolvePartition(eventType, event); + .resolvePartition(eventType, item.getEvent()); + } + + private void mockFaultPartition() throws PartitioningException { + Mockito + .doThrow(new PartitioningException("partition error")) + .when(partitionResolver) + .resolvePartition(any(), any()); } private void mockFaultEnrichment() throws EnrichmentException { @@ -285,6 +451,25 @@ private void mockFaultValidation(final EventType eventType, final JSONObject eve .validate(event); } + private void mockFaultValidation(final EventType eventType, final String error) throws Exception { + final EventTypeValidator faultyValidator = mock(EventTypeValidator.class); + + Mockito + .doReturn(eventType) + .when(cache) + .getEventType(eventType.getName()); + + Mockito + .doReturn(faultyValidator) + .when(cache) + .getValidator(eventType.getName()); + + Mockito + .doReturn(Optional.of(new ValidationError(error))) + .when(faultyValidator) + .validate(any()); + } + private void mockSuccessfulValidation(final EventType eventType, final JSONObject event) throws Exception { final EventTypeValidator truthyValidator = mock(EventTypeValidator.class); @@ -324,14 +509,61 @@ private void mockSuccessfulValidation(final EventType eventType) throws Exceptio } private JSONArray buildDefaultBatch(final int numberOfEvents) { + return buildBatch(numberOfEvents, 50); + } + + private JSONArray buildLargeBatch(final int numberOfEvents) { + return buildBatch(numberOfEvents, NAKADI_EVENT_MAX_BYTES + 100); + } + + private JSONArray buildMaxSizeBatch(final int numberOfEvents) { + return buildBatch(numberOfEvents, NAKADI_EVENT_MAX_BYTES); + } + + private JSONArray buildOneByteTooLargeBatch(final int numberOfEvents) { + return buildBatch(numberOfEvents, NAKADI_EVENT_MAX_BYTES + 1); + } + + private JSONArray buildMaxSizeBatchMultiByte(final int numberOfEvents) { + return buildBatchMultiByte(numberOfEvents, NAKADI_EVENT_MAX_BYTES); + } + + private JSONArray buildOneByteTooLargeBatchMultiByte(final int numberOfEvents) { + return buildBatchMultiByte(numberOfEvents, NAKADI_EVENT_MAX_BYTES + 1); + } + + private JSONArray buildBatchMultiByte(final int numberOfEvents, final int length) { final List events = new ArrayList<>(); + final int valueLength = length - 16; // each character 2 lines below is 3 bytes + for (int i = 0; i < numberOfEvents; i++) { + final JSONObject event = new JSONObject(); + event.put("foo", randomValidStringOfLength(valueLength) + "温泉"); + events.add(event); + } + + return new JSONArray(events); + } + private JSONArray buildBatch(final int numberOfEvents, final int length) { + final List events = new ArrayList<>(); + final int valueLength = length - 10; // the brackets, key, and quotation marks take 10 characters for (int i = 0; i < numberOfEvents; i++) { final JSONObject event = new JSONObject(); - event.put("foo", randomString()); + event.put("foo", randomValidStringOfLength(valueLength)); events.add(event); } return new JSONArray(events); } + + private String createStringFromBatchItems(final List batch) { + final StringBuilder sb = new StringBuilder(); + sb.append("["); + for (BatchItem item:batch) { + sb.append(item.getEvent().toString()); + sb.append(","); + } + sb.setCharAt(sb.length() - 1, ']'); + return sb.toString(); + } } diff --git a/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/src/test/java/org/zalando/nakadi/utils/TestUtils.java index 0630c96fa8..0755b03159 100644 --- a/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -54,7 +54,7 @@ public static String randomTextString() { } public static String randomString() { - final int length = RANDOM.nextInt(500); + final int length = RANDOM.nextInt(100); String s = ""; @@ -79,6 +79,26 @@ public static String randomString(final String validChars) { } + public static String randomStringOfLength(final int length) { + final StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < length; i++) { + sb.append((char) RANDOM.nextInt(128)); + } + + return sb.toString(); + } + + public static String randomValidStringOfLength(final int length) { + final StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < length; i++) { + sb.append(VALID_EVENT_TYPE_NAME_CHARS.charAt(RANDOM.nextInt(VALID_EVENT_TYPE_NAME_CHARS.length()))); + } + + return sb.toString(); + } + public static String randomValidEventTypeName() { return String.format("%s.%s", randomString(VALID_EVENT_TYPE_NAME_CHARS), randomString(VALID_EVENT_TYPE_NAME_CHARS)); @@ -157,7 +177,11 @@ public static void waitFor(final Runnable runnable, final long timeoutMs, final .withWaitBetweenEachTry(intervalMs)); } - public static BatchItem createBatch(final JSONObject event) { + public static BatchItem createBatchItem(final JSONObject event) { + return new BatchItem(event.toString()); + } + + public static BatchItem createBatchItem(final String event) { return new BatchItem(event); }