Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add SSE Support #1508

Draft
wants to merge 36 commits into
base: master
Choose a base branch
from
Draft

Conversation

aesteve
Copy link
Contributor

@aesteve aesteve commented Jan 10, 2020

From: https://github.com/aesteve/vertx-sse
Original discussion: reactiverse/reactiverse#5
Status: Still WIP. Misses documentation + implementation must be discussed further on.

Motivation:

Provide a way to deal with Server-Sent-Events with Vert.x
SSE allows unidirectional communication between server and client in a pure HTTP way.

Status:

I migrated most of the code from my 3rd party implementation to make it officially supported by the Vert.x stack, as discussed in this reactiverse issue.

At this point, I'd like to receive feedback on:

  • where to add documentation, and how to write it properly
  • is the implementation correct, does it align with Vert.x 4 upcoming standards (Future/Promise, etc.)
  • is the implementation codegen-compatible
  • are you OK with names, variables, coding standards

Thank you.

Copy link
Contributor

@tsegismont tsegismont left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aesteve thank you and sorry for the delay. I've made a first pass.

import io.vertx.core.http.HttpClientOptions;
import io.vertx.ext.web.handler.sse.impl.EventSourceImpl;

public interface EventSource {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget Javadoc


@Test
public void noHeaderTextEventStreamHttpRequest() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting issues here (and other places). The project has an editorConfig file so if your IDE supports it, you should be able to reformat easily.

SSEConnection retry(Long delay, String data);

@Fluent
SSEConnection data(List<String> data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use case for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending multiple lines at once?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering why, as a user, would you want to split content into multiple data events?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷🏻‍♂️
In the docs here there's an example, but not really obvious:

data: Here's a system message of some kind that will get used
data: to accomplish some task.

We can remove it if you think it's not accurate. It can easily be achieved by invoking data() multiple times. No worries.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood that you can send multiple data messages but I was wondering if there are cases when you need to split content.

Copy link
Contributor Author

@aesteve aesteve May 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, invoking data(...) multiple times doesn't really achieve the same thing.
Since invoking data(...) finishes the message with \n\n.
But in the example from the docs both data: are separated with just one carriage return, which means on the client side they'll be merged into one single String.

So this point is NOT addressed yet. We need to figure out how to mimic this example on the server, and fix the EventSource client so that it merges multiple data: lines into one single String to conform to the spec.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are different ways we can address this. One could be, in SSEConnection to have:

  • a generic: SSEConnection data(String msg, boolean endPacket) method
  • rewrite SSEConnection data(String msg) method to be default SSEConnection data(String msg) { return data(msg, true); }
  • And then let the user invoke data("...", false);data("......", false);data("last message");

What do you think of this @tsegismont if you agree, it'll allow me to test the EventSource counterpart (merging multi-line messages into a single String) as mentioned in the specs very easily, since the multi-line functionnality would be supported on the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented this, which allowed me to easily write a unit test for the EventSource client ability to merge multiline data into a Single String. Let me know what you think about it.

@tsegismont
Copy link
Contributor

where to add documentation, and how to write it properly

You should update the index.adoc file in the vertx-web module.

is the implementation correct, does it align with Vert.x 4 upcoming standards (Future/Promise, etc.)

The standard for Vert.x 4 is to have good old callback style as well as futurized versions of the async methods.

is the implementation codegen-compatible

The compiler will tell you for every object annotated with @VertxGen.
Then you could also build the vertx-kotlin and vertx-rx projects locally with your modified version of vertx-web and see how it looks like.

are you OK with names, variables, coding standards

I haven't found anything odd except formatting and missing copyright headers.

Other than that, I wonder if this shouldn't be split into two parts: the Vert.x Web handler in the Vert.x Web module, and the client in a separate module. What do you think @pmlopes ?

@aesteve
Copy link
Contributor Author

aesteve commented Jan 30, 2020

Ok so let me create a check list or there's a chance I'll be missing a lot of stuff since there's a lot of work to do:

  • Reformat the whole code using Vert.x standards (old projects PR-submitted code doesn't have the same indent)
  • Copyright headers
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/EventSource.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/SSEConnection.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/SSEHandler.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/SSEHeaders.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/impl/EventSourceImpl.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/impl/SSEConnectionImpl.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/impl/SSEHandlerImpl.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/impl/SSEPacket.java
    • vertx-web/src/test/java/io/vertx/ext/web/handler/sse/SSETestBase.java
    • vertx-web/src/test/java/io/vertx/ext/web/handler/sse/SSETestClose.java
    • vertx-web/src/test/java/io/vertx/ext/web/handler/sse/SSETestEstablishConnection.java
    • vertx-web/src/test/java/io/vertx/ext/web/handler/sse/SSETestReceiveData.java
    • vertx-web/src/test/java/io/vertx/ext/web/handler/sse/SSETestRequestResponseHeaders.java
  • EventSource.close() should return void
  • SSEHeaders
    • Turn it into an enum
    • Change visibility to package protected => Actually NO, see below. It can be used for event bus messages
    • Move to impl package => Actually NO, see below. It should be back to the root package
  • @VertxGen
    • Add annotation to vertx-web/src/main/java/io/vertx/ext/web/handler/sse/EventSource.java
    • Add annotation to vertx-web/src/main/java/io/vertx/ext/web/handler/sse/SSEConnection.java
    • Add annotation to vertx-web/src/main/java/io/vertx/ext/web/handler/sse/SSEHandler.java
    • Add annotation to vertx-web/src/main/java/io/vertx/ext/web/handler/sse/SSEHeaders.java
    • Check it generates the right stuff
    • Install locally + build vertx-kotlin, vertx-rx against this version
  • Perf improvements
    • In SSEHandlerImpl.handle use request.headers().contains("Accept", "text/event-stream", true) + do not create the connection if not appropriate
  • Remove reject(int statusCode) on SSEConnection
  • Add EventSource.addEventListener as a shortcut for  EventSource.onEvent as per the docs
  • Move SSEHandler.closeHandler to SSEConnection.closeHandler
  • Do not use Long but long in SSEConnection.retry(...)
  • Remove EventSource.onclose since it's not supported by browsers
  • Do not mix events and data
    • Remove 2nd parameter to SSEConnection.retry, data should be sent separately
    • Remove 2nd parameter to SSEConnection.event, data should be sent separately
    • Remove 2nd parameter to SSEConnection.id, data should be sent separately
  • Do not let the ability to send a list of data, it's useless
    • io.vertx.ext.web.handler.sse.SSEConnection#forward(java.util.List<java.lang.String>)
    • io.vertx.ext.web.handler.sse.SSEConnection#retry(java.lang.Long, java.util.List<java.lang.String>)
    • io.vertx.ext.web.handler.sse.SSEConnection#event(java.lang.String, java.util.List<java.lang.String>)
    • io.vertx.ext.web.handler.sse.SSEConnection#id(java.lang.String, java.util.List<java.lang.String>)
  • synchronized for every field and methods in
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/impl/EventSourceImpl.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/impl/SSEConnectionImpl.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/impl/SSEHandlerImpl.java
  • Re-design SSEHandler.connectHandler and SSEHandler.closeHandler so that they take a single handler as parameter (therefore the fields tracking those handlers don't need to be lists anylonger)
  • EventSourceOptions instead of HttpClientOptions
    • Create EventSourceOptions (+ copyright), including  retryPeriod
    • Replace HttpClientOptions with EventSourceOptions
    • Write a test for the retryPeriod option
  • EventSourceImpl
    • Handle HTTP 204 + write a test for it
    • Handle redirect + write a test for it
    • Handle reconnect + write a test for it
  • BugFixes / Spec conformity
  • Improvements
    • SSEHeaders should actually be public, since it can be used as Event Bus Message headers
  • Missing tests
    • Forwarding messages from the event bus
    • Data split on 2 different lines should be concatenated
    • Reconnect with lastId => data received with id, lastId should be up-to-date in eventSource, disconnect / reconnect -> the "lastId" sent to server should be the last one received, not the same as the one used when connecting for the first time (for example)
  • Javadoc for public interfaces / classes
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/EventSourceOptions.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/EventSource.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/SSEConnection.java
    • vertx-web/src/main/java/io/vertx/ext/web/handler/sse/SSEHandler.java

That's a ton of work to do, I'll give it a try by the end of february I think.

Thanks a lot for all the reviews.

@aesteve
Copy link
Contributor Author

aesteve commented May 22, 2020

Just waiting for a second review @tsegismont before next phase (javadocs + official docs).

Plus, converting to draft until Doc is there, so that it doesn't get merged by accident;

@aesteve aesteve requested a review from tsegismont May 22, 2020 18:59
@aesteve aesteve marked this pull request as draft May 22, 2020 18:59
@aesteve
Copy link
Contributor Author

aesteve commented Jun 1, 2020

Note: about EventSource, would RecordParser make the implementation easier?
Something I need to investigate.

}

@Fluent
EventSource connect(String path, Handler<AsyncResult<Void>> handler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather connectHandler

EventSource connect(String path, String lastEventId, Handler<AsyncResult<Void>> handler);

@Fluent
EventSource onMessage(Handler<String> messageHandler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather messageHandler

EventSource onMessage(Handler<String> messageHandler);

@Fluent
EventSource onEvent(String eventName, Handler<String> handler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather eventHandler

EventSource onEvent(String eventName, Handler<String> handler);

@Fluent
default EventSource onError(Handler<String> handler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather exceptionHandler

}

@Fluent
default EventSource addEventListener(String eventName, Handler<String> handler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather eventHandler

vertx.cancelTimer(retryTimerId);
retryTimerId = null;
}
if (client != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move the client close outside of the synchronized block (after copying the reference to a local variable and setting the ref to null)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, do you mean something like this?

  @Override
  public void close() {
    HttpClient client;
    synchronized (this) {
      if (retryTimerId != null) {
        vertx.cancelTimer(retryTimerId);
        retryTimerId = null;
      }

      client = this.client;
      this.client = null;
      connected = false;
    }

    if (client != null) {
      try {
        client.close();
      } catch (Exception e) {
        log.error("An error occurred closing the EventSource: ", e);
      }
    }
  }

try {
client.close();
} catch(Exception e ) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log ?

currentPacket = new SSEPacket();
}
boolean terminated = currentPacket.append(buffer);
Optional<Handler<String>> eventHandler = Optional.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid using Optional here ?


String lastId();

@GenIgnore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GenIgnore is not needed I think

SSEConnection comment(String comment);

@Fluent
SSEConnection retry(long delay);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does retry ?

Copy link
Member

@pmlopes pmlopes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to discuss the API, it seems to me, that this PR is trying to do client and server SSE on the vertx-web module (instead of being split between web, and web-client). It also looks (but i may be totally wrong here) that it tries to re-implement the sockjs bridge for SSE, which gets us to achieve the same in 2 different ways.

I think we should clearly define the goals, but we're already going on the right direction!

}

@Fluent
SSEHandler connectHandler(Handler<SSEConnection> connection);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be some explanation why is this method needed/what it does.

public synchronized void handle(RoutingContext context) {
HttpServerRequest request = context.request();
HttpServerResponse response = context.response();
List<String> acceptHeader = request.headers().getAll(HttpHeaders.ACCEPT.toString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the context.parsedHeaders() instead of handling the header yourself. This ensures order and quality are ensured. for example:

List<MIMEHeader> acceptableTypes = context.parsedHeaders().accept();
if(!acceptableTypes.isEmpty()) {
  MIMEHeader selectedAccept = context.parsedHeaders()
    .findBestUserAcceptedIn(acceptableTypes, "text/event-stream");
  
  if (selectedAccept == null) {
    return 406;
...

* This enumeration is public since it can be used as EventBus message headers,
* in case the user needs to forward messages from the event-bus with metadata (like "event:", or "id:")
*/
@VertxGen
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should have this. Having this means that we will now have 2 ways to achieve the same:

  1. the eventbus sockjs bridge
  2. the sse handler

IMHO, we should keep SSE as simple SSE. Instead have a single DataObject that defines the EventSource as per spec, or https://developer.mozilla.org/en-US/docs/Web/API/EventSource

{
  event: String,
  data: Buffer,
  id: String,
  retry: Integer
}

import java.util.Map;
import java.util.Optional;

class SSEPacket {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should be combined with the SSEHeaders so we have a single DataObject to describe the Event.

// A simple SSE handler that doesn't do anything specific apart accepting the connection
SSEHandler sseHandler = SSEHandler.create();
sseHandler.connectHandler(connection -> {
this.connection = connection; // accept
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this (from an API perspective)? Access Control should be done at a previous handler IMHO (unless this isn't for access control)

import io.vertx.core.json.JsonObject;

@DataObject(generateConverter = true)
public class EventSourceOptions extends HttpClientOptions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should go to the vertx-web-client module, not vertx-web

import io.vertx.ext.web.RoutingContext;

@VertxGen
public interface SSEConnection {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this doing a similar task as SSEPack and SSEHeader?

@rgmz
Copy link
Contributor

rgmz commented May 30, 2021

Hey @aesteve,

Have you had a chance to make any updates? I'd like to help out, but don't want to duplicate effort.


Assuming not, @vietj / @pmlopes / @tsegismont, what's the best way to make this compatible with Vert.x 4.x?

1. HttpClient.redirectHandler method used to expect Function<HttpClientResponse,Future<HttpClientRequest>>, but now expects Function<HttpClientResponse,Future<RequestOptions>>:

client.redirectHandler(resp -> {
String redirect = resp.headers().get(HttpHeaders.LOCATION);
return Future.succeededFuture(createRequest(redirect, lastEventId, handler));
});

2. HttpClient.request used to return HttpClientRequest, but now returns Future<HttpClientRequest>:

private HttpClientRequest createRequest(String path, String lastEventId, Handler<AsyncResult<Void>> handler) {
HttpClientRequest request = client.request(HttpMethod.GET, path);
request.setFollowRedirects(true);
request.onFailure(cause -> handler.handle(Future.failedFuture(cause)));
request.onSuccess(response -> {
if (shouldReconnect(response)) {
client.close();
client = null;
getEventErrorHandler().ifPresent(errorHandler -> errorHandler.handle("")); // FIXME: error type/name

Given those two changes, I'm not sure how I would implement equivalent logic in EventSourceImpl for 4.1.0. My best guess is to handle the redirectHandler and createRequest options separately, however, I'm not sure if that would be equivalent to the existing logic as the redirectHandler no longer has access to the request's onFailure/onSuccess conditions. (Edit: I suppose I'd dispatch the request and then set the redirectHandler -- opposite from the current logic.)

client.redirectHandler(resp -> {
String redirect = resp.headers().get(HttpHeaders.LOCATION);
return Future.succeededFuture(createRequest(redirect, lastEventId, handler));
});
createRequest(path, lastEventId, handler).end();
return this;

I'll upload my attempt later today; any ideas/guidance would be appreciated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

5 participants