diff --git a/CHANGELOG.md b/CHANGELOG.md index 0dca1c13a..4b00760f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## [3.13.3] - 2024-10-01 +### Added +- Metric for monitoring open transports. `.open_transports_gauge` +### Changed +- Removed RetryingHMSHandler. Retries are done in the client there should be no need to wrap everything in retry logic again. + ## [3.13.2] - 2024-07-23 ### Fix - Add HiveConf cache to `CloseableThriftHiveMetastoreIfaceClientFactory` to prevent threads block. See [#325](https://github.com/ExpediaGroup/waggle-dance/issues/325) diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java index 8a771e059..dcebcdaaa 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java @@ -19,9 +19,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IHMSHandler; -import org.apache.hadoop.hive.metastore.RetryingHMSHandler; import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.transport.TSocket; @@ -61,24 +59,16 @@ public TProcessor getProcessor(TTransport transport) { boolean useSASL = hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL); if (useSASL) { - IHMSHandler tokenHandler = TokenWrappingHMSHandler.newProxyInstance(baseHandler, useSASL); - IHMSHandler handler = newRetryingHMSHandler(ExceptionWrappingHMSHandler.newProxyInstance(tokenHandler), hiveConf, - false); + IHMSHandler handler = TokenWrappingHMSHandler.newProxyInstance(baseHandler, useSASL); return new TSetIpAddressProcessor<>(handler); } else { - IHMSHandler handler = newRetryingHMSHandler(ExceptionWrappingHMSHandler.newProxyInstance(baseHandler), hiveConf, - false); + IHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler); transportMonitor.monitor(transport, baseHandler); return new TSetIpAddressProcessor<>(handler); } - } catch (MetaException | ReflectiveOperationException | RuntimeException e) { + } catch (ReflectiveOperationException | RuntimeException e) { throw new RuntimeException("Error creating TProcessor", e); } } - private IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, HiveConf hiveConf, boolean local) - throws MetaException { - return RetryingHMSHandler.getProxy(hiveConf, baseHandler, local); - } - } diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TTransportMonitor.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TTransportMonitor.java index ac717eac7..bf99f3cdc 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TTransportMonitor.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TTransportMonitor.java @@ -31,12 +31,15 @@ import org.springframework.stereotype.Component; import com.google.common.annotations.VisibleForTesting; - import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; + @Component public class TTransportMonitor { + static final String METRIC_NAME_OPEN_TRANSPORTS = "open_transports_gauge"; private static final Logger LOG = LoggerFactory.getLogger(TTransportMonitor.class); private static class ActionContainer { @@ -53,13 +56,17 @@ private ActionContainer(TTransport transport, Closeable action) { private final ConcurrentLinkedQueue transports = new ConcurrentLinkedQueue<>(); @Autowired - public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration) { - this(waggleDanceConfiguration, Executors.newScheduledThreadPool(1)); + public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration, MeterRegistry meterRegistry) { + this(waggleDanceConfiguration, Executors.newScheduledThreadPool(1), meterRegistry); } @VisibleForTesting - TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration, ScheduledExecutorService scheduler) { + TTransportMonitor( + WaggleDanceConfiguration waggleDanceConfiguration, + ScheduledExecutorService scheduler, + MeterRegistry meterRegistry) { this.scheduler = scheduler; + Gauge.builder(METRIC_NAME_OPEN_TRANSPORTS, transports, ConcurrentLinkedQueue::size).register(meterRegistry); Runnable monitor = () -> { LOG.debug("Releasing disconnected sessions"); Iterator iterator = transports.iterator(); @@ -80,6 +87,7 @@ public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration) { } iterator.remove(); } + LOG.info("Number of open transports (#connections clients -> WD ): {}", transports.size()); }; this.scheduler .scheduleAtFixedRate(monitor, waggleDanceConfiguration.getDisconnectConnectionDelay(), diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/TTransportMonitorTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/TTransportMonitorTest.java index b43c71c3e..c69c7dc06 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/TTransportMonitorTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/TTransportMonitorTest.java @@ -41,6 +41,9 @@ import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + @RunWith(MockitoJUnitRunner.class) public class TTransportMonitorTest { @@ -52,14 +55,17 @@ public class TTransportMonitorTest { private @Mock TTransport transport; private @Mock Closeable action; private @Mock ScheduledExecutorService scheduler; + private MeterRegistry meterRegistry; private TTransportMonitor monitor; + @Before public void init() { + meterRegistry = new SimpleMeterRegistry(); when(waggleDanceConfiguration.getDisconnectConnectionDelay()).thenReturn((int) DEFAULT_DELAY); when(waggleDanceConfiguration.getDisconnectTimeUnit()).thenReturn(MILLISECONDS); - monitor = new TTransportMonitor(waggleDanceConfiguration, scheduler); + monitor = new TTransportMonitor(waggleDanceConfiguration, scheduler, meterRegistry); verify(scheduler).scheduleAtFixedRate(runnableCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class)); } @@ -67,6 +73,7 @@ public void init() { public void initialization() throws Exception { assertThat(runnableCaptor.getValue(), is(notNullValue())); verify(scheduler).scheduleAtFixedRate(runnableCaptor.getValue(), DEFAULT_DELAY, DEFAULT_DELAY, MILLISECONDS); + assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0)); } @Test @@ -76,6 +83,7 @@ public void shouldNotDisconnect() throws Exception { runnableCaptor.getValue().run(); verify(transport, never()).close(); verify(action, never()).close(); + assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(1.0)); } @Test @@ -85,6 +93,7 @@ public void shouldDisconnect() throws Exception { runnableCaptor.getValue().run(); verify(transport).close(); verify(action).close(); + assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0)); } @Test @@ -95,6 +104,7 @@ public void shouldDisconnectWhenTransportThrowsException() throws Exception { runnableCaptor.getValue().run(); verify(transport).close(); verify(action).close(); + assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0)); } @Test @@ -105,6 +115,7 @@ public void shouldDisconnectWhenActionThrowsException() throws Exception { runnableCaptor.getValue().run(); verify(transport).close(); verify(action).close(); + assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0)); } } diff --git a/waggle-dance-integration-tests/src/test/resources/log4j.xml b/waggle-dance-integration-tests/src/test/resources/log4j.xml index 470f6f1ae..cfdea49a0 100644 --- a/waggle-dance-integration-tests/src/test/resources/log4j.xml +++ b/waggle-dance-integration-tests/src/test/resources/log4j.xml @@ -20,7 +20,7 @@ - + diff --git a/waggle-dance-integration-tests/src/test/resources/log4j2.xml b/waggle-dance-integration-tests/src/test/resources/log4j2.xml index 9764318a2..ea10a660e 100644 --- a/waggle-dance-integration-tests/src/test/resources/log4j2.xml +++ b/waggle-dance-integration-tests/src/test/resources/log4j2.xml @@ -17,7 +17,7 @@ - +