Skip to content

Commit

Permalink
Fix remaining reactor-netty issues
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexProgrammerDE committed Feb 24, 2024
1 parent 4c0feb8 commit a03ea96
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 49 deletions.
78 changes: 29 additions & 49 deletions src/main/java/net/pistonmaster/soulfire/account/HttpHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import java.io.IOException;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
Expand All @@ -34,13 +32,14 @@
import net.lenni0451.commons.httpclient.executor.RequestExecutor;
import net.lenni0451.commons.httpclient.requests.HttpContentRequest;
import net.lenni0451.commons.httpclient.requests.HttpRequest;
import net.lenni0451.commons.httpclient.utils.HttpRequestUtils;
import net.lenni0451.commons.httpclient.utils.URLWrapper;
import net.pistonmaster.soulfire.builddata.BuildData;
import net.pistonmaster.soulfire.proxy.SFProxy;
import net.raphimc.minecraftauth.MinecraftAuth;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.transport.ProxyProvider;

Expand Down Expand Up @@ -110,6 +109,8 @@ public HttpResponse execute(@NotNull HttpRequest httpRequest) {
var cookieManager = getCookieManager(httpRequest);
try {
log.debug("Executing request: {}", httpRequest.getURL());
var requestHeaders = getHeaders(httpRequest, cookieManager);

var base =
createReactorClient(proxyData, false)
.followRedirect(
Expand All @@ -119,65 +120,45 @@ public HttpResponse execute(@NotNull HttpRequest httpRequest) {
case IGNORE -> false;
})
.responseTimeout(Duration.ofMillis(client.getReadTimeout()))
.headers(
h -> {
h.clear();
try {
getHeaders(httpRequest, cookieManager).forEach(h::add);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.request(HttpMethod.valueOf(httpRequest.getMethod().toUpperCase(Locale.ROOT)))
.headers(h -> requestHeaders.forEach((k, v) -> h.set(k, String.join("; ", v))))
.request(HttpMethod.valueOf(httpRequest.getMethod()))
.uri(httpRequest.getURL().toURI());

reactor.netty.http.client.HttpClient.ResponseReceiver<?> receiver;
if (httpRequest instanceof HttpContentRequest contentRequest) {
receiver =
base.send(
ByteBufFlux.fromString(
Flux.just(
Objects.requireNonNull(contentRequest.getContent()).getAsString())));
ByteBufFlux.fromInbound(
Flux.just(Objects.requireNonNull(contentRequest.getContent()).getAsBytes())));
} else {
receiver = base;
}

var result = receiver
return receiver
.responseSingle(
(res, content) ->
content
(res, content) -> {
try {
var code = res.status().code();
var url = new URLWrapper(Objects.requireNonNull(res.resourceUrl()));
var urlObj = url.toURL();
var responseHeaders = getAsMap(res.responseHeaders());

HttpRequestUtils.updateCookies(cookieManager, url.toURL(), responseHeaders);

return content
.asByteArray()
.publishOn(Schedulers.boundedElastic())
.<HttpResponse>handle((bytes, sink) -> {
try {
var url = new URLWrapper(Objects.requireNonNull(res.resourceUrl()));
var headers = getAsMap(res.responseHeaders());

if (cookieManager != null) {
try {
cookieManager.put(url.toURI(), headers);
} catch (IOException e) {
sink.error(e);
}
}

sink.next(new HttpResponse(
url.toURL(),
res.status().code(),
bytes,
headers
));
} catch (MalformedURLException e) {
sink.error(e);
}
}))
.mapNotNull(
bytes -> new HttpResponse(
urlObj, code, bytes, responseHeaders))
.switchIfEmpty(Mono.just(new HttpResponse(urlObj, code, null, responseHeaders)));
} catch (Exception e) {
log.error("Error while handling response", e);
return Mono.error(e);
}
})
.blockOptional()
.orElseThrow();

log.debug("Request executed: {}", httpRequest.getURL());
return result;
} catch (Exception e) {
log.error("Error while executing request", e);
throw new IOException(e);
}
}
Expand All @@ -187,7 +168,6 @@ private static Map<String, List<String>> getAsMap(HttpHeaders headers) {
return headers.entries().stream()
.collect(
Collectors.groupingBy(
Map.Entry::getKey,
Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
Map.Entry::getKey, Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private MinecraftAccount fromStringSingle(String data, AuthType authType) {
try {
return authType.authService().createDataAndLogin(data, null);
} catch (IOException e) {
log.error("Failed to load account from string", e);
throw new RuntimeException(e);
}
}
Expand Down

0 comments on commit a03ea96

Please sign in to comment.