Skip to content

Commit

Permalink
Skip non-existent MBeans in JmxRecordSetProvider
Browse files Browse the repository at this point in the history
When dynamic catalogs are enabled, and multiple catalogs use the same
connector, querying a metric associated with the connector may return
results only from the coordinator if not all catalogs have been used
yet. This happens because the corresponding MBeans haven't been
registered on the workers.

With this change, the query will return results from workers for
catalogs that have already been used and whose MBeans have been
registered, while gracefully skipping non-existent MBeans.
  • Loading branch information
piotrrzysko committed Feb 5, 2025
1 parent 3b53eee commit 7d3606e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.spi.type.Type;

import javax.management.Attribute;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
Expand Down Expand Up @@ -228,7 +229,12 @@ private List<List<Object>> getLiveRows(JmxTableHandle tableHandle, List<? extend
{
ImmutableList.Builder<List<Object>> rows = ImmutableList.builder();
for (String objectName : tableHandle.objectNames()) {
rows.add(getLiveRow(objectName, columns, 0));
try {
rows.add(getLiveRow(objectName, columns, 0));
}
catch (InstanceNotFoundException _) {
// Ignore if the object doesn't exist. This might happen when it exists on the coordinator but has not yet been created on the worker.
}
}
return rows.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordSet;
Expand All @@ -43,6 +42,7 @@
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand All @@ -51,6 +51,7 @@
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.jmx.JmxMetadata.HISTORY_SCHEMA_NAME;
import static io.trino.plugin.jmx.JmxMetadata.JMX_SCHEMA_NAME;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.spi.type.TimestampWithTimeZoneType.createTimestampWithTimeZoneType;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static io.trino.testing.TestingConnectorSession.SESSION;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void testPredicatePushdown()
TupleDomain<ColumnHandle> nodeTupleDomain = TupleDomain.fromFixedValues(ImmutableMap.of(columnHandle, NullableValue.of(createUnboundedVarcharType(), utf8Slice(nodeIdentifier))));
JmxTableHandle tableHandle = new JmxTableHandle(new SchemaTableName("schema", "tableName"), ImmutableList.of("objectName"), ImmutableList.of(columnHandle), true, nodeTupleDomain);

ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, DynamicFilter.EMPTY, Constraint.alwaysTrue());
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, DynamicFilter.EMPTY, alwaysTrue());
List<ConnectorSplit> allSplits = getAllSplits(splitSource);

assertThat(allSplits).hasSize(1);
Expand All @@ -126,7 +127,7 @@ public void testNoPredicate()
throws Exception
{
JmxTableHandle tableHandle = new JmxTableHandle(new SchemaTableName("schema", "tableName"), ImmutableList.of("objectName"), ImmutableList.of(columnHandle), true, TupleDomain.all());
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, DynamicFilter.EMPTY, Constraint.alwaysTrue());
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, DynamicFilter.EMPTY, alwaysTrue());
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
assertThat(allSplits).hasSize(nodes.size());

Expand Down Expand Up @@ -156,6 +157,45 @@ public void testRecordSetProvider()
}
}

@Test
public void testNonExistentObjectName()
throws Exception
{
JmxTableHandle jmxTableHandle = metadata.listTables(SESSION, Optional.of(JMX_SCHEMA_NAME)).stream()
.map(schemaTableName -> metadata.getTableHandle(SESSION, schemaTableName, Optional.empty(), Optional.empty()))
.filter(Objects::nonNull)
.filter(tableHandle -> !tableHandle.objectNames().isEmpty())
.findFirst()
.orElseThrow();

ImmutableList<String> objectNamesWithUnknowns = ImmutableList.<String>builder()
.addAll(jmxTableHandle.objectNames())
.add("JMImplementation:type=Unknown")
.build();
JmxTableHandle tableHandleWithUnknownObject = new JmxTableHandle(
jmxTableHandle.tableName(),
objectNamesWithUnknowns,
jmxTableHandle.columnHandles(),
jmxTableHandle.liveData(),
jmxTableHandle.nodeFilter());

List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(SESSION, tableHandleWithUnknownObject).values());
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandleWithUnknownObject, DynamicFilter.EMPTY, alwaysTrue());
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
ConnectorSplit split = allSplits.getFirst();

RecordSet recordSet = recordSetProvider.getRecordSet(JmxTransactionHandle.INSTANCE, SESSION, split, tableHandleWithUnknownObject, columnHandles);

int count = 0;
try (RecordCursor cursor = recordSet.cursor()) {
while (cursor.advanceNextPosition()) {
count++;
}
}

assertThat(count).isEqualTo(objectNamesWithUnknowns.size() - 1);
}

@Test
public void testHistoryRecordSetProvider()
throws Exception
Expand Down Expand Up @@ -202,7 +242,7 @@ private RecordSet getRecordSet(SchemaTableName schemaTableName)
JmxTableHandle tableHandle = metadata.getTableHandle(SESSION, schemaTableName, Optional.empty(), Optional.empty());
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(SESSION, tableHandle).values());

ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, DynamicFilter.EMPTY, Constraint.alwaysTrue());
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, DynamicFilter.EMPTY, alwaysTrue());
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
assertThat(allSplits).hasSize(nodes.size());
ConnectorSplit split = allSplits.get(0);
Expand Down

0 comments on commit 7d3606e

Please sign in to comment.