Skip to content

Commit

Permalink
Added support for load-balancing in datafeed id persistence / re-use
Browse files Browse the repository at this point in the history
  • Loading branch information
Yong Sheng Tan committed Oct 30, 2019
1 parent ac9da70 commit c68feef
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 11 deletions.
File renamed without changes.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@
<dependency>
<groupId>org.symphonyoss.symphony</groupId>
<artifactId>messageml</artifactId>
<version>0.9.45</version>
<version>0.9.46</version>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
Expand All @@ -213,7 +213,7 @@
<dependency>
<groupId>org.bitbucket.b_c</groupId>
<artifactId>jose4j</artifactId>
<version>0.6.5</version>
<version>0.7.0</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
Expand Down
21 changes: 13 additions & 8 deletions src/main/java/clients/symphony/api/DatafeedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,22 @@ public final class DatafeedClient extends APIClient {
private final Logger logger = LoggerFactory.getLogger(DatafeedClient.class);
private SymBotClient botClient;
private SymConfig config;
private static String agentTarget;

public DatafeedClient(SymBotClient client) {
this.botClient = client;
this.config = client.getConfig();
agentTarget = CommonConstants.HTTPS_PREFIX + config.getAgentHost() + ":" + config.getAgentPort();
}

private String getAgentTarget() {
return CommonConstants.HTTPS_PREFIX + config.getAgentHost() + ":" + config.getAgentPort();
}

public String createDatafeed() throws SymClientException {
Response response = null;
StringId datafeedId = null;
try {
logger.info("Creating new datafeed for bot {} .....", botClient.getBotUserInfo().getUsername());
response = botClient.getAgentClient().target(agentTarget)
logger.info("Creating new datafeed for bot {}..", botClient.getBotUserInfo().getUsername());
response = botClient.getAgentClient().target(getAgentTarget())
.path(AgentConstants.CREATEDATAFEED)
.request(MediaType.APPLICATION_JSON)
.header("sessionToken", botClient.getSymAuth().getSessionToken())
Expand All @@ -55,7 +57,8 @@ public String createDatafeed() throws SymClientException {
datafeedId = response.readEntity(StringId.class);
logger.info("Created new datafeed {} for bot {}", datafeedId.getId(),
botClient.getBotUserInfo().getUsername());
writeDatafeedIdToDisk(datafeedId.getId());

writeDatafeedIdToDisk(botClient.getConfig(), datafeedId.getId());
}
return datafeedId.getId();
} finally {
Expand All @@ -65,13 +68,15 @@ public String createDatafeed() throws SymClientException {
}
}

private void writeDatafeedIdToDisk(String datafeedId) {
private void writeDatafeedIdToDisk(SymConfig config, String datafeedId) {
String agentHostPort = config.getAgentHost() + ":" + config.getAgentPort();

File file = new File("." + File.separator + "datafeed.id");
if (file.isDirectory()) {
file = new File("." + File.separator + "datafeed.id" + File.separator + "datafeed.id");
}
try (FileWriter fw = new FileWriter(file)) {
fw.write(datafeedId);
fw.write(datafeedId + "@" + agentHostPort);
fw.flush();
} catch (IOException ex) {
logger.error(ex.getMessage());
Expand All @@ -83,7 +88,7 @@ public List<DatafeedEvent> readDatafeed(String id) throws SymClientException {
Response response = null;
logger.debug("Reading datafeed {}", id);
try {
WebTarget webTarget = botClient.getAgentClient().target(agentTarget);
WebTarget webTarget = botClient.getAgentClient().target(getAgentTarget());
response = webTarget
.path(AgentConstants.READDATAFEED.replace("{id}", id))
.request(MediaType.APPLICATION_JSON)
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/configuration/SymLoadBalancedConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ public int getAgentPort() {
return super.getAgentPort();
}

public void setCurrentAgentIndex(int currentAgentIndex) {
this.currentAgentIndex = currentAgentIndex;
}

public void setActualAgentHost(String actualAgentHost) {
this.actualAgentHost = actualAgentHost;
}

private String getActualAgentHost() {
String externalAgentHost = (agentServers != null && agentServers.size() > 0) ?
agentServers.get(0) : getAgentHost();
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/services/DatafeedEventsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import clients.SymBotClient;
import clients.symphony.api.DatafeedClient;
import configuration.LoadBalancingMethod;
import configuration.SymConfig;
import configuration.SymLoadBalancedConfig;
import exceptions.SymClientException;
Expand Down Expand Up @@ -55,7 +56,20 @@ public DatafeedEventsService(SymBotClient client) {
file = new File("." + File.separator + "datafeed.id" + File.separator + "datafeed.id");
}
Path datafeedIdPath = Paths.get(file.getPath());
datafeedId = Files.readAllLines(datafeedIdPath).get(0);
String[] persistedDatafeed = Files.readAllLines(datafeedIdPath).get(0).split("@");
datafeedId = persistedDatafeed[0];

if (client.getConfig() instanceof SymLoadBalancedConfig) {
SymLoadBalancedConfig lbConfig = (SymLoadBalancedConfig) client.getConfig();
String[] agentHostPort = persistedDatafeed[1].split(":");
if (lbConfig.getLoadBalancing().getMethod() == LoadBalancingMethod.external) {
lbConfig.setActualAgentHost(agentHostPort[0]);
} else {
int previousIndex = lbConfig.getAgentServers().indexOf(agentHostPort[0]);
lbConfig.setCurrentAgentIndex(previousIndex);
}
}

logger.info("Using previous datafeed id: {}", datafeedId);
} catch (IOException e) {
logger.info("No previous datafeed id file");
Expand Down

0 comments on commit c68feef

Please sign in to comment.