Skip to content

Commit

Permalink
Event carries binlog filename and offset. Simplifies logic for
Browse files Browse the repository at this point in the history
EventListeners that care about checkpointing progress.
  • Loading branch information
Michael Schiff committed May 2, 2018
1 parent 6cdecfd commit 7f2de13
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ private void listenForEventPackets() throws IOException {
try {
event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
inputStream);
inputStream, binlogFilename, binlogPosition);
if (event == null) {
throw new EOFException();
}
Expand Down Expand Up @@ -1047,7 +1047,8 @@ public void unregisterEventListener(EventListener eventListener) {

private void notifyEventListeners(Event event) {
if (event.getData() instanceof EventDeserializer.EventDataWrapper) {
event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal());
event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal(),
binlogFilename, binlogPosition);
}
synchronized (eventListeners) {
for (EventListener eventListener : eventListeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,39 @@
*/
public class BinaryLogFileReader implements Closeable {

public static final byte[] MAGIC_HEADER = new byte[]{(byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e};
public static final byte[] MAGIC_HEADER = new byte[] { (byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e };

private final ByteArrayInputStream inputStream;
private final EventDeserializer eventDeserializer;
private final String filename;
private long offset;

public BinaryLogFileReader(File file) throws IOException {
this(file, new EventDeserializer());
}

public BinaryLogFileReader(File file, EventDeserializer eventDeserializer) throws IOException {
this(file != null ? new BufferedInputStream(new FileInputStream(file)) : null, eventDeserializer);
this(file != null ? file.getName() : null,
file != null ? new BufferedInputStream(new FileInputStream(file)) : null,
eventDeserializer);
}

public BinaryLogFileReader(InputStream inputStream) throws IOException {
this(inputStream, new EventDeserializer());
public BinaryLogFileReader(String filename, InputStream inputStream) throws IOException {
this(filename, inputStream, new EventDeserializer());
}

public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeserializer) throws IOException {
public BinaryLogFileReader(String filename, InputStream inputStream, EventDeserializer eventDeserializer)
throws IOException {
if (filename == null) {
throw new IllegalArgumentException("File name cannot be NULL");
}
if (inputStream == null) {
throw new IllegalArgumentException("Input stream cannot be NULL");
}
if (eventDeserializer == null) {
throw new IllegalArgumentException("Event deserializer cannot be NULL");
}
this.filename = filename;
this.inputStream = new ByteArrayInputStream(inputStream);
try {
byte[] magicHeader = this.inputStream.read(MAGIC_HEADER.length);
Expand All @@ -79,7 +88,7 @@ public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeser
* @return deserialized event or null in case of end-of-stream
*/
public Event readEvent() throws IOException {
return eventDeserializer.nextEvent(inputStream);
return eventDeserializer.nextEvent(inputStream, filename, offset++);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ public class Event implements Serializable {

private EventHeader header;
private EventData data;
private String binlogFilename;
private long binlogPosition;

public Event(EventHeader header, EventData data) {
public Event(EventHeader header, EventData data, String binlogFilename, long binlogPosition) {
this.header = header;
this.data = data;
this.binlogFilename = binlogFilename;
this.binlogPosition = binlogPosition;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
/**
* @return deserialized event or null in case of end-of-stream
*/
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
public Event nextEvent(ByteArrayInputStream inputStream, String binlogFilename, long binlogOffset)
throws IOException {
if (inputStream.peek() == -1) {
return null;
}
Expand All @@ -212,7 +213,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
}
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
}
return new Event(eventHeader, eventData);
return new Event(eventHeader, eventData, binlogFilename, binlogOffset);
}

private EventData deserializeEventData(ByteArrayInputStream inputStream, EventHeader eventHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public class BinaryLogFileReaderIntegrationTest {

@Test
public void testNextEvent() throws Exception {
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")));
BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila",
new GZIPInputStream(new FileInputStream("src/test/resources/mysql-bin.sakila.gz")));
try {
int numberOfEvents = 0;
while ((reader.readEvent()) != null) {
Expand All @@ -68,7 +68,7 @@ public void testDeserializationSuppressionByEventType() throws Exception {
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setEventDataDeserializer(EventType.XID, new NullEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.QUERY, new ByteArrayEventDataDeserializer());
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila.gz", new GZIPInputStream(
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")), eventDeserializer);
try {
boolean n = true, b = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,6 @@ private Event generateEvent(long timestamp, EventType type, long serverId, long
header.setEventType(type);
header.setServerId(serverId);
header.setNextPosition(nextPosition);
return new Event(header, null);
return new Event(header, null, "filename", 4);
}
}

0 comments on commit 7f2de13

Please sign in to comment.