Skip to content

Commit

Permalink
Merge branch 'track-nontx-inflight-messesages-with-backpressure'
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick DeLuca committed Feb 24, 2017
2 parents 7d8443b + d7dde54 commit 5b98fcd
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
package com.zendesk.maxwell.producer;

import com.zendesk.maxwell.replication.BinlogPosition;
import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.row.RowMap;
Expand All @@ -8,23 +7,25 @@ public abstract class AbstractAsyncProducer extends AbstractProducer {
public class CallbackCompleter {
private InflightMessageList inflightMessages;
private final MaxwellContext context;
private final BinlogPosition position;
private final int rowId;
private final boolean isTXCommit;

public CallbackCompleter(InflightMessageList inflightMessages, BinlogPosition position, boolean isTXCommit, MaxwellContext context) {
public CallbackCompleter(InflightMessageList inflightMessages, int rowId, boolean isTXCommit, MaxwellContext context) {
this.inflightMessages = inflightMessages;
this.context = context;
this.position = position;
this.rowId = rowId;
this.isTXCommit = isTXCommit;
}

public void markCompleted() {
if(isTXCommit) {
BinlogPosition newPosition = inflightMessages.completeMessage(position);
BinlogPosition newPosition = inflightMessages.completeTXMessage(rowId);

if(newPosition != null) {
context.setPosition(newPosition);
}
} else {
inflightMessages.completeNonTXMessage(rowId);
}
}
}
Expand All @@ -41,24 +42,25 @@ public AbstractAsyncProducer(MaxwellContext context) {

@Override
public final void push(RowMap r) throws Exception {
// Rows that do not get sent to a target will be automatically marked as complete.
// We will attempt to commit a checkpoint up to the current row.
if(!r.shouldOutput(outputConfig)) {
inflightMessages.addMessage(r.getPosition());
BinlogPosition newPosition = inflightMessages.completeMessage(r.getPosition());
if(r.isTXCommit()) {
inflightMessages.addTXMessage(r.getRowId(), r.getPosition());

if(newPosition != null) {
context.setPosition(newPosition);
}
// Rows that do not get sent to a target will be automatically marked as complete.
// We will attempt to commit a checkpoint up to the current row.
if(!r.shouldOutput(outputConfig)) {
BinlogPosition newPosition = inflightMessages.completeTXMessage(r.getRowId());

return;
}
if(newPosition != null) {
context.setPosition(newPosition);
}

if(r.isTXCommit()) {
inflightMessages.addMessage(r.getPosition());
return;
}
} else {
inflightMessages.addNonTXMessage(r.getRowId());
}

CallbackCompleter cc = new CallbackCompleter(inflightMessages, r.getPosition(), r.isTXCommit(), context);
CallbackCompleter cc = new CallbackCompleter(inflightMessages, r.getRowId(), r.isTXCommit(), context);

sendAsync(r, cc);
}
Expand Down
106 changes: 81 additions & 25 deletions src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
Original file line number Diff line number Diff line change
@@ -1,60 +1,116 @@
package com.zendesk.maxwell.producer;
/* respresents a list of inflight messages -- stuff being sent over the
network, that may complete in any order. Allows for only bumping
the binlog position upon completion of the oldest outstanding item.
/*
respresents a list of inflight messages -- stuff being sent over the
network, that may complete in any order. Allows for only bumping
the binlog position upon completion of the oldest outstanding item.
Assumes .addInflight(position) will be call monotonically.
*/
Assumes .addInflight(position) will be call monotonically.
*/

import com.zendesk.maxwell.replication.BinlogPosition;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Iterator;

public class InflightMessageList {
class InflightMessage {
class InflightTXMessage {
public final BinlogPosition position;
public boolean isComplete;
InflightMessage(BinlogPosition position) {

public InflightTXMessage(BinlogPosition position) {
this.position = position;
this.isComplete = false;
}
}

private LinkedHashMap<String, InflightMessage> linkedMap;
private final HashSet<Integer> nonTXMessages;
private final LinkedHashMap<Integer, InflightTXMessage> txMessages;

private final Lock txLock = new ReentrantLock();
private final Lock nonTXLock = new ReentrantLock();
private final Condition nonTXMessagesNotFull = nonTXLock.newCondition();

private final int MAX_INFLIGHT_NON_TX_MESSAGES = 10000;

public InflightMessageList() {
this.linkedMap = new LinkedHashMap<>();
this.nonTXMessages = new HashSet<Integer>();
this.txMessages = new LinkedHashMap<Integer, InflightTXMessage>();
}

public synchronized void addMessage(BinlogPosition p) {
InflightMessage m = new InflightMessage(p);
this.linkedMap.put(p.toString(), m);
public void addTXMessage(int rowId, BinlogPosition position) {
txLock.lock();

try {
InflightTXMessage m = new InflightTXMessage(position);
txMessages.put(rowId, m);
} finally {
txLock.unlock();
}

return;
}

/* returns the position that stuff is complete up to, or null if there were no changes */
public synchronized BinlogPosition completeMessage(BinlogPosition p) {
InflightMessage m = this.linkedMap.get(p.toString());
assert(m != null);
public void addNonTXMessage(int rowId) throws InterruptedException {
nonTXLock.lock();

try {
while(nonTXMessages.size() > MAX_INFLIGHT_NON_TX_MESSAGES) {
nonTXMessagesNotFull.await();
}

m.isComplete = true;
this.nonTXMessages.add(rowId);
} finally {
nonTXLock.unlock();
}
}

/* returns the position that stuff is complete up to, or null if there were no changes */
public BinlogPosition completeTXMessage(int rowId) {
BinlogPosition completeUntil = null;
Iterator<InflightMessage> iterator = this.linkedMap.values().iterator();

while ( iterator.hasNext() ) {
InflightMessage msg = iterator.next();
if ( !msg.isComplete )
break;
txLock.lock();

try {
InflightTXMessage m = txMessages.get(rowId);
assert(m != null);

m.isComplete = true;
Iterator<InflightTXMessage> iterator = txMessages.values().iterator();

while ( iterator.hasNext() ) {
InflightTXMessage msg = iterator.next();
if ( !msg.isComplete )
break;

completeUntil = msg.position;
iterator.remove();
}

completeUntil = msg.position;
iterator.remove();
} finally {
txLock.unlock();
}

return completeUntil;
}

public void completeNonTXMessage(int rowId) {
nonTXLock.lock();

try {
nonTXMessages.remove(rowId);
nonTXMessagesNotFull.signal();
} finally {
nonTXLock.unlock();
}

return;
}

public int size() {
return linkedMap.size();
return txMessages.size() + nonTXMessages.size();
}
}
14 changes: 11 additions & 3 deletions src/main/java/com/zendesk/maxwell/row/RowMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.concurrent.atomic.AtomicInteger;


public class RowMap implements Serializable {

public enum KeyFormat { HASH, ARRAY }

static final Logger LOGGER = LoggerFactory.getLogger(RowMap.class);

private final int rowId;
private final String rowType;
private final String database;
private final String table;
Expand All @@ -38,12 +39,13 @@ public enum KeyFormat { HASH, ARRAY }
private final LinkedHashMap<String, Object> oldData;
private final List<String> pkColumns;

private static final AtomicInteger rowIdCounter = new AtomicInteger(1);
private static final JsonFactory jsonFactory = new JsonFactory();

private long approximateSize;

private static final ThreadLocal<ByteArrayOutputStream> byteArrayThreadLocal =
new ThreadLocal<ByteArrayOutputStream>(){
new ThreadLocal<ByteArrayOutputStream>() {
@Override
protected ByteArrayOutputStream initialValue() {
return new ByteArrayOutputStream();
Expand All @@ -67,7 +69,9 @@ protected JsonGenerator initialValue() {
};

public RowMap(String type, String database, String table, Long timestamp, List<String> pkColumns,
BinlogPosition nextPosition) {
BinlogPosition nextPosition) {
rowIdCounter.compareAndSet(Integer.MAX_VALUE, 1);
this.rowId = rowIdCounter.getAndIncrement();
this.rowType = type;
this.database = database;
this.table = table;
Expand All @@ -79,6 +83,10 @@ public RowMap(String type, String database, String table, Long timestamp, List<S
this.approximateSize = 100L; // more or less 100 bytes of overhead
}

public int getRowId() {
return rowId;
}

public String pkToJson(KeyFormat keyFormat) throws IOException {
if ( keyFormat == KeyFormat.HASH )
return pkToJsonHash();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@ public class InflightMessageListTest {
@Before
public void setupBefore() {
list = new InflightMessageList();
list.addMessage(p1);
list.addMessage(p2);
list.addMessage(p3);
list.addTXMessage(1, p1);
list.addTXMessage(2, p2);
list.addTXMessage(3, p3);
}

@Test
public void testInOrderCompletion() {
BinlogPosition ret;


ret = list.completeMessage(p1);
ret = list.completeTXMessage(1);
assert(ret.equals(p1));

ret = list.completeMessage(p2);
ret = list.completeTXMessage(2);
assert(ret.equals(p2));

ret = list.completeMessage(p3);
ret = list.completeTXMessage(3);
assert(ret.equals(p3));

assert(list.size() == 0);
Expand All @@ -44,13 +44,13 @@ public void testInOrderCompletion() {
public void testOutOfOrderComplete() {
BinlogPosition ret;

ret = list.completeMessage(p3);
ret = list.completeTXMessage(3);
assert(ret == null);

ret = list.completeMessage(p2);
ret = list.completeTXMessage(2);
assert(ret == null);

ret = list.completeMessage(p1);
ret = list.completeTXMessage(1);
assertEquals(p3, ret);
}
}
}

0 comments on commit 5b98fcd

Please sign in to comment.