Skip to content

Commit

Permalink
Merge pull request #221 from ropalka/WEJBHTTP-140_round1
Browse files Browse the repository at this point in the history
[WEJBHTTP-140] First round of refactorings in TXN part - focus on client side of the protocol
  • Loading branch information
ropalka authored Sep 25, 2024
2 parents 497ec1c + 36add25 commit ff6ca6d
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.wildfly.httpclient.transaction;

import io.undertow.client.ClientRequest;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.Marshalling;
import org.wildfly.httpclient.common.HttpTargetContext;
Expand All @@ -39,13 +37,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.wildfly.httpclient.common.Protocol.VERSION_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.EXCEPTION;
import static org.wildfly.httpclient.transaction.TransactionConstants.TXN_CONTEXT;
import static org.wildfly.httpclient.transaction.TransactionConstants.UT_COMMIT_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.UT_ROLLBACK_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XID;

/**
* Represents a remote transaction that is managed over HTTP protocol.
*
Expand Down Expand Up @@ -87,14 +78,12 @@ public void commit() throws RollbackException, HeuristicMixedException, Heuristi
}
final CompletableFuture<Void> result = new CompletableFuture<>();
statusRef.set(Status.STATUS_COMMITTING);
ClientRequest cr = new ClientRequest()
.setMethod(Methods.POST)
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH +
targetContext.getProtocolVersion() + UT_COMMIT_PATH);
cr.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION.toString());
cr.getRequestHeaders().put(Headers.CONTENT_TYPE, XID.toString());
targetContext.sendRequest(cr, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(cr).createMarshaller();

final RequestBuilder builder = new RequestBuilder().setRequestType(RequestType.UT_COMMIT).setVersion(targetContext.getProtocolVersion());
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());

targetContext.sendRequest(request, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(request).createMarshaller();
marshaller.start(Marshalling.createByteOutput(output));
marshaller.writeInt(id.getFormatId());
final byte[] gtid = id.getGlobalTransactionId();
Expand Down Expand Up @@ -157,14 +146,12 @@ public void rollback() throws SecurityException, SystemException {

final CompletableFuture<Void> result = new CompletableFuture<>();
statusRef.set(Status.STATUS_COMMITTING);
ClientRequest cr = new ClientRequest()
.setMethod(Methods.POST)
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH + targetContext.getProtocolVersion()
+ UT_ROLLBACK_PATH);
cr.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION.toString());
cr.getRequestHeaders().put(Headers.CONTENT_TYPE, XID.toString());
targetContext.sendRequest(cr, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(cr).createMarshaller();

final RequestBuilder builder = new RequestBuilder().setRequestType(RequestType.UT_ROLLBACK).setVersion(targetContext.getProtocolVersion());
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());

targetContext.sendRequest(request, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(request).createMarshaller();
marshaller.start(Marshalling.createByteOutput(output));
marshaller.writeInt(id.getFormatId());
final byte[] gtid = id.getGlobalTransactionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.wildfly.httpclient.transaction;

import io.undertow.client.ClientRequest;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import org.jboss.marshalling.InputStreamByteInput;
import org.jboss.marshalling.Unmarshaller;
import org.wildfly.httpclient.common.HttpTargetContext;
Expand All @@ -43,16 +41,7 @@
import java.util.concurrent.ExecutionException;

import static java.security.AccessController.doPrivileged;
import static org.wildfly.httpclient.common.Protocol.VERSION_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.EXCEPTION;
import static org.wildfly.httpclient.transaction.TransactionConstants.NEW_TRANSACTION;
import static org.wildfly.httpclient.transaction.TransactionConstants.RECOVERY_FLAGS;
import static org.wildfly.httpclient.transaction.TransactionConstants.RECOVERY_PARENT_NAME;
import static org.wildfly.httpclient.transaction.TransactionConstants.TIMEOUT;
import static org.wildfly.httpclient.transaction.TransactionConstants.TXN_CONTEXT;
import static org.wildfly.httpclient.transaction.TransactionConstants.UT_BEGIN_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_RECOVER_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XID_LIST;

/**
* @author Stuart Douglas
Expand Down Expand Up @@ -87,13 +76,8 @@ public SubordinateTransactionControl lookupXid(Xid xid) throws XAException {
public Xid[] recover(int flag, String parentName) throws XAException {
final CompletableFuture<Xid[]> xidList = new CompletableFuture<>();

ClientRequest cr = new ClientRequest()
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH + targetContext.getProtocolVersion() +
XA_RECOVER_PATH + "/" + parentName)
.setMethod(Methods.GET);
cr.getRequestHeaders().put(Headers.ACCEPT, XID_LIST + "," + NEW_TRANSACTION);
cr.getRequestHeaders().put(RECOVERY_PARENT_NAME, parentName);
cr.getRequestHeaders().put(RECOVERY_FLAGS, Integer.toString(flag));
final RequestBuilder builder = new RequestBuilder().setRequestType(RequestType.XA_RECOVER).setVersion(targetContext.getProtocolVersion()).setFlags(flag).setParent(parentName);
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());

final AuthenticationConfiguration authenticationConfiguration = getAuthenticationConfiguration(targetContext.getUri());
final SSLContext sslContext;
Expand All @@ -105,9 +89,9 @@ public Xid[] recover(int flag, String parentName) throws XAException {
throw xaException;
}

targetContext.sendRequest(cr, sslContext, authenticationConfiguration,null, (result, response, closeable) -> {
targetContext.sendRequest(request, sslContext, authenticationConfiguration,null, (result, response, closeable) -> {
try {
Unmarshaller unmarshaller = targetContext.getHttpMarshallerFactory(cr).createUnmarshaller();
Unmarshaller unmarshaller = targetContext.getHttpMarshallerFactory(request).createUnmarshaller();
unmarshaller.start(new InputStreamByteInput(result));
int length = unmarshaller.readInt();
Xid[] ret = new Xid[length];
Expand Down Expand Up @@ -149,12 +133,8 @@ public Xid[] recover(int flag, String parentName) throws XAException {
public SimpleTransactionControl begin(int timeout) throws SystemException {
final CompletableFuture<Xid> beginXid = new CompletableFuture<>();

ClientRequest cr = new ClientRequest()
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH +
targetContext.getProtocolVersion() + UT_BEGIN_PATH)
.setMethod(Methods.POST);
cr.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION + "," + NEW_TRANSACTION);
cr.getRequestHeaders().put(TIMEOUT, timeout);
final RequestBuilder builder = new RequestBuilder().setRequestType(RequestType.UT_BEGIN).setVersion(targetContext.getProtocolVersion()).setTimeout(timeout);
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());

final AuthenticationConfiguration authenticationConfiguration = getAuthenticationConfiguration(targetContext.getUri());
final SSLContext sslContext;
Expand All @@ -164,9 +144,9 @@ public SimpleTransactionControl begin(int timeout) throws SystemException {
throw new SystemException(e.getMessage());
}

targetContext.sendRequest(cr, sslContext, authenticationConfiguration, null, (result, response, closeable) -> {
targetContext.sendRequest(request, sslContext, authenticationConfiguration, null, (result, response, closeable) -> {
try {
Unmarshaller unmarshaller = targetContext.getHttpMarshallerFactory(cr).createUnmarshaller();
Unmarshaller unmarshaller = targetContext.getHttpMarshallerFactory(request).createUnmarshaller();
unmarshaller.start(new InputStreamByteInput(result));
int formatId = unmarshaller.readInt();
int len = unmarshaller.readInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

package org.wildfly.httpclient.transaction;

import static org.wildfly.httpclient.transaction.RequestType.XA_BEFORE_COMPLETION;
import static org.wildfly.httpclient.transaction.RequestType.XA_COMMIT;
import static org.wildfly.httpclient.transaction.RequestType.XA_FORGET;
import static org.wildfly.httpclient.transaction.RequestType.XA_PREPARE;
import static org.wildfly.httpclient.transaction.RequestType.XA_ROLLBACK;
import static org.wildfly.httpclient.transaction.TransactionConstants.READ_ONLY;

import io.undertow.client.ClientRequest;
import io.undertow.client.ClientResponse;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.Marshalling;
import org.wildfly.httpclient.common.HttpTargetContext;
Expand All @@ -37,17 +42,6 @@
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import static org.wildfly.httpclient.common.Protocol.VERSION_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.EXCEPTION;
import static org.wildfly.httpclient.transaction.TransactionConstants.READ_ONLY;
import static org.wildfly.httpclient.transaction.TransactionConstants.TXN_CONTEXT;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_BC_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_COMMIT_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_FORGET_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_PREP_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_ROLLBACK_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XID;

/**
* Represents a remote subordinate transaction that is managed over HTTP protocol.
*
Expand All @@ -73,13 +67,12 @@ Xid getId() {

@Override
public void commit(boolean onePhase) throws XAException {
String operationPath = XA_COMMIT_PATH + (onePhase ? "?opc=true" : "");
processOperation(operationPath);
processOperation(XA_COMMIT, null, onePhase ? Boolean.TRUE : null);
}

@Override
public void rollback() throws XAException {
processOperation(XA_ROLLBACK_PATH);
processOperation(XA_ROLLBACK);
}

@Override
Expand All @@ -89,36 +82,34 @@ public void end(int flags) throws XAException {

@Override
public void beforeCompletion() throws XAException {
processOperation(XA_BC_PATH);
processOperation(XA_BEFORE_COMPLETION);
}

@Override
public int prepare() throws XAException {
boolean readOnly = processOperation(XA_PREP_PATH, (result) -> {
boolean readOnly = processOperation(XA_PREPARE, (result) -> {
String header = result.getResponseHeaders().getFirst(READ_ONLY);
return header != null && Boolean.parseBoolean(header);
});
}, null);
return readOnly ? XAResource.XA_RDONLY : XAResource.XA_OK;
}

@Override
public void forget() throws XAException {
processOperation(XA_FORGET_PATH);
processOperation(XA_FORGET);
}

private void processOperation(String operationPath) throws XAException {
processOperation(operationPath, null);
private void processOperation(RequestType requestType) throws XAException {
processOperation(requestType, null, null);
}

private <T> T processOperation(String operationPath, Function<ClientResponse, T> resultFunction) throws XAException {
private <T> T processOperation(RequestType requestType, Function<ClientResponse, T> resultFunction, Boolean onePhase) throws XAException {
final CompletableFuture<T> result = new CompletableFuture<>();
ClientRequest cr = new ClientRequest()
.setMethod(Methods.POST)
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH + targetContext.getProtocolVersion() + operationPath);
cr.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION.toString());
cr.getRequestHeaders().put(Headers.CONTENT_TYPE, XID.toString());
targetContext.sendRequest(cr, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(cr).createMarshaller();

final RequestBuilder builder = new RequestBuilder().setRequestType(requestType).setVersion(targetContext.getProtocolVersion()).setOnePhase(onePhase);
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());
targetContext.sendRequest(request, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(request).createMarshaller();
marshaller.start(Marshalling.createByteOutput(output));
marshaller.writeInt(id.getFormatId());
final byte[] gtid = id.getGlobalTransactionId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2024 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wildfly.httpclient.transaction;

import static io.undertow.util.Headers.ACCEPT;
import static io.undertow.util.Headers.CONTENT_TYPE;
import static java.net.URLEncoder.encode;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.wildfly.httpclient.common.Protocol.VERSION_PATH;
import static org.wildfly.httpclient.transaction.RequestType.UT_BEGIN;
import static org.wildfly.httpclient.transaction.RequestType.XA_COMMIT;
import static org.wildfly.httpclient.transaction.RequestType.XA_RECOVER;
import static org.wildfly.httpclient.transaction.TransactionConstants.EXCEPTION;
import static org.wildfly.httpclient.transaction.TransactionConstants.NEW_TRANSACTION;
import static org.wildfly.httpclient.transaction.TransactionConstants.RECOVERY_FLAGS;
import static org.wildfly.httpclient.transaction.TransactionConstants.RECOVERY_PARENT_NAME;
import static org.wildfly.httpclient.transaction.TransactionConstants.TIMEOUT;
import static org.wildfly.httpclient.transaction.TransactionConstants.TXN_CONTEXT;
import static org.wildfly.httpclient.transaction.TransactionConstants.XID;
import static org.wildfly.httpclient.transaction.TransactionConstants.XID_LIST;

import io.undertow.client.ClientRequest;
import io.undertow.util.HeaderMap;
import org.wildfly.httpclient.common.Protocol;

/**
* HTTP TXN module client request builder. Encapsulates all information needed to create HTTP TXN client requests.
* Use setter methods (those returning {@link RequestBuilder}) to configure the builder.
* Once configured {@link #createRequest(String)} method must be called to build HTTP client request.
*
* @author <a href="mailto:[email protected]">Richard Opalka</a>
*/
final class RequestBuilder {

private RequestType requestType;
private int version = Protocol.LATEST;
private int timeout;
private int flags;
private String parentName;
private Boolean onePhase;

// setters

RequestBuilder setRequestType(final RequestType requestType) {
this.requestType = requestType;
return this;
}

RequestBuilder setVersion(final int version) {
this.version = version;
return this;
}

RequestBuilder setTimeout(final int timeout) {
this.timeout = timeout;
return this;
}

RequestBuilder setFlags(final int flags) {
this.flags = flags;
return this;
}

RequestBuilder setOnePhase(final Boolean onePhase) {
this.onePhase = onePhase;
return this;
}

RequestBuilder setParent(final String parentName) {
this.parentName = parentName;
return this;
}

// helper methods

ClientRequest createRequest(final String prefix) {
final ClientRequest clientRequest = new ClientRequest();
setRequestMethod(clientRequest);
setRequestPath(clientRequest, prefix);
setRequestHeaders(clientRequest);
return clientRequest;
}

private void setRequestMethod(final ClientRequest request) {
request.setMethod(requestType.getMethod());
}

private void setRequestPath(final ClientRequest request, final String prefix) {
final StringBuilder sb = new StringBuilder();
if (prefix != null) {
sb.append(prefix);
}
appendPath(sb, TXN_CONTEXT, false);
appendPath(sb, VERSION_PATH + version, false);
appendPath(sb, requestType.getPath(), false);
if (requestType == XA_COMMIT) {
sb.append(onePhase != null && onePhase ? "?opc=true" : "");
} else if (requestType == XA_RECOVER) {
appendPath(sb, parentName, false);
}
request.setPath(sb.toString());
}


private void setRequestHeaders(final ClientRequest request) {
final HeaderMap headers = request.getRequestHeaders();
if (requestType == UT_BEGIN) {
headers.put(ACCEPT, EXCEPTION + "," + NEW_TRANSACTION);
headers.put(TIMEOUT, timeout);
} else if (requestType == XA_RECOVER) {
headers.put(ACCEPT, XID_LIST + "," + NEW_TRANSACTION);
headers.put(RECOVERY_PARENT_NAME, parentName);
headers.put(RECOVERY_FLAGS, Integer.toString(flags));
} else {
headers.add(ACCEPT, EXCEPTION.toString());
headers.put(CONTENT_TYPE, XID.toString());
}
}

private static void appendPath(final StringBuilder sb, final String path, final boolean encode) {
if (!path.startsWith("/")) {
sb.append("/");
}
sb.append(encode ? encode(path, UTF_8) : path);
}

}
Loading

0 comments on commit ff6ca6d

Please sign in to comment.