-
Notifications
You must be signed in to change notification settings - Fork 8
Full example using Promises.all, promises, AsyncCallback and Reakt Guava bridge to implement Cassandra repository.
This class uses an async supplier to connect to Cassandra.
This example uses Promises.all, promises, AsyncCallback and the Reakt/Guava bridge to implement a Cassandra repository
that is used by QBit and implements a circuit breaker. It also uses blocking promises for testing.
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.time.Duration;
import io.advantageous.reakt.AsyncSupplier;
import io.advantageous.reakt.Expected;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import org.slf4j.Logger;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static io.advantageous.reakt.guava.Guava.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
 * Stores imprints in Cassandra through the driver's async API.
 *
 * The Cassandra session is obtained asynchronously from an {@link AsyncSupplier}. A repeating
 * reactor task acts as a circuit breaker: it reconnects when the session is missing or closed,
 * or when too many driver errors were counted since the previous check.
 */
public class CassandraImprintStorageService implements ImprintStorageService {

    /** Table to store impressions. */
    public static final String IMPRESSIONS_TABLE = "imprints";

    /** Key space to do the storage. */
    public static final String KEY_SPACE = "keyspace_imprints";

    /** Logger. */
    private static final Logger logger = getLogger(CassandraImprintStorageService.class);

    /** Seconds between two runs of the circuit breaker check. */
    private static final int CIRCUIT_BREAKER_PERIOD_SECONDS = 5;

    /** Driver errors tolerated within one check period before the session is recycled. */
    private static final long MAX_ERRORS_PER_PERIOD = 10;

    /** Cassandra Session supplier. */
    private final AsyncSupplier<Session> sessionAsyncSupplier;

    /** QBit reactor for repeating tasks and callbacks that execute on the caller's thread. */
    private final Reactor reactor;

    /** Error count from the Cassandra driver since the last circuit breaker check. */
    private final AtomicLong errorCount = new AtomicLong();

    /** Reference to the Cassandra session, which gets connected asynchronously. */
    private Expected<Session> sessionExpected = Expected.empty();

    /**
     * Creates the service, starts the first connection attempt and schedules the circuit breaker.
     *
     * @param sessionAsyncSupplier supplier to supply Cassandra session.
     * @param reactor              reactor to manage callbacks and repeating tasks.
     */
    public CassandraImprintStorageService(final AsyncSupplier<Session> sessionAsyncSupplier,
                                          final Reactor reactor) {
        this.sessionAsyncSupplier = sessionAsyncSupplier;
        this.reactor = reactor;

        /* Connect the Cassandra session. */
        connectSession();

        /* This makes sure we are connected.
         * Provide circuit breaker if sessionExpected is down to auto reconnect.
         */
        reactor.addRepeatingTask(Duration.SECONDS.units(CIRCUIT_BREAKER_PERIOD_SECONDS),
                this::cassandraCircuitBreaker);
    }

    /**
     * Store imprints into Cassandra.
     *
     * Rejects the callback immediately when no session is currently available.
     *
     * @param callback callback
     * @param imprints imprints
     */
    @Override
    public void store(final Callback<Boolean> callback,
                      final List<Imprint> imprints) {
        sessionExpected()
                /* If we are not connected, fail fast. */
                .ifEmpty(() -> callback.reject("Not connected to Cassandra"))
                /* If we are connected then call Cassandra. */
                .ifPresent(session -> doStoreImprints(session, callback, imprints));
    }

    /**
     * Periodic health check: recycles the session after too many errors, and reconnects when
     * the session is closed or absent.
     */
    private void cassandraCircuitBreaker() {
        /* Read and reset in one atomic step so the counter only ever covers the period since
         * the previous check. Without the reset, a single bad period would force a reconnect
         * on every later tick, even after the new session is healthy.
         */
        final long errorsThisPeriod = errorCount.getAndSet(0);

        if (errorsThisPeriod > MAX_ERRORS_PER_PERIOD) {
            logger.warn("Cassandra reported {} errors in the last period, reconnecting", errorsThisPeriod);
            final Expected<Session> oldExpected = sessionExpected(); // Get the old session.
            setSessionExpected(null); // Callers now fail fast until the reconnect completes.
            try {
                oldExpected.ifPresent(Session::close); // Close the old session.
            } catch (Exception ex) {
                logger.error("Shutting down cassandra and it failed", ex);
            }
            connectSession();
            return;
        }

        /* If the Cassandra session is not connected or present, then connect it. */
        sessionExpected()
                .ifPresent(session -> {
                    /* If the session is closed then reconnect. */
                    if (session.isClosed()) {
                        setSessionExpected(null);
                        connectSession();
                    }
                })
                .ifEmpty(this::connectSession);
    }

    /**
     * Asks the supplier for a session; stores it on success and clears the reference on failure.
     */
    private void connectSession() {
        sessionAsyncSupplier.get(
                Promises.<Session>promise()
                        .then(session -> {
                            logger.info("Cassandra sessionExpected is open");
                            setSessionExpected(session);
                        })
                        .catchError(error -> {
                            logger.error("Error connecting to Cassandra", error);
                            setSessionExpected(null);
                        })
        );
    }

    /**
     * Does the low level Cassandra storage: one async insert per imprint, joined into one result.
     *
     * @param session  connected Cassandra session
     * @param callback notified with {@code true} once every insert completed, or with the failure
     * @param imprints imprints to store
     */
    private void doStoreImprints(final Session session,
                                 final Callback<Boolean> callback,
                                 final List<Imprint> imprints) {
        /* Make many calls to Cassandra using its async lib to store each imprint. */
        final List<Promise<Boolean>> promises = imprints.stream()
                .map(imprint -> doStoreImprint(session, imprint))
                .collect(Collectors.toList());

        /* Uses Reakt Promises.all to create a parent promise containing all of the promises
         * we just created for each imprint.
         * NOTE(review): confirm Promises.all completes when imprints is empty; otherwise the
         * callback would never fire for an empty list.
         */
        final Promise<Void> all = Promises.all(promises);

        all.then(nil -> callback.accept(true))
                .catchError(callback::fail);
    }

    /**
     * This gets called one time for each imprint passed to the
     * <code>store(callback, imprints)</code> method.
     *
     * @param session cassandra session
     * @param imprint imprint to store
     * @return promise replied to with the result set's {@code wasApplied()} value, or rejected
     *         with the error
     */
    private Promise<Boolean> doStoreImprint(final Session session,
                                            final Imprint imprint) {
        final ResultSetFuture resultSetFuture = session.executeAsync(QueryBuilder.insertInto(IMPRESSIONS_TABLE)
                .value("id", imprint.getId())
                .value("metricType", imprint.getMetricType().name().toLowerCase())
                .value("metricName", imprint.getMetricName())
                .value("provider", imprint.getProvider().toString())
                .value("externalId", imprint.getExternalId())
                .value("value", imprint.getValue())
                .value("created_at", imprint.getTimestamp())
        );

        final Promise<Boolean> returnedPromise = Promises.promise();

        final Promise<ResultSet> promise = Promises.<ResultSet>promise()
                .then(resultSet -> returnedPromise.reply(resultSet.wasApplied()))
                .catchError((error) -> {
                    returnedPromise.reject(error);
                    /* Only driver errors feed the circuit breaker. */
                    if (error instanceof DriverException) {
                        logger.error("Error storing imprint", error);
                        errorCount.incrementAndGet();
                    }
                });

        /* Using Guava/Reakt bridge. */
        registerCallback(resultSetFuture, promise);
        return returnedPromise;
    }

    /**
     * Replaces the current session reference.
     *
     * @param session new session, or {@code null} to mark the service as disconnected
     */
    private synchronized void setSessionExpected(final Session session) {
        this.sessionExpected = Expected.ofNullable(session);
    }

    /**
     * @return the current session reference, empty when not connected
     */
    private synchronized Expected<Session> sessionExpected() {
        return sessionExpected;
    }

    /**
     * Queue callback for the EMPTY, IDLE and LIMIT events; hands control to the reactor.
     */
    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    public void process() {
        reactor.process();
    }
}
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import io.advantageous.qbit.service.discovery.EndpointDefinition;
import io.advantageous.reakt.AsyncSupplier;
import io.advantageous.reakt.Callback;
import io.advantageous.reakt.promise.Promises;
import org.slf4j.Logger;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static io.advantageous.reakt.guava.Guava.registerCallback;
import static org.slf4j.LoggerFactory.getLogger;
/**
 * Asynchronously supplies a Cassandra {@link Session} bound to the configured keyspace.
 *
 * Steps: look up the Cassandra endpoints, pick one round-robin, connect without a keyspace,
 * create the keyspace and table if they do not exist, then connect again with the keyspace
 * and hand that session to the callback.
 */
public class CassandraSessionSupplier implements AsyncSupplier<Session> {

    /** Round-robin counter for endpoint selection; static, so shared by all supplier instances. */
    private static final AtomicInteger index = new AtomicInteger();

    private static final Logger logger = getLogger(CassandraSessionSupplier.class);

    /** Supplies the list of endpoints where Cassandra may be reached. */
    private final AsyncSupplier<List<EndpointDefinition>> endpointDefinitionsAsyncSupplier;

    /** Replication factor used when the keyspace is created. */
    private final int replicationFactor;

    /** Keyspace to create if needed and to connect to. */
    private final String keyspace;

    /** Table to create if needed. */
    private final String tableName;

    /** Runs the synchronous schema statements issued by {@code doBuildDatabase}. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(3);

    /**
     * @param endpointDefinitionAsyncSupplier supplier of the Cassandra endpoints
     * @param replicationFactor               replication factor for the keyspace
     * @param keyspace                        keyspace name
     * @param tableName                       table name
     */
    public CassandraSessionSupplier(final AsyncSupplier<List<EndpointDefinition>> endpointDefinitionAsyncSupplier,
                                    final int replicationFactor,
                                    final String keyspace,
                                    final String tableName) {
        this.endpointDefinitionsAsyncSupplier = endpointDefinitionAsyncSupplier;
        this.replicationFactor = replicationFactor;
        this.keyspace = keyspace;
        this.tableName = tableName;
    }

    /**
     * Looks up the endpoints and starts the connection sequence.
     *
     * @param callback replied to with the keyspace-bound session, or rejected on any failure
     */
    @Override
    public void get(final Callback<Session> callback) {
        logger.info("Loading Cassandra Session {} {}", keyspace, tableName);

        endpointDefinitionsAsyncSupplier.get(
                Promises.<List<EndpointDefinition>>promise()
                        .thenExpect(listExpected ->
                                /* An empty endpoint list is treated the same as no list at all. */
                                listExpected.filter(endpointDefinitions -> endpointDefinitions.size() > 0)
                                        .map(this::getEndPointDef)
                                        .ifEmpty(() -> callback.reject("Cassandra was not found"))
                                        .ifPresent(endpointDefinition ->
                                                createCassandraSessionWithEndpoint(callback, endpointDefinition))
                        )
                        .catchError((e) -> callback.reject("Unable to lookup cassandra", e)));
    }

    /**
     * Connects to the endpoint without a keyspace so that the schema can be created if needed.
     *
     * @param callback           rejected when the connection fails or yields no session
     * @param endpointDefinition endpoint to connect to
     */
    private void createCassandraSessionWithEndpoint(final Callback<Session> callback,
                                                    final EndpointDefinition endpointDefinition) {
        /* Use Reakt/Guava bridge. */
        registerCallback(
                Cluster.builder()
                        .withPort(endpointDefinition.getPort())
                        .addContactPoints(endpointDefinition.getHost())
                        .build().connectAsync(),
                Promises.<Session>promise()
                        .catchError(e -> callback.reject("Unable to load initial session", e))
                        .thenExpect(sessionExpected ->
                                sessionExpected.ifEmpty(() -> callback.reject("Empty session returned from Cassandra Cluster"))
                                        .ifPresent((Consumer<Session>) sessionWithoutKeyspace ->
                                                buildDBIfNeeded(sessionWithoutKeyspace, callback, endpointDefinition))
                        ));
    }

    /**
     * Picks the next endpoint round-robin.
     *
     * The counter is advanced with a single atomic operation and reduced modulo the list size.
     * This keeps the result in range when several threads select at once, and when the list is
     * shorter than on a previous call. {@code floorMod} keeps it non-negative after the
     * counter overflows.
     *
     * @param endpointDefinitions non-empty list of endpoints (the caller filters out empty lists)
     * @return the endpoint to use for this attempt
     */
    private EndpointDefinition getEndPointDef(final List<EndpointDefinition> endpointDefinitions) {
        final int next = Math.floorMod(index.getAndIncrement(), endpointDefinitions.size());
        return endpointDefinitions.get(next);
    }

    /**
     * Creates the schema on the executor, then loads the keyspace-bound session.
     *
     * @param sessionWithoutKeyspace session used for the schema statements
     * @param callback               rejected when the schema cannot be created
     * @param endpointDefinition     endpoint to reconnect to afterwards
     */
    private void buildDBIfNeeded(final Session sessionWithoutKeyspace,
                                 final Callback<Session> callback,
                                 final EndpointDefinition endpointDefinition) {
        executorService.execute(() -> {
            try {
                doBuildDatabase(sessionWithoutKeyspace);
            } catch (Exception ex) {
                callback.reject("Unable to create database", ex);
                return;
            }
            loadSession(callback, endpointDefinition);
        });
    }

    /**
     * Creates the keyspace and table if they do not exist, then closes the bootstrap session.
     *
     * @param sessionWithoutKeyspace session used for the schema statements; always closed here
     */
    private void doBuildDatabase(final Session sessionWithoutKeyspace) {
        logger.info("Initializing Cassandra Tables if needed {} {}", keyspace, tableName);

        try {
            sessionWithoutKeyspace.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION "
                    + "= {'class':'SimpleStrategy', 'replication_factor':" + replicationFactor + "};");

            sessionWithoutKeyspace.execute("USE " + keyspace);

            /* The partition key must be a declared column. The table declares "id" (also the
             * column the storage service inserts into), not "artistId".
             */
            sessionWithoutKeyspace.execute(
                    "CREATE TABLE IF NOT EXISTS " + tableName +
                            " (id bigint,\n" +
                            " metricType text,\n" +
                            " metricName text,\n" +
                            " provider text,\n" +
                            " externalId text,\n" +
                            " value bigint,\n" +
                            " created_at timestamp,\n" +
                            " primary key (id, created_at))\n" +
                            "WITH CLUSTERING ORDER BY (created_at desc);");
        } finally {
            /* Close even when a statement throws so the bootstrap session is not leaked.
             * NOTE(review): the Cluster built in createCassandraSessionWithEndpoint is not
             * closed here; presumably it should be released as well - confirm.
             */
            sessionWithoutKeyspace.close();
        }
    }

    /**
     * Connects to the endpoint with the keyspace selected and passes the session to the callback.
     *
     * @param callback           replied to with the session, or rejected when connecting fails
     * @param endpointDefinition endpoint to connect to
     */
    private void loadSession(final Callback<Session> callback,
                             final EndpointDefinition endpointDefinition) {
        logger.info("Loading session with keyspace {} {}", keyspace, tableName);

        registerCallback(
                Cluster.builder()
                        .withPort(endpointDefinition.getPort())
                        .addContactPoints(endpointDefinition.getHost())
                        .build().connectAsync(keyspace),
                Promises.<Session>promise()
                        .catchError(e -> callback.reject("Unable to load session", e))
                        .then(callback::reply)
        );
    }
}
Here is a test that uses blocking promises to simplify testing.
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.reactive.ReactorBuilder;
import io.advantageous.qbit.reakt.Reakt;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import org.junit.Test;
/**
 * Stores two imprints through {@code CassandraImprintStorageService} and blocks on the result.
 */
public class CassandraImprintStorageServiceTest {

    /** Prints the outcome reported by the storage service. */
    private static void printOutcome(final Boolean worked) {
        System.out.println("Did it work? " + worked);
    }

    @Test
    public void testStore() throws Exception {

        /* The session supplier finds Cassandra through the lookup service. */
        final CassandraSessionSupplier sessionSupplier = new CassandraSessionSupplier(
                endpointDefinitionsAsyncSupplier(),
                2,
                KEY_SPACE,
                IMPRESSIONS_TABLE);

        /* The reactor lets callbacks run in the same thread as the caller. */
        final Reactor callerReactor = ReactorBuilder.reactorBuilder().build();

        /* The service under test; constructing it starts the connection. */
        final CassandraImprintStorageService storageService =
                new CassandraImprintStorageService(sessionSupplier, callerReactor);

        Thread.sleep(1000);

        /* In a test a blocking promise is fine: it lets us wait for the async result. */
        final Promise<Boolean> outcome = Promises.<Boolean>blockingPromise()
                .then(CassandraImprintStorageServiceTest::printOutcome)
                .catchError(error -> error.printStackTrace());

        storageService.store(
                Reakt.convertPromise(outcome),
                asList(
                        ImprintBuilder.imprintBuilder()
                                .setArtistId(1L)
                                .setMetricType(MetricType.PLAYS)
                                .setExternalId("track")
                                .setTimestamp(System.currentTimeMillis())
                                .setMetricName("flips")
                                .setProvider("flipgram")
                                .setValue(100)
                                .build(),
                        ImprintBuilder.imprintBuilder()
                                .setArtistId(2L)
                                .setMetricType(MetricType.REACH)
                                .setExternalId("track")
                                .setTimestamp(System.currentTimeMillis())
                                .setMetricName("views")
                                .setProvider("facebook")
                                .setValue(50_000)
                                .build()));

        assertTrue(outcome.get());
    }
}
Here is another test, this one showing how to test an AsyncSupplier.
import com.datastax.driver.core.Session;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import org.junit.Test;
/**
 * Asks {@code CassandraSessionSupplier} for a session and blocks until it arrives.
 */
public class CassandraSessionSupplierTest {

    /** Reports a failed session lookup on standard error. */
    private static void reportFailure(final Throwable throwable) {
        System.err.println("Big problems");
        throwable.printStackTrace();
    }

    @Test
    public void testGet() throws Exception {

        final CassandraSessionSupplier sessionSupplier = new CassandraSessionSupplier(
                endpointDefinitionsAsyncSupplier(),
                2,
                KEY_SPACE,
                IMPRESSIONS_TABLE);

        /* A blocking promise keeps the test simple: get() waits for the async reply. */
        final Promise<Session> sessionPromise = Promises.<Session>blockingPromise();
        sessionPromise.catchError(CassandraSessionSupplierTest::reportFailure);

        sessionSupplier.get(sessionPromise);

        final Session connected = sessionPromise.get();
        assertNotNull(connected);
    }
}
Java Promises
- Promise
- Promise then*() and catchError()
- Promise thenMap()
- Promise all()
- Promise any()
- Blocking Promise
- Invokable Promise
- Reactor Replay Promises
Reactor, Stream, Results
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Elekt Consul Leadership election
- Elekt Leadership election
- Reactive Microservices
What is Microservices Architecture?
QBit Java Microservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul](http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices](http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices](http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Java Promises
- Promise
- Promise then*() and catchError()
- Promise thenMap()
- Promise all()
- Promise any()
- Blocking Promise
- Invokable Promise
- Reactor Replay Promises
Callback, and async Results
Reactor, Stream and Stream Result
Expected & Circuit Breaker
Scala Akka and Reakt