Skip to content

Commit

Permalink
feat: dataspace protocol negotiation dispatcher (eclipse-edc#2780)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Ronja Quensel <[email protected]>
  • Loading branch information
2 people authored and majadlymhmd committed May 10, 2023
1 parent 28a667b commit 61a7cd2
Show file tree
Hide file tree
Showing 26 changed files with 1,407 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ dependencies {
api(project(":spi:common:catalog-spi"))

api(libs.jakartaJson)

testImplementation(testFixtures(project(":data-protocols:dsp:dsp-http-spi")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.edc.jsonld.spi.transformer.JsonLdTransformerRegistry;
import org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate.CatalogRequestHttpDelegate;
import org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpRemoteMessageDispatcher;
import org.eclipse.edc.protocol.dsp.spi.serialization.JsonLdRemoteMessageSerializer;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.system.ServiceExtension;
Expand All @@ -37,6 +38,8 @@ public class DspCatalogHttpDispatcherExtension implements ServiceExtension {
@Inject
private DspHttpRemoteMessageDispatcher messageDispatcher;
@Inject
private JsonLdRemoteMessageSerializer remoteMessageSerializer;
@Inject
private TypeManager typeManager;
@Inject
private JsonLdTransformerRegistry transformerRegistry;
Expand All @@ -48,7 +51,7 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
messageDispatcher.registerDelegate(new CatalogRequestHttpDelegate(typeManager.getMapper(TYPE_MANAGER_CONTEXT_JSON_LD), transformerRegistry));
messageDispatcher.registerDelegate(new CatalogRequestHttpDelegate(remoteMessageSerializer, typeManager.getMapper(TYPE_MANAGER_CONTEXT_JSON_LD), transformerRegistry));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,23 @@

package org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.eclipse.edc.catalog.spi.Catalog;
import org.eclipse.edc.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.jsonld.spi.transformer.JsonLdTransformerRegistry;
import org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpDispatcherDelegate;
import org.eclipse.edc.protocol.dsp.spi.serialization.JsonLdRemoteMessageSerializer;
import org.eclipse.edc.spi.EdcException;

import java.io.IOException;
import java.util.function.Function;

import static java.lang.String.format;
import static java.lang.String.join;
import static org.eclipse.edc.jsonld.util.JsonLdUtil.compact;
import static org.eclipse.edc.jsonld.util.JsonLdUtil.expand;
import static org.eclipse.edc.protocol.dsp.catalog.spi.CatalogApiPaths.BASE_PATH;
import static org.eclipse.edc.protocol.dsp.catalog.spi.CatalogApiPaths.CATALOG_REQUEST;
Expand All @@ -43,14 +40,13 @@
/**
* Delegate for dispatching catalog requests as defined in the dataspace protocol specification.
*/
public class CatalogRequestHttpDelegate implements DspHttpDispatcherDelegate<CatalogRequestMessage, Catalog> {

private static final String APPLICATION_JSON = "application/json";
public class CatalogRequestHttpDelegate extends DspHttpDispatcherDelegate<CatalogRequestMessage, Catalog> {

private final ObjectMapper mapper;
private final JsonLdTransformerRegistry transformerRegistry;

public CatalogRequestHttpDelegate(ObjectMapper mapper, JsonLdTransformerRegistry transformerRegistry) {
public CatalogRequestHttpDelegate(JsonLdRemoteMessageSerializer serializer, ObjectMapper mapper, JsonLdTransformerRegistry transformerRegistry) {
super(serializer);
this.mapper = mapper;
this.transformerRegistry = transformerRegistry;
}
Expand All @@ -70,12 +66,7 @@ public Class<CatalogRequestMessage> getMessageType() {
*/
@Override
public Request buildRequest(CatalogRequestMessage message) {
var requestBody = RequestBody.create(toJson(message), MediaType.get(APPLICATION_JSON));

return new Request.Builder()
.url(message.getCallbackAddress() + BASE_PATH + CATALOG_REQUEST)
.post(requestBody)
.build();
return buildRequest(message, BASE_PATH + CATALOG_REQUEST, jsonLdContext());
}

/**
Expand Down Expand Up @@ -104,19 +95,6 @@ public Function<Response, Catalog> parseResponse() {
}
};
}

private String toJson(CatalogRequestMessage message) {
try {
var transformResult = transformerRegistry.transform(message, JsonObject.class);
if (transformResult.succeeded()) {
var compacted = compact(transformResult.getContent(), jsonLdContext());
return mapper.writeValueAsString(compacted);
}
throw new EdcException(format("Failed to write request: %s", join(", ", transformResult.getFailureMessages())));
} catch (JsonProcessingException e) {
throw new EdcException("Failed to serialize catalog request", e);
}
}

private JsonObject jsonLdContext() {
return Json.createObjectBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@

package org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.Buffer;
import org.eclipse.edc.catalog.spi.Catalog;
import org.eclipse.edc.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.jsonld.spi.transformer.JsonLdTransformerRegistry;
import org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpDispatcherDelegate;
import org.eclipse.edc.protocol.dsp.spi.testfixtures.dispatcher.DspHttpDispatcherDelegateTestBase;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.Result;
Expand All @@ -37,27 +35,21 @@
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT;
import static org.eclipse.edc.protocol.dsp.catalog.spi.CatalogApiPaths.BASE_PATH;
import static org.eclipse.edc.protocol.dsp.catalog.spi.CatalogApiPaths.CATALOG_REQUEST;
import static org.eclipse.edc.protocol.dsp.catalog.transform.DspCatalogPropertyAndTypeNames.DSPACE_PREFIX;
import static org.eclipse.edc.protocol.dsp.catalog.transform.DspCatalogPropertyAndTypeNames.DSPACE_SCHEMA;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class CatalogRequestMessageHttpDelegateTest {

private final ObjectMapper mapper = mock(ObjectMapper.class);
private final JsonLdTransformerRegistry registry = mock(JsonLdTransformerRegistry.class);
class CatalogRequestMessageHttpDelegateTest extends DspHttpDispatcherDelegateTestBase<CatalogRequestMessage> {

private CatalogRequestHttpDelegate delegate;

@BeforeEach
void setUp() {
delegate = new CatalogRequestHttpDelegate(mapper, registry);
delegate = new CatalogRequestHttpDelegate(serializer, mapper, registry);
}

@Test
Expand All @@ -67,34 +59,12 @@ void getMessageType_returnCatalogRequest() {

@Test
void buildRequest_returnRequest() throws IOException {
var jsonObject = Json.createObjectBuilder()
.add(DSPACE_SCHEMA + "key", "value")
.build();
var serializedBody = "catalog request";

when(registry.transform(isA(CatalogRequestMessage.class), eq(JsonObject.class)))
.thenReturn(Result.success(jsonObject));
when(mapper.writeValueAsString(any(JsonObject.class))).thenReturn(serializedBody);

var message = getCatalogRequest();
var httpRequest = delegate.buildRequest(message);

assertThat(httpRequest.url().url()).hasToString(message.getCallbackAddress() + BASE_PATH + CATALOG_REQUEST);
assertThat(readRequestBody(httpRequest)).isEqualTo(serializedBody);

verify(registry, times(1))
.transform(argThat(requestMessage -> ((CatalogRequestMessage) requestMessage).getQuerySpec().equals(message.getQuerySpec())), eq(JsonObject.class));
verify(mapper, times(1))
.writeValueAsString(argThat(json -> ((JsonObject) json).get(CONTEXT) != null && ((JsonObject) json).get(DSPACE_PREFIX + ":key") != null));
testBuildRequest_shouldReturnRequest(message(), BASE_PATH + CATALOG_REQUEST);
}

@Test
void buildRequest_transformationFails_throwException() {
when(registry.transform(isA(CatalogRequestMessage.class), eq(JsonObject.class)))
.thenReturn(Result.failure("error"));

var message = getCatalogRequest();
assertThatThrownBy(() -> delegate.buildRequest(message)).isInstanceOf(EdcException.class);
void buildRequest_serializationFails_throwException() {
testBuildRequest_shouldThrowException_whenSerializationFails(message());
}

@Test
Expand Down Expand Up @@ -133,23 +103,12 @@ void parseResponse_transformationFails_throwException() throws IOException {

@Test
void parseResponse_readingResponseBodyFails_throwException() throws IOException {
var response = mock(Response.class);
var responseBody = mock(ResponseBody.class);

when(response.body()).thenReturn(responseBody);
when(responseBody.bytes()).thenReturn("test".getBytes());
when(mapper.readValue(any(byte[].class), eq(JsonObject.class))).thenThrow(IOException.class);

assertThatThrownBy(() -> delegate.parseResponse().apply(response)).isInstanceOf(EdcException.class);
testParseResponse_shouldThrowException_whenReadingResponseBodyFails();
}

@Test
void parseResponse_responseBodyNull_throwException() throws IOException {
var response = mock(Response.class);

when(response.body()).thenReturn(null);

assertThatThrownBy(() -> delegate.parseResponse().apply(response)).isInstanceOf(EdcException.class);
void parseResponse_responseBodyNull_throwException() {
testParseResponse_shouldThrowException_whenResponseBodyNull();
}

@Test
Expand All @@ -175,19 +134,17 @@ private JsonObject getJsonObject() {
.build();
}

private CatalogRequestMessage getCatalogRequest() {
private CatalogRequestMessage message() {
return CatalogRequestMessage.Builder.newInstance()
.callbackAddress("http://connector")
.connectorId("connector-id")
.protocol("protocol")
.querySpec(QuerySpec.max())
.build();
}

private String readRequestBody(Request request) throws IOException {
var buffer = new Buffer();
request.body().writeTo(buffer);
return buffer.readUtf8();

@Override
protected DspHttpDispatcherDelegate<CatalogRequestMessage, ?> delegate() {
return delegate;
}

}
2 changes: 2 additions & 0 deletions data-protocols/dsp/dsp-http-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ plugins {

dependencies {
api(project(":spi:common:http-spi"))
api(project(":spi:common:json-ld-spi"))
api(project(":extensions:common:json-ld"))
api(project(":data-protocols:dsp:dsp-http-spi"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@

package org.eclipse.edc.protocol.dsp;

import org.eclipse.edc.jsonld.spi.transformer.JsonLdTransformerRegistry;
import org.eclipse.edc.protocol.dsp.dispatcher.DspHttpRemoteMessageDispatcherImpl;
import org.eclipse.edc.protocol.dsp.serialization.JsonLdRemoteMessageSerializerImpl;
import org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpRemoteMessageDispatcher;
import org.eclipse.edc.protocol.dsp.spi.serialization.JsonLdRemoteMessageSerializer;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.iam.IdentityService;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.types.TypeManager;

import static org.eclipse.edc.jsonld.JsonLdExtension.TYPE_MANAGER_CONTEXT_JSON_LD;

/**
* Provides an implementation of {@link DspHttpRemoteMessageDispatcher} to support sending dataspace
Expand All @@ -40,6 +46,10 @@ public class DspHttpCoreExtension implements ServiceExtension {
private EdcHttpClient httpClient;
@Inject
private IdentityService identityService;
@Inject
private JsonLdTransformerRegistry transformerRegistry;
@Inject
private TypeManager typeManager;

@Override
public String name() {
Expand All @@ -52,5 +62,10 @@ public DspHttpRemoteMessageDispatcher dspHttpRemoteMessageDispatcher() {
dispatcherRegistry.register(dispatcher);
return dispatcher;
}

@Provider
public JsonLdRemoteMessageSerializer jsonLdRemoteMessageSerializer() {
return new JsonLdRemoteMessageSerializerImpl(transformerRegistry, typeManager.getMapper(TYPE_MANAGER_CONTEXT_JSON_LD));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2023 Fraunhofer Institute for Software and Systems Engineering
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Fraunhofer Institute for Software and Systems Engineering - initial API and implementation
*
*/

package org.eclipse.edc.protocol.dsp.serialization;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.JsonObject;
import org.eclipse.edc.jsonld.spi.transformer.JsonLdTransformerRegistry;
import org.eclipse.edc.protocol.dsp.spi.serialization.JsonLdRemoteMessageSerializer;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;

import static java.lang.String.format;
import static java.lang.String.join;
import static org.eclipse.edc.jsonld.util.JsonLdUtil.compact;

/**
* Serializes {@link RemoteMessage}s to JSON-LD.
*/
public class JsonLdRemoteMessageSerializerImpl implements JsonLdRemoteMessageSerializer {

private JsonLdTransformerRegistry registry;
private ObjectMapper mapper;

public JsonLdRemoteMessageSerializerImpl(JsonLdTransformerRegistry registry, ObjectMapper mapper) {
this.registry = registry;
this.mapper = mapper;
}

/**
* Serializes a {@link RemoteMessage} to JSON-LD. The message is first transformed using the
* {@link JsonLdTransformerRegistry}, then the resulting JSON-LD structure is compacted using
* the given JSON-LD context before returning it as a string.
*
* @param message the message to serialize
* @param jsonLdContext the JSON-LD context
* @return the serialized message
*/
@Override
public String serialize(RemoteMessage message, JsonObject jsonLdContext) {
try {
var transformResult = registry.transform(message, JsonObject.class);
if (transformResult.succeeded()) {
var compacted = compact(transformResult.getContent(), jsonLdContext);
return mapper.writeValueAsString(compacted);
}
throw new EdcException(format("Failed to transform %s: %s", message.getClass().getSimpleName(), join(", ", transformResult.getFailureMessages())));
} catch (JsonProcessingException e) {
throw new EdcException(format("Failed to serialize %s", message.getClass().getSimpleName()), e);
}
}
}
Loading

0 comments on commit 61a7cd2

Please sign in to comment.