Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Add event handler operations to client (#2502)
Browse files Browse the repository at this point in the history
* Add event handler operations to client

* Disable system task workers for integration tests and remove ignored test
  • Loading branch information
jxu-nflx authored Oct 7, 2021
1 parent ab746d1 commit 0af6a11
Show file tree
Hide file tree
Showing 12 changed files with 442 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ private void handleUniformInterfaceException(UniformInterfaceException exception
return;
}
String errorMessage = clientResponse.getEntity(String.class);
LOGGER.error(
LOGGER.warn(
"Unable to invoke Conductor API with uri: {}, unexpected response from server: statusCode={}, responseBody='{}'.",
uri, clientResponse.getStatus(), errorMessage);
ErrorResponse errorResponse;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.netflix.conductor.client.http;

import com.google.common.base.Preconditions;
import com.netflix.conductor.client.config.ConductorClientConfiguration;
import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
import com.netflix.conductor.common.metadata.events.EventHandler;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import org.apache.commons.lang.StringUtils;

import java.util.List;

// Client class for all Event Handler operations
public class EventClient extends ClientBase {
private static final GenericType<List<EventHandler>> eventHandlerList = new GenericType<List<EventHandler>>() {
};
/**
* Creates a default metadata client
*/
public EventClient() {
this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
}

/**
* @param clientConfig REST Client configuration
*/
public EventClient(ClientConfig clientConfig) {
this(clientConfig, new DefaultConductorClientConfiguration(), null);
}

/**
* @param clientConfig REST Client configuration
* @param clientHandler Jersey client handler. Useful when plugging in various http client interaction modules (e.g.
* ribbon)
*/
public EventClient(ClientConfig clientConfig, ClientHandler clientHandler) {
this(clientConfig, new DefaultConductorClientConfiguration(), clientHandler);
}

/**
* @param config config REST Client configuration
* @param handler handler Jersey client handler. Useful when plugging in various http client interaction modules
* (e.g. ribbon)
* @param filters Chain of client side filters to be applied per request
*/
public EventClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) {
this(config, new DefaultConductorClientConfiguration(), handler, filters);
}

/**
* @param config REST Client configuration
* @param clientConfiguration Specific properties configured for the client, see {@link ConductorClientConfiguration}
* @param handler Jersey client handler. Useful when plugging in various http client interaction modules
* (e.g. ribbon)
* @param filters Chain of client side filters to be applied per request
*/
public EventClient(ClientConfig config, ConductorClientConfiguration clientConfiguration, ClientHandler handler,
ClientFilter... filters) {
super(config, clientConfiguration, handler);
for (ClientFilter filter : filters) {
super.client.addFilter(filter);
}
}

/**
* Register an event handler with the server
*
* @param eventHandler the eventHandler definition
*/
public void registerEventHandler(EventHandler eventHandler) {
Preconditions.checkNotNull(eventHandler, "Event Handler definition cannot be null");
postForEntityWithRequestOnly("event", eventHandler);
}

/**
* Updates an event handler with the server
*
* @param eventHandler the eventHandler definition
*/
public void updateEventHandler(EventHandler eventHandler) {
Preconditions.checkNotNull(eventHandler, "Event Handler definition cannot be null");
put("event", null, eventHandler);
}

/**
* @param event name of the event
* @param activeOnly if true, returns only the active handlers
* @return Returns the list of all the event handlers for a given event
*/
public List<EventHandler> getEventHandlers(String event, boolean activeOnly) {
Preconditions.checkArgument(org.apache.commons.lang3.StringUtils.isNotBlank(event), "Event cannot be blank");

return getForEntity("event/{event}", new Object[]{"activeOnly", activeOnly}, eventHandlerList, event);
}

/**
* Removes the event handler definition from the conductor server
*
* @param name the name of the event handler to be unregistered
*/
public void unregisterEventHandler(String name) {
Preconditions.checkArgument(StringUtils.isNotBlank(name), "Event handler name cannot be blank");
delete("event/{name}", name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.netflix.conductor.client.http;

import com.netflix.conductor.common.metadata.events.EventHandler;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.config.ClientConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.springframework.test.context.junit4.SpringRunner;

import java.net.URI;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


@RunWith(SpringRunner.class)
public class EventClientTest {

@Mock
private ClientHandler clientHandler;

@Mock
private ClientConfig clientConfig;

private EventClient eventClient;

@Before
public void before() {
this.eventClient = new EventClient(clientConfig, clientHandler);
this.eventClient.setRootURI("http://myuri:8080/");
}

@Test
public void testRegisterEventHandler() {
EventHandler eventHandler = mock(EventHandler.class);
when(clientHandler.handle(argThat(argument ->
argument.getURI().equals(URI.create("http://myuri:8080/event")))))
.thenReturn(mock(ClientResponse.class));
eventClient.registerEventHandler(eventHandler);
verify(clientHandler).handle(any());
}

@Test
public void testUpdateEventHandler() {
EventHandler eventHandler = mock(EventHandler.class);
when(clientHandler.handle(argThat(argument ->
argument.getURI().equals(URI.create("http://myuri:8080/event")))))
.thenReturn(mock(ClientResponse.class));
eventClient.updateEventHandler(eventHandler);
verify(clientHandler).handle(any());
}

@Test
public void testGetEventHandlers() {
when(clientHandler.handle(argThat(argument ->
argument.getURI().equals(URI.create("http://myuri:8080/event/test?activeOnly=true")))))
.thenReturn(mock(ClientResponse.class));
eventClient.getEventHandlers("test", true);
verify(clientHandler).handle(any());
}

@Test
public void testUnregisterEventHandler() {
when(clientHandler.handle(argThat(argument ->
argument.getURI().equals(URI.create("http://myuri:8080/event/test")))))
.thenReturn(mock(ClientResponse.class));
eventClient.unregisterEventHandler("test");
verify(clientHandler).handle(any());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.netflix.conductor.client.grpc;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.grpc.EventServiceGrpc;
import com.netflix.conductor.grpc.EventServicePb;
import com.netflix.conductor.proto.EventHandlerPb;
import org.apache.commons.lang3.StringUtils;

import java.util.Iterator;

public class EventClient extends ClientBase {

private final EventServiceGrpc.EventServiceBlockingStub stub;

public EventClient(String address, int port) {
super(address, port);
this.stub = EventServiceGrpc.newBlockingStub(this.channel);
}

/**
* Register an event handler with the server
*
* @param eventHandler the event handler definition
*/
public void registerEventHandler(EventHandler eventHandler) {
Preconditions.checkNotNull(eventHandler, "Event handler definition cannot be null");
stub.addEventHandler(
EventServicePb.AddEventHandlerRequest.newBuilder()
.setHandler(protoMapper.toProto(eventHandler))
.build()
);
}

/**
* Updates an existing event handler
*
* @param eventHandler the event handler to be updated
*/
public void updateEventHandler(EventHandler eventHandler) {
Preconditions.checkNotNull(eventHandler, "Event handler definition cannot be null");
stub.updateEventHandler(
EventServicePb.UpdateEventHandlerRequest.newBuilder()
.setHandler(protoMapper.toProto(eventHandler))
.build()
);
}

/**
* @param event name of the event
* @param activeOnly if true, returns only the active handlers
* @return Returns the list of all the event handlers for a given event
*/
public Iterator<EventHandler> getEventHandlers(String event, boolean activeOnly) {
Preconditions.checkArgument(StringUtils.isNotBlank(event), "Event cannot be blank");

EventServicePb.GetEventHandlersForEventRequest.Builder request =
EventServicePb.GetEventHandlersForEventRequest.newBuilder()
.setEvent(event)
.setActiveOnly(activeOnly);
Iterator<EventHandlerPb.EventHandler> it = stub.getEventHandlersForEvent(request.build());
return Iterators.transform(it, protoMapper::fromProto);
}

/**
* Removes the event handler from the conductor server
*
* @param name the name of the event handler
*/
public void unregisterEventHandler(String name) {
Preconditions.checkArgument(StringUtils.isNotBlank(name), "Name cannot be blank");
stub.removeEventHandler(EventServicePb.RemoveEventHandlerRequest.newBuilder()
.setName(name)
.build()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.netflix.conductor.client.grpc;

import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.grpc.EventServiceGrpc;
import com.netflix.conductor.grpc.EventServicePb;
import com.netflix.conductor.grpc.ProtoMapper;
import com.netflix.conductor.proto.EventHandlerPb;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static junit.framework.TestCase.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(SpringRunner.class)
public class EventClientTest {

@Mock
ProtoMapper mockedProtoMapper;

@Mock
EventServiceGrpc.EventServiceBlockingStub mockedStub;

EventClient eventClient;

@Before
public void init() {
eventClient = new EventClient("test", 0);
ReflectionTestUtils.setField(eventClient, "stub", mockedStub);
ReflectionTestUtils.setField(eventClient, "protoMapper", mockedProtoMapper);
}

@Test
public void testRegisterEventHandler() {
EventHandler eventHandler = mock(EventHandler.class);
EventHandlerPb.EventHandler eventHandlerPB = mock(EventHandlerPb.EventHandler.class);
when(mockedProtoMapper.toProto(eventHandler)).thenReturn(eventHandlerPB);

EventServicePb.AddEventHandlerRequest request = EventServicePb.AddEventHandlerRequest.newBuilder()
.setHandler(eventHandlerPB)
.build();
eventClient.registerEventHandler(eventHandler);
verify(mockedStub, times(1)).addEventHandler(request);
}

@Test
public void testUpdateEventHandler() {
EventHandler eventHandler = mock(EventHandler.class);
EventHandlerPb.EventHandler eventHandlerPB = mock(EventHandlerPb.EventHandler.class);
when(mockedProtoMapper.toProto(eventHandler)).thenReturn(eventHandlerPB);

EventServicePb.UpdateEventHandlerRequest request = EventServicePb.UpdateEventHandlerRequest.newBuilder()
.setHandler(eventHandlerPB)
.build();
eventClient.updateEventHandler(eventHandler);
verify(mockedStub, times(1)).updateEventHandler(request);
}

@Test
public void testGetEventHandlers() {
EventHandler eventHandler = mock(EventHandler.class);
EventHandlerPb.EventHandler eventHandlerPB = mock(EventHandlerPb.EventHandler.class);
when(mockedProtoMapper.fromProto(eventHandlerPB)).thenReturn(eventHandler);
EventServicePb.GetEventHandlersForEventRequest request = EventServicePb.GetEventHandlersForEventRequest.newBuilder()
.setEvent("test")
.setActiveOnly(true)
.build();
List<EventHandlerPb.EventHandler> result = new ArrayList<>();
result.add(eventHandlerPB);
when(mockedStub.getEventHandlersForEvent(request)).thenReturn(result.iterator());
Iterator<EventHandler> response = eventClient.getEventHandlers("test", true);
verify(mockedStub, times(1)).getEventHandlersForEvent(request);
assertEquals(response.next(), eventHandler);
}

@Test
public void testUnregisterEventHandler() {
EventServicePb.RemoveEventHandlerRequest request = EventServicePb.RemoveEventHandlerRequest.newBuilder()
.setName("test")
.build();
eventClient.unregisterEventHandler("test");
verify(mockedStub, times(1)).removeEventHandler(request);
}
}
Loading

0 comments on commit 0af6a11

Please sign in to comment.