Skip to content

Commit

Permalink
Added metrics and removed retryHandler (#330)
Browse files Browse the repository at this point in the history
* Added metrics and removed retryHandler



* added namespace to the metric



---------

Co-authored-by: Javier Sánchez Beltrán <[email protected]>
  • Loading branch information
patduin and javsanbel2 authored Oct 2, 2024
1 parent 873ffbe commit 37804e0
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 27 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## [3.13.3] - 2024-10-02
### Added
- Metric for monitoring open transports. `<prefix>.open_transports_gauge`
### Changed
- Removed RetryingHMSHandler. Retries are done in the client there should be no need to wrap everything in retry logic again.

## [3.13.2] - 2024-07-23
### Fix
- Add HiveConf cache to `CloseableThriftHiveMetastoreIfaceClientFactory` to prevent threads block. See [#325](https://github.com/ExpediaGroup/waggle-dance/issues/325)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2023 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2023 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,7 @@

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.transport.TSocket;
Expand Down Expand Up @@ -61,24 +59,16 @@ public TProcessor getProcessor(TTransport transport) {

boolean useSASL = hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
if (useSASL) {
IHMSHandler tokenHandler = TokenWrappingHMSHandler.newProxyInstance(baseHandler, useSASL);
IHMSHandler handler = newRetryingHMSHandler(ExceptionWrappingHMSHandler.newProxyInstance(tokenHandler), hiveConf,
false);
IHMSHandler handler = TokenWrappingHMSHandler.newProxyInstance(baseHandler, useSASL);
return new TSetIpAddressProcessor<>(handler);
} else {
IHMSHandler handler = newRetryingHMSHandler(ExceptionWrappingHMSHandler.newProxyInstance(baseHandler), hiveConf,
false);
IHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
transportMonitor.monitor(transport, baseHandler);
return new TSetIpAddressProcessor<>(handler);
}
} catch (MetaException | ReflectiveOperationException | RuntimeException e) {
} catch (ReflectiveOperationException | RuntimeException e) {
throw new RuntimeException("Error creating TProcessor", e);
}
}

private IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, HiveConf hiveConf, boolean local)
throws MetaException {
return RetryingHMSHandler.getProxy(hiveConf, baseHandler, local);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,13 +30,17 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;

import com.google.common.annotations.VisibleForTesting;

import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration;

@Component
public class TTransportMonitor {

static final String METRIC_NAME_OPEN_TRANSPORTS = "com_hotels_bdp_waggledance_open_transports_gauge";
private static final Logger LOG = LoggerFactory.getLogger(TTransportMonitor.class);

private static class ActionContainer {
Expand All @@ -53,13 +57,17 @@ private ActionContainer(TTransport transport, Closeable action) {
private final ConcurrentLinkedQueue<ActionContainer> transports = new ConcurrentLinkedQueue<>();

@Autowired
public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration) {
this(waggleDanceConfiguration, Executors.newScheduledThreadPool(1));
public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration, MeterRegistry meterRegistry) {
this(waggleDanceConfiguration, Executors.newScheduledThreadPool(1), meterRegistry);
}

@VisibleForTesting
TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration, ScheduledExecutorService scheduler) {
TTransportMonitor(
WaggleDanceConfiguration waggleDanceConfiguration,
ScheduledExecutorService scheduler,
MeterRegistry meterRegistry) {
this.scheduler = scheduler;
Gauge.builder(METRIC_NAME_OPEN_TRANSPORTS, transports, ConcurrentLinkedQueue::size).register(meterRegistry);
Runnable monitor = () -> {
LOG.debug("Releasing disconnected sessions");
Iterator<ActionContainer> iterator = transports.iterator();
Expand All @@ -80,6 +88,7 @@ public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration) {
}
iterator.remove();
}
LOG.info("Number of open transports (#connections clients -> WD ): {}", transports.size());
};
this.scheduler
.scheduleAtFixedRate(monitor, waggleDanceConfiguration.getDisconnectConnectionDelay(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2021 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,7 @@
import org.mockito.junit.MockitoJUnitRunner;

import com.google.common.collect.Lists;

import com.hotels.bdp.waggledance.api.WaggleDanceException;

@RunWith(MockitoJUnitRunner.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,6 @@ public class TSetIpAddressProcessorFactoryTest {
@Before
public void init() {
when(federatedHMSHandlerFactory.create()).thenReturn(federatedHMSHandler);
when(federatedHMSHandler.getConf()).thenReturn(hiveConf);
factory = new TSetIpAddressProcessorFactory(hiveConf, federatedHMSHandlerFactory, transportMonitor);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,9 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -52,21 +55,25 @@ public class TTransportMonitorTest {
private @Mock TTransport transport;
private @Mock Closeable action;
private @Mock ScheduledExecutorService scheduler;
private MeterRegistry meterRegistry;

private TTransportMonitor monitor;


@Before
public void init() {
meterRegistry = new SimpleMeterRegistry();
when(waggleDanceConfiguration.getDisconnectConnectionDelay()).thenReturn((int) DEFAULT_DELAY);
when(waggleDanceConfiguration.getDisconnectTimeUnit()).thenReturn(MILLISECONDS);
monitor = new TTransportMonitor(waggleDanceConfiguration, scheduler);
monitor = new TTransportMonitor(waggleDanceConfiguration, scheduler, meterRegistry);
verify(scheduler).scheduleAtFixedRate(runnableCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
}

@Test
public void initialization() throws Exception {
assertThat(runnableCaptor.getValue(), is(notNullValue()));
verify(scheduler).scheduleAtFixedRate(runnableCaptor.getValue(), DEFAULT_DELAY, DEFAULT_DELAY, MILLISECONDS);
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0));
}

@Test
Expand All @@ -76,6 +83,7 @@ public void shouldNotDisconnect() throws Exception {
runnableCaptor.getValue().run();
verify(transport, never()).close();
verify(action, never()).close();
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(1.0));
}

@Test
Expand All @@ -85,6 +93,7 @@ public void shouldDisconnect() throws Exception {
runnableCaptor.getValue().run();
verify(transport).close();
verify(action).close();
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0));
}

@Test
Expand All @@ -95,6 +104,7 @@ public void shouldDisconnectWhenTransportThrowsException() throws Exception {
runnableCaptor.getValue().run();
verify(transport).close();
verify(action).close();
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0));
}

@Test
Expand All @@ -105,6 +115,7 @@ public void shouldDisconnectWhenActionThrowsException() throws Exception {
runnableCaptor.getValue().run();
verify(transport).close();
verify(action).close();
assertThat(meterRegistry.get(TTransportMonitor.METRIC_NAME_OPEN_TRANSPORTS).gauge().value(), is(0.0));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<!-- WaggleDance Integration Tests (WD_IT) log (through hive dependencies) via log4j (not log4j2.xml which controls the logging of the started WaggleDanceRunner) -->
<param name="ConversionPattern" value="%d{ISO8601} WD_IT %tn %-5p %c:%L - %m%n" />
<param name="ConversionPattern" value="%d{ISO8601} WD_IT %t %-5p %c:%L - %m%n" />
</layout>
</appender>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<Configuration>
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %tn %-5p %c:%L - %m%n"/>
<PatternLayout pattern="%d{ISO8601} %t %-5p %c:%L - %m%n"/>
</Console>
</Appenders>
<Loggers>
Expand Down

0 comments on commit 37804e0

Please sign in to comment.