Skip to content

Commit

Permalink
#4 add timeout support for Iso8583Client.send. Reduce logging, improv…
Browse files Browse the repository at this point in the history
…e integration test
  • Loading branch information
kpavlov committed Feb 11, 2018
1 parent ec9dc9d commit 3b2175f
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 37 deletions.
20 changes: 13 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Then add dependency to your project:
<dependency>
<groupId>com.github.kpavlov.jreactive8583</groupId>
<artifactId>netty-iso8583</artifactId>
<version>0.2.1</version>
<version>0.2.2</version>
</dependency>
<dependencies>

Expand All @@ -78,11 +78,15 @@ if (client.isConnected()) { // [7]

IsoMessage message = messageFactory.newMessage(...);
...
client.send(message);// [8]
client.sendAsync(message);// [8]
// or
client.send(message);// [9]
// or
client.send(message, 1, TimeUnit.SECONDS);// [10]
}

...
client.shutdown();// [9]
client.shutdown();// [11]
```
1. First you need to create a `MessageFactory`
2. Then you create a [`Iso8583Client`][Iso8583Client] providing `MessageFactory` and, optionally, `SocketAddress`
Expand All @@ -91,8 +95,10 @@ client.shutdown();// [9]
5. Initialize client. Now it is ready to connect.
6. Establish a connection. By default, if connection will is lost, it reconnects automatically. You may disable this behaviour or change _reconnectInterval_.
7. Verify that connection is established
8. Send `IsoMessage`
9. Disconnect when you're done.
8. Send `IsoMessage` asynchronously
9. Send `IsoMessage` synchronously
10. Send `IsoMessage` synchronously with timeout support
11. Disconnect when you're done.

## Creating and Using ISO-8583 Server

Expand Down Expand Up @@ -156,8 +162,8 @@ For frequently asked questions check the [FAQ](https://github.com/kpavlov/jreact

## ISO 8583 Links

- Beginner's guide: http://www.lytsing.org/downloads/iso8583.pdf.
- Introduction to ISO8583: http://www.codeproject.com/Articles/100084/Introduction-to-ISO-8583.
- Beginner's guide: http://www.lytsing.org/downloads/iso8583.pdf
- Introduction to ISO8583: http://www.codeproject.com/Articles/100084/Introduction-to-ISO-8583
- NPM package for Packing and unpacking ISO 8583 messages: https://www.npmjs.com/package/iso-8583

[iso8583]: https://en.wikipedia.org/wiki/ISO_8583
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

public class Iso8583Client<T extends IsoMessage> extends AbstractIso8583Connector<ClientConfiguration, Bootstrap, T> {

Expand Down Expand Up @@ -82,7 +83,7 @@ public ChannelFuture connect(SocketAddress serverAddress) throws InterruptedExce
* channel is active.
*/
public ChannelFuture connectAsync() {
logger.info("Connecting to {}", getSocketAddress());
logger.debug("Connecting to {}", getSocketAddress());
final Bootstrap b = getBootstrap();
reconnectOnCloseListener.requestReconnect();
final ChannelFuture connectFuture = b.connect();
Expand All @@ -92,7 +93,7 @@ public ChannelFuture connectAsync() {
return;
}
Channel channel = connectFuture.channel();
logger.info("Client is connected to {}", channel.remoteAddress());
logger.debug("Client is connected to {}", channel.remoteAddress());
setChannel(channel);
channel.closeFuture().addListener(reconnectOnCloseListener);
});
Expand Down Expand Up @@ -167,6 +168,10 @@ public void send(IsoMessage isoMessage) throws InterruptedException {
sendAsync(isoMessage).sync().await();
}

public void send(IsoMessage isoMessage, long timeout, TimeUnit timeUnit) throws InterruptedException {
sendAsync(isoMessage).sync().await(timeout, timeUnit);
}

public boolean isConnected() {
Channel channel = getChannel();
return channel != null && channel.isActive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ public Iso8583Decoder(MessageFactory messageFactory) {
this.messageFactory = messageFactory;
}

/**
* @implNote Message body starts immediately, no length header,
* see <code>"lengthFieldFameDecoder"</code> in
* {@link com.github.kpavlov.jreactive8583.netty.pipeline.Iso8583ChannelInitializer#initChannel}
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List out) throws Exception {
//message body starts immediately, no length header
if (!byteBuf.isReadable()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public class Iso8583ChannelInitializer<
private final MessageFactory isoMessageFactory;
private final ChannelHandler[] customChannelHandlers;
private final Iso8583Encoder isoMessageEncoder;
private ChannelHandler loggingHandler;
private final ChannelHandler loggingHandler;
private final ChannelHandler parseExceptionHandler;
private int headerLength = DEFAULT_LENGTH_HEADER_LENGTH;

public Iso8583ChannelInitializer(
Expand All @@ -62,6 +63,7 @@ public Iso8583ChannelInitializer(

this.isoMessageEncoder = createIso8583Encoder(headerLength);
this.loggingHandler = createLoggingHandler(configuration);
this.parseExceptionHandler = createParseExceptionHandler();
}

@Override
Expand All @@ -79,7 +81,7 @@ public void initChannel(T ch) {
}

if (configuration.replyOnError()) {
pipeline.addLast(workerGroup, "replyOnError", createParseExceptionHandler());
pipeline.addLast(workerGroup, "replyOnError", parseExceptionHandler);
}

pipeline.addLast("idleState", new IdleStateHandler(0, 0, configuration.getIdleTimeout()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
31=Amount, settlement processing fee
32=Acquiring institution identification code
33=Forwarding institution identification code
34= Primary account number, extended
34=Primary account number, extended
35=Track 2 data
36=Track 3 data
37=Retrieval reference number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,65 @@
import org.junit.Before;
import org.junit.Test;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;

public class ClientServerIT extends AbstractIT {

private volatile IsoMessage capturedRequest;
private final Map<Integer, IsoMessage> receivedMessages = new ConcurrentHashMap<>();

@Before
public void beforeTest() {
server.addMessageListener(new IsoMessageListener<IsoMessage>() {
@Override
public boolean applies(IsoMessage isoMessage) {
return true;
}

@Override
public boolean onMessage(ChannelHandlerContext ctx, IsoMessage isoMessage) {
final Integer stan = Integer.valueOf(isoMessage.getObjectValue(11));
receivedMessages.put(stan, isoMessage);
return true;
}
});
server.addMessageListener(new IsoMessageListener<IsoMessage>() {

@Override
public boolean applies(IsoMessage isoMessage) {
return isoMessage.getType() == 0x200;
return isoMessage.getType() == 0x200;
}

@Override
public boolean onMessage(ChannelHandlerContext ctx, IsoMessage isoMessage) {
capturedRequest = isoMessage;
final IsoMessage response = server.getIsoMessageFactory().createResponse(isoMessage);
response.setField(39, IsoType.ALPHA.value("00", 2));
response.setField(60, IsoType.LLLVAR.value("XXX", 3));
ctx.writeAndFlush(response);
return false;
}
});
}

@Test
public void testConnected() throws Exception {
TestUtil.waitFor("server started", server::isStarted);
TestUtil.waitFor("client connected", client::isConnected);
}

@Test
public void shouldSendAsyncCaptureRequest() {
// given
final IsoMessage finMessage = client.getIsoMessageFactory().newMessage(0x0200);
finMessage.setField(60, IsoType.LLLVAR.value("foo", 3));
client.send(finMessage);

TestUtil.waitFor("capture request received", ()->(capturedRequest != null));
final Integer stan = finMessage.getObjectValue(11);
// when
client.sendAsync(finMessage);
// then
TestUtil.waitFor("capture request received", () -> receivedMessages.containsKey(stan));

IsoMessage capturedRequest = receivedMessages.remove(stan);
assertThat("fin request", capturedRequest, notNullValue());
assertThat("fin request", capturedRequest.debugString(), equalTo(finMessage.debugString()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package com.github.kpavlov.jreactive8583.example;

import com.github.kpavlov.jreactive8583.example.client.Iso8583ClientConfig;
import com.github.kpavlov.jreactive8583.example.server.Iso8583ServerConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;

@Configuration
@ImportResource("classpath:/test-config.xml")
@Import({
Iso8583ClientConfig.class,
Iso8583ServerConfig.class
})
@PropertySource("application-test.properties")
public class TestConfig {

static {
Expand Down
13 changes: 0 additions & 13 deletions src/test/resources/test-config.xml

This file was deleted.

0 comments on commit 3b2175f

Please sign in to comment.