Skip to content

Commit

Permalink
Added metrics and removed retryHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
patduin committed Oct 1, 2024
1 parent 873ffbe commit ea2c450
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## [3.13.3] - 2024-10-01
### Added
- Metric for monitoring open transports: `<prefix>.open_transports_gauge`.
### Changed
- Removed `RetryingHMSHandler`. Retries are already done in the client, so there should be no need to wrap everything in retry logic again.

## [3.13.2] - 2024-07-23
### Fix
- Add HiveConf cache to `CloseableThriftHiveMetastoreIfaceClientFactory` to prevent threads from blocking. See [#325](https://github.com/ExpediaGroup/waggle-dance/issues/325)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.transport.TSocket;
Expand Down Expand Up @@ -61,24 +59,16 @@ public TProcessor getProcessor(TTransport transport) {

boolean useSASL = hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
if (useSASL) {
IHMSHandler tokenHandler = TokenWrappingHMSHandler.newProxyInstance(baseHandler, useSASL);
IHMSHandler handler = newRetryingHMSHandler(ExceptionWrappingHMSHandler.newProxyInstance(tokenHandler), hiveConf,
false);
IHMSHandler handler = TokenWrappingHMSHandler.newProxyInstance(baseHandler, useSASL);
return new TSetIpAddressProcessor<>(handler);
} else {
IHMSHandler handler = newRetryingHMSHandler(ExceptionWrappingHMSHandler.newProxyInstance(baseHandler), hiveConf,
false);
IHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
transportMonitor.monitor(transport, baseHandler);
return new TSetIpAddressProcessor<>(handler);
}
} catch (MetaException | ReflectiveOperationException | RuntimeException e) {
} catch (ReflectiveOperationException | RuntimeException e) {
throw new RuntimeException("Error creating TProcessor", e);
}
}

private IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, HiveConf hiveConf, boolean local)
throws MetaException {
return RetryingHMSHandler.getProxy(hiveConf, baseHandler, local);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import org.springframework.stereotype.Component;

import com.google.common.annotations.VisibleForTesting;

import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;

@Component
public class TTransportMonitor {

static final String METRIC_NAME_OPEN_TRANSPORTS = "open_transports_gauge";
private static final Logger LOG = LoggerFactory.getLogger(TTransportMonitor.class);

private static class ActionContainer {
Expand All @@ -53,13 +56,17 @@ private ActionContainer(TTransport transport, Closeable action) {
private final ConcurrentLinkedQueue<ActionContainer> transports = new ConcurrentLinkedQueue<>();

@Autowired
public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration) {
this(waggleDanceConfiguration, Executors.newScheduledThreadPool(1));
public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration, MeterRegistry meterRegistry) {
this(waggleDanceConfiguration, Executors.newScheduledThreadPool(1), meterRegistry);
}

@VisibleForTesting
TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration, ScheduledExecutorService scheduler) {
TTransportMonitor(
WaggleDanceConfiguration waggleDanceConfiguration,
ScheduledExecutorService scheduler,
MeterRegistry meterRegistry) {
this.scheduler = scheduler;
Gauge.builder(METRIC_NAME_OPEN_TRANSPORTS, transports, ConcurrentLinkedQueue::size).register(meterRegistry);
Runnable monitor = () -> {
LOG.debug("Releasing disconnected sessions");
Iterator<ActionContainer> iterator = transports.iterator();
Expand All @@ -80,6 +87,7 @@ public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration) {
}
iterator.remove();
}
LOG.info("Number of open transports (#connections clients -> WD ): {}", transports.size());
};
this.scheduler
.scheduleAtFixedRate(monitor, waggleDanceConfiguration.getDisconnectConnectionDelay(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@

import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

@RunWith(MockitoJUnitRunner.class)
public class TTransportMonitorTest {

Expand All @@ -52,21 +55,25 @@ public class TTransportMonitorTest {
private @Mock TTransport transport;
private @Mock Closeable action;
private @Mock ScheduledExecutorService scheduler;
private MeterRegistry meterRegistry;

private TTransportMonitor monitor;


@Before
public void init() {
meterRegistry = new SimpleMeterRegistry();
when(waggleDanceConfiguration.getDisconnectConnectionDelay()).thenReturn((int) DEFAULT_DELAY);
when(waggleDanceConfiguration.getDisconnectTimeUnit()).thenReturn(MILLISECONDS);
monitor = new TTransportMonitor(waggleDanceConfiguration, scheduler);
monitor = new TTransportMonitor(waggleDanceConfiguration, scheduler, meterRegistry);
verify(scheduler).scheduleAtFixedRate(runnableCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
}

@Test
public void initialization() throws Exception {
assertThat(runnableCaptor.getValue(), is(notNullValue()));
verify(scheduler).scheduleAtFixedRate(runnableCaptor.getValue(), DEFAULT_DELAY, DEFAULT_DELAY, MILLISECONDS);
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0));
}

@Test
Expand All @@ -76,6 +83,7 @@ public void shouldNotDisconnect() throws Exception {
runnableCaptor.getValue().run();
verify(transport, never()).close();
verify(action, never()).close();
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(1.0));
}

@Test
Expand All @@ -85,6 +93,7 @@ public void shouldDisconnect() throws Exception {
runnableCaptor.getValue().run();
verify(transport).close();
verify(action).close();
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0));
}

@Test
Expand All @@ -95,6 +104,7 @@ public void shouldDisconnectWhenTransportThrowsException() throws Exception {
runnableCaptor.getValue().run();
verify(transport).close();
verify(action).close();
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0));
}

@Test
Expand All @@ -105,6 +115,7 @@ public void shouldDisconnectWhenActionThrowsException() throws Exception {
runnableCaptor.getValue().run();
verify(transport).close();
verify(action).close();
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<!-- WaggleDance Integration Tests (WD_IT) log (through hive dependencies) via log4j (not log4j2.xml which controls the logging of the started WaggleDanceRunner) -->
<param name="ConversionPattern" value="%d{ISO8601} WD_IT %tn %-5p %c:%L - %m%n" />
<param name="ConversionPattern" value="%d{ISO8601} WD_IT %t %-5p %c:%L - %m%n" />
</layout>
</appender>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<Configuration>
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %tn %-5p %c:%L - %m%n"/>
<PatternLayout pattern="%d{ISO8601} %t %-5p %c:%L - %m%n"/>
</Console>
</Appenders>
<Loggers>
Expand Down

0 comments on commit ea2c450

Please sign in to comment.