Merge pull request #819 from zalando/ARUHA-1442
ARUHA-1442 Limit memory consumption of subscription streamer to 50 Mb
antban authored Jan 15, 2018
2 parents 184d8a4 + 0ddb7ed commit 30622a7
Showing 11 changed files with 180 additions and 32 deletions.
2 changes: 2 additions & 0 deletions docs/_data/nakadi-event-bus-api.yaml
@@ -2667,6 +2667,8 @@ parameters:
is reached, the messages are immediately flushed to the client and batch flush timer is reset.
* If 0 or undefined, will assume 30 seconds.
* Value is treated as a recommendation. Nakadi may flush chunks with a smaller timeout.
type: number
format: int32
required: false
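
The parameter above combines two flush triggers: a timer and a size cap. Below is a minimal Java sketch of that rule, with an illustrative method name and the 30-second default applied when the value is 0 or undefined; the real condition lives in EventStream.streamEvents further down this diff.

    // Sketch of the flush rule described by batch_flush_timeout (name illustrative):
    // flush when the (possibly defaulted) timeout elapses or batch_limit is reached.
    static boolean shouldFlush(final long batchStartMillis, final int batchSize,
                               final int batchFlushTimeoutSeconds, final int batchLimit) {
        // 0 or undefined means the 30-second default, per the parameter description
        final int timeoutSeconds = batchFlushTimeoutSeconds == 0 ? 30 : batchFlushTimeoutSeconds;
        return System.currentTimeMillis() - batchStartMillis >= timeoutSeconds * 1000L
                || batchSize >= batchLimit;
    }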
EventStreamReadingAT.java
@@ -4,9 +4,11 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.jayway.restassured.RestAssured;
import com.jayway.restassured.response.Header;
import com.jayway.restassured.response.Response;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -15,15 +17,18 @@
import org.springframework.http.HttpStatus;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.EventTypeStatistics;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.repository.kafka.KafkaTestHelper;
import org.zalando.nakadi.service.BlacklistService;
import org.zalando.nakadi.utils.EventTypeTestBuilder;
import org.zalando.nakadi.utils.TestUtils;
import org.zalando.nakadi.view.Cursor;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;

import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
@@ -34,9 +39,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.jayway.restassured.RestAssured.given;
import static java.text.MessageFormat.format;
@@ -65,7 +70,7 @@ public class EventStreamReadingAT extends BaseAT {
private List<Cursor> kafkaInitialNextOffsets;

@BeforeClass
public static void setupClass() throws JsonProcessingException, NoSuchEventTypeException {
public static void setupClass() throws JsonProcessingException {
eventType = EventTypeTestBuilder.builder()
.defaultStatistic(new EventTypeStatistics(PARTITIONS_NUM, PARTITIONS_NUM))
.build();
@@ -76,7 +81,7 @@ public static void setupClass() throws JsonProcessingException, NoSuchEventTypeException
}

@Before
public void setUp() throws InterruptedException, JsonProcessingException {
public void setUp() throws JsonProcessingException {
kafkaHelper = new KafkaTestHelper(KAFKA_URL);
initialCursors = kafkaHelper.getOffsetsToReadFromLatest(topicName);
kafkaInitialNextOffsets = kafkaHelper.getNextOffsets(topicName);
@@ -86,7 +91,7 @@ public void setUp() throws InterruptedException, JsonProcessingException {
@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void whenPushFewEventsAndReadThenGetEventsInStream()
throws ExecutionException, InterruptedException, JsonProcessingException {
throws ExecutionException, InterruptedException {

// ARRANGE //
// push events to one of the partitions
@@ -127,7 +132,7 @@ public void whenPushFewEventsAndReadThenGetEventsInStream()

@Test(timeout = 10000)
public void whenAcceptEncodingGzipReceiveCompressedStream()
throws ExecutionException, InterruptedException, JsonProcessingException {
throws ExecutionException, InterruptedException {

// ARRANGE //
// push events to one of the partitions
@@ -152,7 +157,7 @@ public void whenAcceptEncodingGzipReceiveCompressedStream()
@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void whenPushedAmountOfEventsMoreThanBatchSizeAndReadThenGetEventsInMultipleBatches()
throws ExecutionException, InterruptedException, JsonProcessingException {
throws ExecutionException, InterruptedException {

// ARRANGE //
// push events to one of the partitions so that they don't fit into one batch
@@ -203,8 +208,7 @@ public void whenPushedAmountOfEventsMoreThanBatchSizeAndReadThenGetEventsInMultipleBatches()

@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void whenReadFromTheEndThenLatestOffsetsInStream()
throws ExecutionException, InterruptedException, JsonProcessingException {
public void whenReadFromTheEndThenLatestOffsetsInStream() {

// ACT //
// just stream without X-nakadi-cursors; that should make nakadi to read from the very end
@@ -237,8 +241,7 @@ public void whenReadFromTheEndThenLatestOffsetsInStream()

@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void whenReachKeepAliveLimitThenStreamIsClosed()
throws ExecutionException, InterruptedException, JsonProcessingException {
public void whenReachKeepAliveLimitThenStreamIsClosed() {
// ACT //
final int keepAliveLimit = 3;
final Response response = given()
@@ -362,8 +365,7 @@ private Response readEvents() {

@Ignore
@Test(timeout = 10000)
public void whenExceedMaxConsumersNumThen429() throws IOException, InterruptedException, ExecutionException,
TimeoutException {
public void whenExceedMaxConsumersNumThen429() throws IOException, InterruptedException, ExecutionException {
final String etName = NakadiTestUtils.createEventType().getName();

// try to create 8 consuming connections
@@ -458,6 +460,43 @@ public void whenReadEventsConsumerIsBlocked() throws Exception {
}
}

@Test(timeout = 10000)
public void whenMemoryOverflowEventsDumped() throws IOException {
// Create event type
final EventType loadEt = EventTypeTestBuilder.builder()
.defaultStatistic(new EventTypeStatistics(PARTITIONS_NUM, PARTITIONS_NUM))
.build();
NakadiTestUtils.createEventTypeInNakadi(loadEt);

// Publish events to the event type that do not fit into memory
final String evt = "{\"foo\":\"barbarbar\"}";
final int eventCount = 2 * (10000 / evt.length());
NakadiTestUtils.publishEvents(loadEt.getName(), eventCount, i -> evt);

// Configure streaming so it will:
// - definitely wait for more than the test timeout (10s)
// - collect a batch whose size is greater than the number of events published to this event type
final String url = RestAssured.baseURI + ":" + RestAssured.port + createStreamEndpointUrl(loadEt.getName())
+ "?batch_limit=" + (10 * eventCount)
+ "&stream_limit=" + (10 * eventCount)
+ "&batch_flush_timeout=11"
+ "&stream_timeout=11";
final HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
// Start from the beginning.
connection.setRequestProperty("X-Nakadi-Cursors",
"[" + IntStream.range(0, PARTITIONS_NUM)
.mapToObj(i -> "{\"partition\": \"" + i + "\",\"offset\":\"begin\"}")
.collect(Collectors.joining(",")) + "]");
Assert.assertEquals(HttpServletResponse.SC_OK, connection.getResponseCode());

final InputStream inputStream = connection.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

final String line = reader.readLine();
Assert.assertNotNull(line);
// If we read at least one line, it means we were able to read data before the test timeout was reached.
}


private static String createStreamEndpointUrl(final String eventType) {
return format("/event-types/{0}/events", eventType);
EventStreamController.java
@@ -11,6 +11,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;
@@ -97,6 +98,7 @@ public class EventStreamController {
private final MetricRegistry streamMetrics;
private final AuthorizationValidator authorizationValidator;
private final EventTypeChangeListener eventTypeChangeListener;
private final Long maxMemoryUsageBytes;

@Autowired
public EventStreamController(final EventTypeRepository eventTypeRepository,
@@ -111,7 +113,8 @@ public EventStreamController(final EventTypeRepository eventTypeRepository,
final FeatureToggleService featureToggleService,
final CursorConverter cursorConverter,
final AuthorizationValidator authorizationValidator,
final EventTypeChangeListener eventTypeChangeListener) {
final EventTypeChangeListener eventTypeChangeListener,
@Value("${nakadi.stream.maxStreamMemoryBytes}") final Long maxMemoryUsageBytes) {
this.eventTypeRepository = eventTypeRepository;
this.timelineService = timelineService;
this.jsonMapper = jsonMapper;
@@ -125,6 +128,7 @@ public EventStreamController(final EventTypeRepository eventTypeRepository,
this.cursorConverter = cursorConverter;
this.authorizationValidator = authorizationValidator;
this.eventTypeChangeListener = eventTypeChangeListener;
this.maxMemoryUsageBytes = maxMemoryUsageBytes;
}

@VisibleForTesting
@@ -235,6 +239,7 @@ public StreamingResponseBody streamEvents(
.withEtName(eventTypeName)
.withConsumingAppId(client.getClientId())
.withCursors(getStreamingStart(eventType, cursorsStr))
.withMaxMemoryUsageBytes(maxMemoryUsageBytes)
.build();

// acquire connection slots to limit the number of simultaneous connections from one client
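
The new constructor argument is injected from the nakadi.stream.maxStreamMemoryBytes property via @Value. A sketch of the matching configuration entry follows; the file location is an assumption, only the key and the 50 MB default come from this diff.

    # assumed location: the service's application.properties (or the equivalent YAML key)
    # 52428800 = 50 * 1024 * 1024 bytes, matching DEF_MAX_MEMORY_USAGE_BYTES in EventStreamConfig
    nakadi.stream.maxStreamMemoryBytes=52428800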
37 changes: 30 additions & 7 deletions src/main/java/org/zalando/nakadi/service/EventStream.java
@@ -13,6 +13,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -76,6 +77,7 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
final Map<String, Long> batchStartTimes = createMapWithPartitionKeys(partition -> start);
final List<ConsumedEvent> consumedEvents = new LinkedList<>();
long lastKpiEventSent = System.currentTimeMillis();
long bytesInMemory = 0;

while (connectionReady.get() &&
!blacklistService.isConsumptionBlocked(config.getEtName(), config.getConsumingAppId())) {
@@ -99,6 +101,7 @@
// put message to batch
currentBatches.get(event.getPosition().getPartition()).add(event.getEvent());
messagesRead++;
bytesInMemory += event.getEvent().length;

// if we read the message - reset keep alive counter for this partition
keepAliveInARow.put(event.getPosition().getPartition(), 0);
@@ -109,19 +112,35 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
final long timeSinceBatchStart = currentTimeMillis() - batchStartTimes.get(partition);
if (config.getBatchTimeout() * 1000 <= timeSinceBatchStart
|| currentBatches.get(partition).size() >= config.getBatchLimit()) {

sendBatch(latestOffsets.get(partition), currentBatches.get(partition));

// if we hit keep alive count limit - close the stream
if (currentBatches.get(partition).size() == 0) {
final List<byte[]> eventsToSend = currentBatches.get(partition);
sendBatch(latestOffsets.get(partition), eventsToSend);

if (!eventsToSend.isEmpty()) {
bytesInMemory -= eventsToSend.stream().mapToLong(v -> v.length).sum();
eventsToSend.clear();
} else {
// if we hit keep alive count limit - close the stream
keepAliveInARow.put(partition, keepAliveInARow.get(partition) + 1);
}

// init new batch for partition
currentBatches.get(partition).clear();
batchStartTimes.put(partition, currentTimeMillis());
}
}
// Dump some data that is exceeding memory limits
while (isMemoryLimitReached(bytesInMemory)) {
final Map.Entry<String, List<byte[]>> heaviestPartition = currentBatches.entrySet().stream()
.max(Comparator.comparing(
entry -> entry.getValue().stream().mapToLong(event -> event.length).sum()))
.get();
sendBatch(latestOffsets.get(heaviestPartition.getKey()), heaviestPartition.getValue());
final long freed = heaviestPartition.getValue().stream().mapToLong(v -> v.length).sum();
LOG.warn("Memory limit reached for event type {}: {} bytes. Freed: {} bytes, {} messages",
config.getEtName(), bytesInMemory, freed, heaviestPartition.getValue().size());
bytesInMemory -= freed;
// Init new batch for subscription
heaviestPartition.getValue().clear();
batchStartTimes.put(heaviestPartition.getKey(), currentTimeMillis());
}

if (lastKpiEventSent + kpiFrequencyMs < System.currentTimeMillis()) {
final long count = kpiData.getAndResetNumberOfEventsSent();
@@ -173,6 +192,10 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
}
}

private boolean isMemoryLimitReached(final long memoryUsed) {
return memoryUsed > config.getMaxMemoryUsageBytes();
}

private void publishKpi(final String appName, final long count, final long bytes) {
final String appNameHashed = kpiPublisher.hash(appName);

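
In sum, streamEvents now keeps a running bytesInMemory counter and, once it exceeds the configured cap, flushes the partition buffering the most bytes until usage drops back under the limit. Below is a self-contained sketch of that eviction loop with illustrative names; in Nakadi the batch is sent via sendBatch before being cleared.

    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;

    final class MemoryCapSketch {
        private MemoryCapSketch() {
        }

        static long partitionBytes(final List<byte[]> batch) {
            return batch.stream().mapToLong(event -> event.length).sum();
        }

        // Repeatedly "flush" (here: just clear) the heaviest partition batch until
        // the buffered byte count is back under the configured cap.
        static long enforceCap(final Map<String, List<byte[]>> currentBatches,
                               long bytesInMemory, final long maxMemoryUsageBytes) {
            while (bytesInMemory > maxMemoryUsageBytes) {
                final List<byte[]> heaviest = currentBatches.values().stream()
                        .max(Comparator.comparingLong(MemoryCapSketch::partitionBytes))
                        .orElseThrow(IllegalStateException::new);
                bytesInMemory -= partitionBytes(heaviest); // free the accounted bytes
                heaviest.clear();                          // in Nakadi, sendBatch(...) runs first
            }
            return bytesInMemory;
        }
    }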
20 changes: 18 additions & 2 deletions src/main/java/org/zalando/nakadi/service/EventStreamConfig.java
@@ -17,6 +17,7 @@ public class EventStreamConfig {
private static final int STREAM_LIMIT_DEFAULT = 0;
private static final int BATCH_FLUSH_TIMEOUT_DEFAULT = 30;
private static final int STREAM_KEEP_ALIVE_LIMIT_DEFAULT = 0;
private static final long DEF_MAX_MEMORY_USAGE_BYTES = 50 * 1024 * 1024;
private static final Random RANDOM = new Random();

private final List<NakadiCursor> cursors;
@@ -27,10 +28,12 @@ public class EventStreamConfig {
private final int streamKeepAliveLimit;
private final String etName;
private final String consumingAppId;
private final long maxMemoryUsageBytes;

private EventStreamConfig(final List<NakadiCursor> cursors, final int batchLimit,
final int streamLimit, final int batchTimeout, final int streamTimeout,
final int streamKeepAliveLimit, final String etName, final String consumingAppId) {
final int streamKeepAliveLimit, final String etName, final String consumingAppId,
final long maxMemoryUsageBytes) {
this.cursors = cursors;
this.batchLimit = batchLimit;
this.streamLimit = streamLimit;
@@ -39,6 +42,7 @@ private EventStreamConfig(final List<NakadiCursor> cursors, final int batchLimit,
this.streamKeepAliveLimit = streamKeepAliveLimit;
this.etName = etName;
this.consumingAppId = consumingAppId;
this.maxMemoryUsageBytes = maxMemoryUsageBytes;
}

public List<NakadiCursor> getCursors() {
Expand Down Expand Up @@ -73,6 +77,10 @@ public String getConsumingAppId() {
return consumingAppId;
}

public long getMaxMemoryUsageBytes() {
return maxMemoryUsageBytes;
}

@Override
public String toString() {
return "EventStreamConfig{cursors=" + cursors + ", batchLimit=" + batchLimit
@@ -123,6 +131,7 @@ public static class Builder {
private int batchTimeout = BATCH_FLUSH_TIMEOUT_DEFAULT;
private int streamTimeout = generateDefaultStreamTimeout();
private int streamKeepAliveLimit = STREAM_KEEP_ALIVE_LIMIT_DEFAULT;
private long maxMemoryUsageBytes = DEF_MAX_MEMORY_USAGE_BYTES;
private String etName;
private String consumingAppId;

@@ -131,6 +140,13 @@ public Builder withCursors(final List<NakadiCursor> cursors) {
return this;
}

public Builder withMaxMemoryUsageBytes(@Nullable final Long maxMemoryUsageBytes) {
if (null != maxMemoryUsageBytes) {
this.maxMemoryUsageBytes = maxMemoryUsageBytes;
}
return this;
}

public Builder withBatchLimit(@Nullable final Integer batchLimit) {
if (batchLimit != null) {
this.batchLimit = batchLimit;
@@ -190,7 +206,7 @@ public EventStreamConfig build() throws UnprocessableEntityException {
throw new UnprocessableEntityException("batch_limit can't be lower than 1");
}
return new EventStreamConfig(cursors, batchLimit, streamLimit, batchTimeout, streamTimeout,
streamKeepAliveLimit, etName, consumingAppId);
streamKeepAliveLimit, etName, consumingAppId, maxMemoryUsageBytes);
}
}

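
A usage sketch of the extended builder follows; the values are illustrative, and EventStreamConfig.builder() is assumed from the chained withers in streamEvents above.

    // Usage sketch (values illustrative); an unset withMaxMemoryUsageBytes keeps the 50 MB default.
    static EventStreamConfig exampleConfig() throws UnprocessableEntityException {
        return EventStreamConfig.builder()
                .withEtName("example-event-type")
                .withConsumingAppId("example-app")
                .withBatchLimit(100)
                .withMaxMemoryUsageBytes(10L * 1024 * 1024) // override the default cap
                .build(); // throws UnprocessableEntityException if batch_limit < 1
    }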