forked from commercetools/commercetools-sync-java
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathClientConfigurationUtils.java
119 lines (108 loc) · 5.71 KB
/
ClientConfigurationUtils.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package com.commercetools.sync.commons.utils;
import io.sphere.sdk.client.BlockingSphereClient;
import io.sphere.sdk.client.QueueSphereClientDecorator;
import io.sphere.sdk.client.RetrySphereClientDecorator;
import io.sphere.sdk.client.SphereAccessTokenSupplier;
import io.sphere.sdk.client.SphereClient;
import io.sphere.sdk.client.SphereClientConfig;
import io.sphere.sdk.http.AsyncHttpClientAdapter;
import io.sphere.sdk.http.HttpClient;
import io.sphere.sdk.retry.RetryAction;
import io.sphere.sdk.retry.RetryPredicate;
import io.sphere.sdk.retry.RetryRule;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static io.sphere.sdk.http.HttpStatusCode.BAD_GATEWAY_502;
import static io.sphere.sdk.http.HttpStatusCode.GATEWAY_TIMEOUT_504;
import static io.sphere.sdk.http.HttpStatusCode.SERVICE_UNAVAILABLE_503;
/**
 * Static helpers for creating {@link SphereClient} instances that are decorated with retry and
 * parallel-request limiting and wrapped in a {@link BlockingSphereClient}.
 *
 * <p>The decorated (non-blocking) client is cached per {@link SphereClientConfig}, and a single
 * {@link HttpClient} is lazily created and shared by all clients built through this class.
 */
public class ClientConfigurationUtils {
    private static HttpClient httpClient;
    private static final long DEFAULT_TIMEOUT = 30000;
    private static final TimeUnit DEFAULT_TIMEOUT_TIME_UNIT = TimeUnit.MILLISECONDS;
    private static Map<SphereClientConfig, SphereClient> delegatesCache = new HashMap<>();

    /**
     * Creates a {@link BlockingSphereClient} with a custom {@code timeout} with a custom {@link TimeUnit}.
     *
     * <p>The underlying decorated client is created only on the first call for a given
     * {@code clientConfig} and then reused from the cache; every call wraps that cached delegate
     * in a new {@link BlockingSphereClient} using the supplied timeout.
     *
     * @param clientConfig the client configuration for the client.
     * @param timeout      the timeout value for the client requests.
     * @param timeUnit     the timeout time unit.
     * @return the instantiated {@link BlockingSphereClient}.
     */
    public static synchronized SphereClient createClient(@Nonnull final SphereClientConfig clientConfig,
                                                         final long timeout,
                                                         @Nonnull final TimeUnit timeUnit) {
        if (!delegatesCache.containsKey(clientConfig)) {
            final HttpClient sharedHttpClient = getHttpClient();
            final SphereAccessTokenSupplier tokenSupplier =
                SphereAccessTokenSupplier.ofAutoRefresh(clientConfig, sharedHttpClient, false);
            final SphereClient underlying = SphereClient.of(clientConfig, sharedHttpClient, tokenSupplier);
            final SphereClient retryClient = withRetry(underlying);
            final SphereClient limitedParallelRequestsClient = withLimitedParallelRequests(retryClient);
            delegatesCache.put(clientConfig, limitedParallelRequestsClient);
        }
        return BlockingSphereClient.of(delegatesCache.get(clientConfig), timeout, timeUnit);
    }

    /**
     * Creates a {@link BlockingSphereClient} with a default {@code timeout} value of 30 seconds.
     *
     * @param clientConfig the client configuration for the client.
     * @return the instantiated {@link BlockingSphereClient}.
     */
    public static SphereClient createClient(@Nonnull final SphereClientConfig clientConfig) {
        return createClient(clientConfig, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_TIME_UNIT);
    }

    /**
     * Gets an asynchronous {@link HttpClient} to be used by the {@link BlockingSphereClient}.
     * Client is created during first invocation and then cached.
     *
     * @return {@link HttpClient}
     */
    private static synchronized HttpClient getHttpClient() {
        if (httpClient == null) {
            // NOTE(review): setAcceptAnyCertificate(true) presumably turns off TLS certificate
            // validation for every request made through this client - confirm this is intended
            // outside of test environments.
            final AsyncHttpClient asyncHttpClient =
                new DefaultAsyncHttpClient(
                    new DefaultAsyncHttpClientConfig.Builder().setAcceptAnyCertificate(true)
                                                              .setHandshakeTimeout((int) DEFAULT_TIMEOUT)
                                                              .build());
            httpClient = AsyncHttpClientAdapter.of(asyncHttpClient);
        }
        return httpClient;
    }

    /**
     * Decorates {@code delegate} with a retry rule that matches HTTP 502, 503 and 504 responses and
     * schedules a retry after a variable delay (see {@link #calculateVariableDelay(long)}), configured
     * with a maximum of 5 attempts.
     *
     * @param delegate the client to decorate.
     * @return the retrying client.
     */
    private static SphereClient withRetry(final SphereClient delegate) {
        final int maxAttempts = 5;
        final RetryAction scheduledRetry = RetryAction
            .ofScheduledRetry(maxAttempts, context -> calculateVariableDelay(context.getAttempt()));
        final RetryPredicate http5xxMatcher = RetryPredicate
            .ofMatchingStatusCodes(BAD_GATEWAY_502, SERVICE_UNAVAILABLE_503, GATEWAY_TIMEOUT_504);
        final List<RetryRule> retryRules = Collections.singletonList(RetryRule.of(http5xxMatcher, scheduledRetry));
        return RetrySphereClientDecorator.of(delegate, retryRules);
    }

    /**
     * Computes a variable delay (grows with attempts count with a random component).
     *
     * <p>The delay is {@code DEFAULT_TIMEOUT * triedAttempts} plus a random jitter between 50 milliseconds
     * and {@code DEFAULT_TIMEOUT} (inclusive). The computation is done in milliseconds: computing it in
     * seconds made the jitter range {@code [50, 30]}, which is empty and caused
     * {@link Random#longs(long, long)} to throw an {@link IllegalArgumentException} on every retry.
     *
     * @param triedAttempts the number of attempts already tried by the client.
     * @return a computed variable delay, that grows with the number of attempts with a random component.
     */
    private static Duration calculateVariableDelay(final long triedAttempts) {
        final long timeoutInMillis = TimeUnit.MILLISECONDS.convert(DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_TIME_UNIT);
        final long randomNumberInRange = getRandomNumberInRange(50, timeoutInMillis);
        final long timeoutMultipliedByTriedAttempts = timeoutInMillis * triedAttempts;
        return Duration.ofMillis(timeoutMultipliedByTriedAttempts + randomNumberInRange);
    }

    /**
     * Returns a pseudo-random number between {@code min} and {@code max}, both inclusive.
     *
     * @param min the smallest value that can be returned.
     * @param max the largest value that can be returned; must not be smaller than {@code min}.
     * @return a pseudo-random value in {@code [min, max]}.
     * @throws IllegalArgumentException if {@code min} is greater than {@code max}.
     */
    private static long getRandomNumberInRange(final long min, final long max) {
        // The upper bound of Random#longs is exclusive, hence max + 1.
        return new Random().longs(min, (max + 1)).findFirst().getAsLong();
    }

    /**
     * Decorates {@code delegate} with a {@link QueueSphereClientDecorator} configured with a maximum
     * of 20 parallel requests.
     *
     * @param delegate the client to decorate.
     * @return the client limited to 20 parallel requests.
     */
    private static SphereClient withLimitedParallelRequests(final SphereClient delegate) {
        final int maxParallelRequests = 20;
        return QueueSphereClientDecorator.of(delegate, maxParallelRequests);
    }
}