diff --git a/plugin/trino-opa/pom.xml b/plugin/trino-opa/pom.xml
index 739d4dfe45b48..27b1d5526fafb 100644
--- a/plugin/trino-opa/pom.xml
+++ b/plugin/trino-opa/pom.xml
@@ -121,11 +121,19 @@
test
+
+ io.trino
+ trino-main
+ test-jar
+ test
+
+
io.trino
trino-testing
test
+
org.assertj
assertj-core
diff --git a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaAccessControl.java b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaAccessControl.java
index 4da0f63d6c8b8..fafd8a9d8cd16 100644
--- a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaAccessControl.java
+++ b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaAccessControl.java
@@ -22,6 +22,7 @@
import io.trino.plugin.opa.schema.OpaQueryInput;
import io.trino.plugin.opa.schema.OpaQueryInputAction;
import io.trino.plugin.opa.schema.OpaQueryInputResource;
+import io.trino.plugin.opa.schema.OpaViewExpression;
import io.trino.plugin.opa.schema.TrinoCatalogSessionProperty;
import io.trino.plugin.opa.schema.TrinoFunction;
import io.trino.plugin.opa.schema.TrinoGrantPrincipal;
@@ -40,15 +41,19 @@
import io.trino.spi.security.SystemAccessControl;
import io.trino.spi.security.SystemSecurityContext;
import io.trino.spi.security.TrinoPrincipal;
+import io.trino.spi.security.ViewExpression;
+import io.trino.spi.type.Type;
import java.security.Principal;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.opa.OpaHighLevelClient.buildQueryInputForSimpleResource;
@@ -709,6 +714,23 @@ public void checkCanDropFunction(SystemSecurityContext systemSecurityContext, Ca
OpaQueryInputResource.builder().function(TrinoFunction.fromTrinoFunction(functionName)).build());
}
+ @Override
+ public List getRowFilters(SystemSecurityContext context, CatalogSchemaTableName tableName)
+ {
+ List rowFilterExpressions = opaHighLevelClient.getRowFilterExpressionsFromOpa(buildQueryContext(context), tableName);
+ return rowFilterExpressions.stream()
+ .map(expression -> expression.toTrinoViewExpression(tableName.getCatalogName(), tableName.getSchemaTableName().getSchemaName()))
+ .collect(toImmutableList());
+ }
+
+ @Override
+ public Optional getColumnMask(SystemSecurityContext context, CatalogSchemaTableName tableName, String columnName, Type type)
+ {
+ return opaHighLevelClient
+ .getColumnMaskFromOpa(buildQueryContext(context), tableName, columnName, type)
+ .map(expression -> expression.toTrinoViewExpression(tableName.getCatalogName(), tableName.getSchemaTableName().getSchemaName()));
+ }
+
private void checkTableOperation(SystemSecurityContext context, String actionName, CatalogSchemaTableName table, Consumer deny)
{
opaHighLevelClient.queryAndEnforce(
diff --git a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaAccessControlFactory.java b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaAccessControlFactory.java
index e23d2838f9656..6aa6f45dc7739 100644
--- a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaAccessControlFactory.java
+++ b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaAccessControlFactory.java
@@ -22,9 +22,11 @@
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.http.client.HttpClient;
import io.airlift.json.JsonModule;
+import io.trino.plugin.opa.schema.OpaColumnMaskQueryResult;
import io.trino.plugin.opa.schema.OpaPluginContext;
import io.trino.plugin.opa.schema.OpaQuery;
import io.trino.plugin.opa.schema.OpaQueryResult;
+import io.trino.plugin.opa.schema.OpaRowFiltersQueryResult;
import io.trino.spi.security.SystemAccessControl;
import io.trino.spi.security.SystemAccessControlFactory;
@@ -71,6 +73,8 @@ protected static SystemAccessControl create(Map config, Optional
binder -> {
jsonCodecBinder(binder).bindJsonCodec(OpaQuery.class);
jsonCodecBinder(binder).bindJsonCodec(OpaQueryResult.class);
+ jsonCodecBinder(binder).bindJsonCodec(OpaRowFiltersQueryResult.class);
+ jsonCodecBinder(binder).bindJsonCodec(OpaColumnMaskQueryResult.class);
httpClient.ifPresentOrElse(
client -> binder.bind(Key.get(HttpClient.class, ForOpa.class)).toInstance(client),
() -> httpClientBinder(binder).bindHttpClient("opa", ForOpa.class));
diff --git a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaConfig.java b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaConfig.java
index 675e02b8c7dcc..d215c8c0b046e 100644
--- a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaConfig.java
+++ b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaConfig.java
@@ -28,6 +28,8 @@ public class OpaConfig
private boolean logRequests;
private boolean logResponses;
private boolean allowPermissioningOperations;
+ private Optional opaRowFiltersUri = Optional.empty();
+ private Optional opaColumnMaskingUri = Optional.empty();
@NotNull
public URI getOpaUri()
@@ -43,6 +45,7 @@ public OpaConfig setOpaUri(@NotNull URI opaUri)
return this;
}
+ @NotNull
public Optional getOpaBatchUri()
{
return opaBatchUri;
@@ -94,4 +97,32 @@ public OpaConfig setAllowPermissioningOperations(boolean allowPermissioningOpera
this.allowPermissioningOperations = allowPermissioningOperations;
return this;
}
+
+ @NotNull
+ public Optional getOpaRowFiltersUri()
+ {
+ return opaRowFiltersUri;
+ }
+
+ @Config("opa.policy.row-filters-uri")
+ @ConfigDescription("URI for fetching row filters - if not set no row filtering will be applied")
+ public OpaConfig setOpaRowFiltersUri(@NotNull URI opaRowFiltersUri)
+ {
+ this.opaRowFiltersUri = Optional.ofNullable(opaRowFiltersUri);
+ return this;
+ }
+
+ @NotNull
+ public Optional getOpaColumnMaskingUri()
+ {
+ return opaColumnMaskingUri;
+ }
+
+ @Config("opa.policy.column-masking-uri")
+ @ConfigDescription("URI for fetching column masks - if not set no masking will be applied")
+ public OpaConfig setOpaColumnMaskingUri(URI opaColumnMaskingUri)
+ {
+ this.opaColumnMaskingUri = Optional.ofNullable(opaColumnMaskingUri);
+ return this;
+ }
}
diff --git a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaHighLevelClient.java b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaHighLevelClient.java
index 81a99ddde5dc4..054e187f97a39 100644
--- a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaHighLevelClient.java
+++ b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/OpaHighLevelClient.java
@@ -13,17 +13,27 @@
*/
package io.trino.plugin.opa;
+import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.json.JsonCodec;
+import io.trino.plugin.opa.schema.OpaColumnMaskQueryResult;
import io.trino.plugin.opa.schema.OpaQueryContext;
import io.trino.plugin.opa.schema.OpaQueryInput;
import io.trino.plugin.opa.schema.OpaQueryInputAction;
import io.trino.plugin.opa.schema.OpaQueryInputResource;
import io.trino.plugin.opa.schema.OpaQueryResult;
+import io.trino.plugin.opa.schema.OpaRowFiltersQueryResult;
+import io.trino.plugin.opa.schema.OpaViewExpression;
+import io.trino.plugin.opa.schema.TrinoColumn;
+import io.trino.plugin.opa.schema.TrinoTable;
+import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.security.AccessDeniedException;
+import io.trino.spi.type.Type;
import java.net.URI;
import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -32,18 +42,28 @@
public class OpaHighLevelClient
{
private final JsonCodec queryResultCodec;
- private final URI opaPolicyUri;
+ private final JsonCodec rowFiltersQueryResultCodec;
+ private final JsonCodec columnMaskQueryResultCodec;
private final OpaHttpClient opaHttpClient;
+ private final URI opaPolicyUri;
+ private final Optional opaRowFiltersUri;
+ private final Optional opaColumnMaskingUri;
@Inject
public OpaHighLevelClient(
JsonCodec queryResultCodec,
+ JsonCodec rowFiltersQueryResultCodec,
+ JsonCodec columnMaskQueryResultCodec,
OpaHttpClient opaHttpClient,
OpaConfig config)
{
this.queryResultCodec = requireNonNull(queryResultCodec, "queryResultCodec is null");
+ this.rowFiltersQueryResultCodec = requireNonNull(rowFiltersQueryResultCodec, "rowFiltersQueryResultCodec is null");
+ this.columnMaskQueryResultCodec = requireNonNull(columnMaskQueryResultCodec, "columnMaskQueryResultCodec is null");
this.opaHttpClient = requireNonNull(opaHttpClient, "opaHttpClient is null");
this.opaPolicyUri = config.getOpaUri();
+ this.opaRowFiltersUri = config.getOpaRowFiltersUri();
+ this.opaColumnMaskingUri = config.getOpaColumnMaskingUri();
}
public boolean queryOpa(OpaQueryInput input)
@@ -105,6 +125,31 @@ public Set parallelFilterFromOpa(
return opaHttpClient.parallelFilterFromOpa(items, requestBuilder, opaPolicyUri, queryResultCodec);
}
+ public List getRowFilterExpressionsFromOpa(OpaQueryContext context, CatalogSchemaTableName table)
+ {
+ OpaQueryInput queryInput = new OpaQueryInput(
+ context,
+ OpaQueryInputAction.builder()
+ .operation("GetRowFilters")
+ .resource(OpaQueryInputResource.builder().table(new TrinoTable(table)).build())
+ .build());
+ return opaRowFiltersUri
+ .map(uri -> opaHttpClient.consumeOpaResponse(opaHttpClient.submitOpaRequest(queryInput, uri, rowFiltersQueryResultCodec)).result())
+ .orElse(ImmutableList.of());
+ }
+
+ public Optional getColumnMaskFromOpa(OpaQueryContext context, CatalogSchemaTableName table, String columnName, Type type)
+ {
+ OpaQueryInput queryInput = new OpaQueryInput(
+ context,
+ OpaQueryInputAction.builder()
+ .operation("GetColumnMask")
+ .resource(OpaQueryInputResource.builder().column(new TrinoColumn(table, columnName, type)).build())
+ .build());
+ return opaColumnMaskingUri
+ .flatMap(uri -> opaHttpClient.consumeOpaResponse(opaHttpClient.submitOpaRequest(queryInput, uri, columnMaskQueryResultCodec)).result());
+ }
+
public static OpaQueryInput buildQueryInputForSimpleResource(OpaQueryContext context, String operation, OpaQueryInputResource resource)
{
return new OpaQueryInput(context, OpaQueryInputAction.builder().operation(operation).resource(resource).build());
diff --git a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaColumnMaskQueryResult.java b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaColumnMaskQueryResult.java
new file mode 100644
index 0000000000000..d19ac46c1c262
--- /dev/null
+++ b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaColumnMaskQueryResult.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.opa.schema;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.NotNull;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public record OpaColumnMaskQueryResult(@JsonProperty("decision_id") String decisionId, @NotNull Optional result)
+{
+ public OpaColumnMaskQueryResult
+ {
+ requireNonNull(result, "result is null");
+ }
+}
diff --git a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaQueryInputResource.java b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaQueryInputResource.java
index 61820030882ef..a6468fa305a85 100644
--- a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaQueryInputResource.java
+++ b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaQueryInputResource.java
@@ -27,7 +27,8 @@ public record OpaQueryInputResource(
TrinoFunction function,
NamedEntity catalog,
TrinoSchema schema,
- TrinoTable table)
+ TrinoTable table,
+ TrinoColumn column)
{
public record NamedEntity(@NotNull String name)
{
@@ -51,6 +52,7 @@ public static class Builder
private TrinoSchema schema;
private TrinoTable table;
private TrinoFunction function;
+ private TrinoColumn column;
private Builder() {}
@@ -102,6 +104,12 @@ public Builder function(String functionName)
return this;
}
+ public Builder column(TrinoColumn column)
+ {
+ this.column = column;
+ return this;
+ }
+
public OpaQueryInputResource build()
{
return new OpaQueryInputResource(
@@ -111,7 +119,8 @@ public OpaQueryInputResource build()
this.function,
this.catalog,
this.schema,
- this.table);
+ this.table,
+ this.column);
}
}
}
diff --git a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaRowFiltersQueryResult.java b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaRowFiltersQueryResult.java
new file mode 100644
index 0000000000000..98e6f5b5facdd
--- /dev/null
+++ b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaRowFiltersQueryResult.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.opa.schema;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import jakarta.validation.constraints.NotNull;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNullElse;
+
+public record OpaRowFiltersQueryResult(@JsonProperty("decision_id") String decisionId, @NotNull List result)
+{
+ public OpaRowFiltersQueryResult
+ {
+ result = ImmutableList.copyOf(requireNonNullElse(result, ImmutableList.of()));
+ }
+}
diff --git a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaViewExpression.java b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaViewExpression.java
new file mode 100644
index 0000000000000..5d5bd228aa3f8
--- /dev/null
+++ b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/OpaViewExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.opa.schema;
+
+import io.trino.spi.security.ViewExpression;
+import jakarta.validation.constraints.NotNull;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public record OpaViewExpression(@NotNull String expression, @NotNull Optional identity)
+{
+ public OpaViewExpression
+ {
+ requireNonNull(expression, "expression is null");
+ requireNonNull(identity, "identity is null");
+ }
+
+ public ViewExpression toTrinoViewExpression(String catalogName, String schemaName)
+ {
+ ViewExpression.Builder builder = ViewExpression.builder()
+ .catalog(catalogName)
+ .schema(schemaName)
+ .expression(expression);
+ identity.ifPresent(builder::identity);
+ return builder.build();
+ }
+}
diff --git a/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/TrinoColumn.java b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/TrinoColumn.java
new file mode 100644
index 0000000000000..4f21cb04a20b6
--- /dev/null
+++ b/plugin/trino-opa/src/main/java/io/trino/plugin/opa/schema/TrinoColumn.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.opa.schema;
+
+import io.trino.spi.connector.CatalogSchemaTableName;
+import io.trino.spi.type.Type;
+import jakarta.validation.constraints.NotNull;
+
+import static java.util.Objects.requireNonNull;
+
+public record TrinoColumn(
+ @NotNull String catalogName,
+ @NotNull String schemaName,
+ @NotNull String tableName,
+ @NotNull String columnName,
+ @NotNull String columnType)
+{
+ public TrinoColumn
+ {
+ requireNonNull(catalogName, "catalogName is null");
+ requireNonNull(schemaName, "schemaName is null");
+ requireNonNull(tableName, "tableName is null");
+ requireNonNull(columnName, "columnName is null");
+ requireNonNull(columnType, "columnType is null");
+ }
+
+ public TrinoColumn(CatalogSchemaTableName tableName, String columnName, Type type)
+ {
+ this(tableName.getCatalogName(),
+ tableName.getSchemaTableName().getSchemaName(),
+ tableName.getSchemaTableName().getTableName(),
+ columnName,
+ type.getDisplayName());
+ }
+}
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/DistributedQueryRunnerHelper.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/DistributedQueryRunnerHelper.java
new file mode 100644
index 0000000000000..36a8fa737151e
--- /dev/null
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/DistributedQueryRunnerHelper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.opa;
+
+import io.trino.Session;
+import io.trino.spi.security.Identity;
+import io.trino.testing.DistributedQueryRunner;
+
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+
+public final class DistributedQueryRunnerHelper
+{
+ private final DistributedQueryRunner runner;
+
+ private DistributedQueryRunnerHelper(DistributedQueryRunner runner)
+ {
+ this.runner = runner;
+ }
+
+ public static DistributedQueryRunnerHelper withOpaConfig(Map opaConfig)
+ throws Exception
+ {
+ return new DistributedQueryRunnerHelper(
+ DistributedQueryRunner.builder(testSessionBuilder().build())
+ .setSystemAccessControl(new OpaAccessControlFactory().create(opaConfig))
+ .setNodeCount(1)
+ .build());
+ }
+
+ public Set querySetOfStrings(String user, String query)
+ {
+ return querySetOfStrings(userSession(user), query);
+ }
+
+ public Set querySetOfStrings(Session session, String query)
+ {
+ return runner.execute(session, query).getMaterializedRows().stream().map(row -> row.getField(0) == null ? "" : row.getField(0).toString()).collect(toImmutableSet());
+ }
+
+ public DistributedQueryRunner getBaseQueryRunner()
+ {
+ return this.runner;
+ }
+
+ public void teardown()
+ {
+ if (this.runner != null) {
+ this.runner.close();
+ }
+ }
+
+ private static Session userSession(String user)
+ {
+ return testSessionBuilder().setIdentity(Identity.ofUser(user)).build();
+ }
+}
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/OpaContainer.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/OpaContainer.java
new file mode 100644
index 0000000000000..ddb8c4aaf6fce
--- /dev/null
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/OpaContainer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.opa;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startable;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+
+public class OpaContainer
+ implements Startable
+{
+ private static final int OPA_PORT = 8181;
+ private static final String OPA_BASE_PATH = "v1/data/trino/";
+ private static final String OPA_POLICY_PUSH_BASE_PATH = "v1/policies/trino";
+
+ private final GenericContainer> container;
+ private URI resolvedUri;
+
+ public OpaContainer()
+ {
+ this.container = new GenericContainer<>(DockerImageName.parse("openpolicyagent/opa:latest-rootless"))
+ .withCommand("run", "--server", "--addr", ":%d".formatted(OPA_PORT))
+ .withExposedPorts(OPA_PORT)
+ .waitingFor(Wait.forListeningPort());
+ }
+
+ @Override
+ public synchronized void start()
+ {
+ this.container.start();
+ this.resolvedUri = null;
+ }
+
+ @Override
+ public synchronized void stop()
+ {
+ this.container.stop();
+ this.resolvedUri = null;
+ }
+
+ public synchronized URI getOpaServerUri()
+ {
+ if (!container.isRunning()) {
+ this.resolvedUri = null;
+ throw new IllegalStateException("Container is not running");
+ }
+ if (this.resolvedUri == null) {
+ this.resolvedUri = URI.create(String.format("http://%s:%d/", container.getHost(), container.getMappedPort(OPA_PORT)));
+ }
+ return this.resolvedUri;
+ }
+
+ public URI getOpaUriForPolicyPath(String relativePath)
+ {
+ return getOpaServerUri().resolve(OPA_BASE_PATH + relativePath);
+ }
+
+ public void submitPolicy(String... policyLines)
+ throws IOException, InterruptedException
+ {
+ HttpClient httpClient = HttpClient.newHttpClient();
+ HttpResponse policyResponse =
+ httpClient.send(
+ HttpRequest.newBuilder(getOpaServerUri().resolve(OPA_POLICY_PUSH_BASE_PATH))
+ .PUT(HttpRequest.BodyPublishers.ofString(String.join("\n", policyLines)))
+ .header("Content-Type", "text/plain").build(),
+ HttpResponse.BodyHandlers.ofString());
+ if (policyResponse.statusCode() != 200) {
+ throw new RuntimeException("Failed to submit policy: %s".formatted(policyResponse.body()));
+ }
+ }
+}
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestHelpers.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestHelpers.java
index d47e2a4280d86..ea3b36a5710cb 100644
--- a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestHelpers.java
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestHelpers.java
@@ -16,6 +16,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
+import io.airlift.configuration.ConfigurationMetadata;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.execution.QueryIdGenerator;
@@ -28,9 +29,11 @@
import org.junit.jupiter.api.Named;
import org.junit.jupiter.params.provider.Arguments;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.time.Instant;
import java.util.Arrays;
+import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -149,20 +152,60 @@ public static InstrumentedHttpClient createMockHttpClient(URI expectedUri, Funct
return new InstrumentedHttpClient(expectedUri, "POST", JSON_UTF_8.toString(), handler);
}
- public static OpaAccessControl createOpaAuthorizer(URI opaUri, InstrumentedHttpClient mockHttpClient)
+ public static OpaAccessControl createOpaAuthorizer(Map config, InstrumentedHttpClient mockHttpClient)
{
- return (OpaAccessControl) OpaAccessControlFactory.create(ImmutableMap.of("opa.policy.uri", opaUri.toString()), Optional.of(mockHttpClient), Optional.of(SYSTEM_ACCESS_CONTROL_CONTEXT));
+ return (OpaAccessControl) OpaAccessControlFactory.create(config, Optional.of(mockHttpClient), Optional.of(SYSTEM_ACCESS_CONTROL_CONTEXT));
}
- public static OpaAccessControl createOpaAuthorizer(URI opaUri, URI opaBatchUri, InstrumentedHttpClient mockHttpClient)
+ public static final class OpaConfigBuilder
{
- return (OpaAccessControl) OpaAccessControlFactory.create(
- ImmutableMap.builder()
- .put("opa.policy.uri", opaUri.toString())
- .put("opa.policy.batched-uri", opaBatchUri.toString())
- .buildOrThrow(),
- Optional.of(mockHttpClient),
- Optional.of(SYSTEM_ACCESS_CONTROL_CONTEXT));
+ private final OpaConfig config = new OpaConfig();
+
+ public OpaConfigBuilder withBasePolicy(URI basePolicy)
+ {
+ config.setOpaUri(basePolicy);
+ return this;
+ }
+
+ public OpaConfigBuilder withBatchPolicy(URI batchPolicy)
+ {
+ config.setOpaBatchUri(batchPolicy);
+ return this;
+ }
+
+ public OpaConfigBuilder withRowFiltersPolicy(URI rowFiltersPolicy)
+ {
+ config.setOpaRowFiltersUri(rowFiltersPolicy);
+ return this;
+ }
+
+ public OpaConfigBuilder withColumnMaskingPolicy(URI columnMaskingPolicy)
+ {
+ config.setOpaColumnMaskingUri(columnMaskingPolicy);
+ return this;
+ }
+
+ public Map buildConfig()
+ {
+ ConfigurationMetadata metadata = ConfigurationMetadata.getValidConfigurationMetadata(OpaConfig.class);
+ ImmutableMap.Builder opaConfigBuilder = ImmutableMap.builder();
+ try {
+ for (ConfigurationMetadata.AttributeMetadata attribute : metadata.getAttributes().values()) {
+ convertPropertyToString(attribute.getGetter().invoke(config)).ifPresent(
+ propertyValue -> opaConfigBuilder.put(attribute.getInjectionPoint().getProperty(), propertyValue));
+ }
+ } catch (InvocationTargetException|IllegalAccessException e) {
+ throw new AssertionError("Failed to build config map", e);
+ }
+ return opaConfigBuilder.buildOrThrow();
+ }
+
+ private static Optional convertPropertyToString(Object value) {
+ if (value instanceof Optional> optionalValue) {
+ return optionalValue.map(Object::toString);
+ }
+ return Optional.ofNullable(value).map(Object::toString);
+ }
}
static final class TestingSystemAccessControlContext
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControl.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControl.java
index 8c5e90e1f0c55..45bf09b8e41fe 100644
--- a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControl.java
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControl.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.opa;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
@@ -21,6 +22,7 @@
import io.trino.plugin.opa.HttpClientUtils.MockResponse;
import io.trino.plugin.opa.TestHelpers.MethodWrapper;
import io.trino.plugin.opa.TestHelpers.TestingSystemAccessControlContext;
+import io.trino.plugin.opa.schema.OpaViewExpression;
import io.trino.spi.connector.CatalogSchemaName;
import io.trino.spi.connector.CatalogSchemaRoutineName;
import io.trino.spi.connector.CatalogSchemaTableName;
@@ -29,6 +31,8 @@
import io.trino.spi.security.SystemAccessControlFactory;
import io.trino.spi.security.SystemSecurityContext;
import io.trino.spi.security.TrinoPrincipal;
+import io.trino.spi.security.ViewExpression;
+import io.trino.spi.type.VarcharType;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -36,10 +40,12 @@
import org.junit.jupiter.params.provider.MethodSource;
import java.net.URI;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.stream.Stream;
import static io.trino.plugin.opa.RequestTestUtilities.assertStringRequestsEqual;
@@ -60,8 +66,11 @@
public class TestOpaAccessControl
{
private static final URI OPA_SERVER_URI = URI.create("http://my-uri/");
+ private static final URI OPA_SERVER_ROW_FILTERING_URI = URI.create("http://my-row-filtering-uri");
+ private static final URI OPA_SERVER_COLUMN_MASK_URI = URI.create("http://my-column-masking-uri");
private static final Identity TEST_IDENTITY = Identity.forUser("source-user").withGroups(ImmutableSet.of("some-group")).build();
private static final SystemSecurityContext TEST_SECURITY_CONTEXT = systemSecurityContextFromIdentity(TEST_IDENTITY);
+ private static final Map OPA_CONFIG_WITH_ONLY_ALLOW = new TestHelpers.OpaConfigBuilder().withBasePolicy(OPA_SERVER_URI).buildConfig();
// The below identity and security ctx would go away if we move all the tests to use their static constant counterparts above
private final Identity requestingIdentity = Identity.ofUser("source-user");
private final SystemSecurityContext requestingSecurityContext = systemSecurityContextFromIdentity(requestingIdentity);
@@ -75,7 +84,7 @@ public void testResponseHasExtraFields()
"decision_id": "foo",
"some_debug_info": {"test": ""}
}"""));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
authorizer.checkCanExecuteQuery(requestingIdentity);
}
@@ -145,7 +154,7 @@ public void testTableResourceActions(
FunctionalHelpers.Consumer3 callable)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
callable.accept(
authorizer,
@@ -182,7 +191,7 @@ public void testTableResourceFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> method.accept(
@@ -215,7 +224,7 @@ public void testTableWithPropertiesActions(
FunctionalHelpers.Consumer4 callable)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaTableName table = new CatalogSchemaTableName("my_catalog", "my_schema", "my_table");
Map> properties = ImmutableMap.>builder()
@@ -261,7 +270,7 @@ public void testTableWithPropertiesActionFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> method.accept(
@@ -291,7 +300,7 @@ public void testIdentityResourceActions(
FunctionalHelpers.Consumer3 callable)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
Identity dummyIdentity = Identity.forUser("dummy-user")
.withGroups(ImmutableSet.of("some-group"))
@@ -327,7 +336,7 @@ public void testIdentityResourceActionsFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> method.accept(
@@ -364,7 +373,7 @@ public void testStringResourceAction(
FunctionalHelpers.Consumer3 callable)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
callable.accept(authorizer, requestingSecurityContext, "resource_name");
@@ -397,7 +406,7 @@ public void testStringResourceActionsFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> method.accept(
@@ -412,7 +421,7 @@ public void testStringResourceActionsFailure(
public void testCanImpersonateUser()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
authorizer.checkCanImpersonateUser(requestingIdentity, "some_other_user");
@@ -437,7 +446,7 @@ public void testCanImpersonateUserFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> authorizer.checkCanImpersonateUser(requestingIdentity, "some_other_user"))
@@ -449,11 +458,11 @@ public void testCanImpersonateUserFailure(
public void testCanAccessCatalog()
{
InstrumentedHttpClient permissiveClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl permissiveAuthorizer = createOpaAuthorizer(OPA_SERVER_URI, permissiveClient);
+ OpaAccessControl permissiveAuthorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, permissiveClient);
assertThat(permissiveAuthorizer.canAccessCatalog(requestingSecurityContext, "test_catalog")).isTrue();
InstrumentedHttpClient restrictiveClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, NO_ACCESS_RESPONSE));
- OpaAccessControl restrictiveAuthorizer = createOpaAuthorizer(OPA_SERVER_URI, restrictiveClient);
+ OpaAccessControl restrictiveAuthorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, restrictiveClient);
assertThat(restrictiveAuthorizer.canAccessCatalog(requestingSecurityContext, "test_catalog")).isFalse();
String expectedRequest = """
@@ -477,7 +486,7 @@ public void testCanAccessCatalogIllegalResponses(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> authorizer.canAccessCatalog(requestingSecurityContext, "my_catalog"))
@@ -507,7 +516,7 @@ public void testSchemaResourceActions(
FunctionalHelpers.Consumer3 callable)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
callable.accept(authorizer, requestingSecurityContext, new CatalogSchemaName("my_catalog", "my_schema"));
@@ -540,7 +549,7 @@ public void testSchemaResourceActionsFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> method.accept(
@@ -555,7 +564,7 @@ public void testSchemaResourceActionsFailure(
public void testCreateSchema()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaName schema = new CatalogSchemaName("my_catalog", "my_schema");
authorizer.checkCanCreateSchema(requestingSecurityContext, schema, ImmutableMap.of("some_key", "some_value"));
@@ -600,7 +609,7 @@ public void testCreateSchemaFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> authorizer.checkCanCreateSchema(
@@ -615,7 +624,7 @@ public void testCreateSchemaFailure(
public void testCanRenameSchema()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaName sourceSchema = new CatalogSchemaName("my_catalog", "my_schema");
authorizer.checkCanRenameSchema(requestingSecurityContext, sourceSchema, "new_schema_name");
@@ -648,7 +657,7 @@ public void testCanRenameSchemaFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> authorizer.checkCanRenameSchema(
@@ -679,7 +688,7 @@ public void testRenameTableActions(
FunctionalHelpers.Consumer4 method)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaTableName sourceTable = new CatalogSchemaTableName("my_catalog", "my_schema", "my_table");
CatalogSchemaTableName targetTable = new CatalogSchemaTableName("my_catalog", "new_schema_name", "new_table_name");
@@ -723,7 +732,7 @@ public void testRenameTableFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaTableName sourceTable = new CatalogSchemaTableName("my_catalog", "my_schema", "my_table");
CatalogSchemaTableName targetTable = new CatalogSchemaTableName("my_catalog", "new_schema_name", "new_table_name");
@@ -741,7 +750,7 @@ public void testRenameTableFailure(
public void testCanSetSchemaAuthorization()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaName schema = new CatalogSchemaName("my_catalog", "my_schema");
@@ -773,7 +782,7 @@ public void testCanSetSchemaAuthorizationFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaName schema = new CatalogSchemaName("my_catalog", "my_schema");
assertThatThrownBy(
@@ -803,7 +812,7 @@ public void testCanSetTableAuthorization(
FunctionalHelpers.Consumer4 method)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaTableName table = new CatalogSchemaTableName("my_catalog", "my_schema", "my_table");
@@ -843,7 +852,7 @@ public void testCanSetTableAuthorizationFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaTableName table = new CatalogSchemaTableName("my_catalog", "my_schema", "my_table");
@@ -877,7 +886,7 @@ public void testTableColumnOperations(
FunctionalHelpers.Consumer4> method)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaTableName table = new CatalogSchemaTableName("my_catalog", "my_schema", "my_table");
Set columns = ImmutableSet.of("my_column");
@@ -915,7 +924,7 @@ public void testTableColumnOperationsFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
CatalogSchemaTableName table = new CatalogSchemaTableName("my_catalog", "my_schema", "my_table");
Set columns = ImmutableSet.of("my_column");
@@ -930,7 +939,7 @@ public void testTableColumnOperationsFailure(
public void testCanSetCatalogSessionProperty()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, OK_RESPONSE));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
authorizer.checkCanSetCatalogSessionProperty(
requestingSecurityContext, "my_catalog", "my_property");
@@ -957,7 +966,7 @@ public void testCanSetCatalogSessionPropertyFailure(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, mockClient);
assertThatThrownBy(
() -> authorizer.checkCanSetCatalogSessionProperty(
@@ -1066,19 +1075,224 @@ private void testRequestContextContentsForGivenTrinoVersion(Optional authorizer.getRowFilters(TEST_SECURITY_CONTEXT, tableName));
+
+ // Also test a valid JSON response, but containing invalid fields for a row filters request
+ String validJsonButIllegalSchemaResponseContents = """
+ {
+ "result": ["some-expr"]
+ }""";
+ assertAccessControlMethodThrowsForResponse(
+ authorizer -> authorizer.getRowFilters(TEST_SECURITY_CONTEXT, tableName),
+ new MockResponse(validJsonButIllegalSchemaResponseContents, 200),
+ OpaQueryException.class,
+ "Failed to deserialize");
+ }
+
+ @Test
+ public void testGetRowFilters()
+ {
+ // This example is a bit strange - an undefined policy would in most cases
+ // result in an access denied situation. However, since this is row-level-filtering
+ // we will accept this as meaning there are no known filters to be applied.
+ testGetRowFilters("{}", ImmutableList.of());
+
+ String noExpressionsResponse = """
+ {
+ "result": []
+ }""";
+ testGetRowFilters(noExpressionsResponse, ImmutableList.of());
+
+ String singleExpressionResponse = """
+ {
+ "result": [
+ {"expression": "expr1"}
+ ]
+ }""";
+ testGetRowFilters(
+ singleExpressionResponse,
+ ImmutableList.of(new OpaViewExpression("expr1", Optional.empty())));
+
+ String multipleExpressionsAndIdentitiesResponse = """
+ {
+ "result": [
+ {"expression": "expr1"},
+ {"expression": "expr2", "identity": "expr2_identity"},
+ {"expression": "expr3", "identity": "expr3_identity"}
+ ]
+ }""";
+ testGetRowFilters(
+ multipleExpressionsAndIdentitiesResponse,
+ ImmutableList.builder()
+ .add(new OpaViewExpression("expr1", Optional.empty()))
+ .add(new OpaViewExpression("expr2", Optional.of("expr2_identity")))
+ .add(new OpaViewExpression("expr3", Optional.of("expr3_identity")))
+ .build());
+ }
+
+ private void testGetRowFilters(String responseContent, List expectedExpressions)
+ {
+ InstrumentedHttpClient httpClient = createMockHttpClient(OPA_SERVER_ROW_FILTERING_URI, buildValidatingRequestHandler(TEST_IDENTITY, new MockResponse(responseContent, 200)));
+ OpaAccessControl authorizer = createOpaAuthorizer(
+ new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_SERVER_URI)
+ .withRowFiltersPolicy(OPA_SERVER_ROW_FILTERING_URI)
+ .buildConfig(),
+ httpClient);
+ CatalogSchemaTableName tableName = new CatalogSchemaTableName("some_catalog", "some_schema", "some_table");
+
+ List result = authorizer.getRowFilters(TEST_SECURITY_CONTEXT, tableName);
+ assertThat(result).allSatisfy(expression -> {
+ assertThat(expression.getCatalog()).contains("some_catalog");
+ assertThat(expression.getSchema()).contains("some_schema");
+ });
+ assertThat(result).map(
+ viewExpression -> new OpaViewExpression(
+ viewExpression.getExpression(),
+ viewExpression.getSecurityIdentity()))
+ .containsExactlyInAnyOrderElementsOf(expectedExpressions);
+
+ String expectedRequest = """
+ {
+ "operation": "GetRowFilters",
+ "resource": {
+ "table": {
+ "catalogName": "some_catalog",
+ "schemaName": "some_schema",
+ "tableName": "some_table"
+ }
+ }
+ }""";
+ assertStringRequestsEqual(ImmutableSet.of(expectedRequest), httpClient.getRequests(), "/input/action");
+ }
+
+ @Test
+ public void testGetRowFiltersDoesNothingIfNotConfigured()
+ {
+ InstrumentedHttpClient httpClient = createMockHttpClient(OPA_SERVER_ROW_FILTERING_URI, request -> {throw new AssertionError("Should not have been called");});
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, httpClient);
+ CatalogSchemaTableName tableName = new CatalogSchemaTableName("some_catalog", "some_schema", "some_table");
+
+ List result = authorizer.getRowFilters(TEST_SECURITY_CONTEXT, tableName);
+ assertThat(result).isEmpty();
+ assertThat(httpClient.getRequests()).isEmpty();
+ }
+
+ @Test
+ public void testGetColumnMaskThrowsForIllegalResponse()
+ {
+ CatalogSchemaTableName tableName = new CatalogSchemaTableName("some_catalog", "some_schema", "some_table");
+ assertAccessControlMethodThrowsForIllegalResponses(authorizer -> authorizer.getColumnMask(TEST_SECURITY_CONTEXT, tableName, "some_column", VarcharType.VARCHAR));
+
+ // Also test a valid JSON response, but containing invalid fields for a row filters request
+ String validJsonButIllegalSchemaResponseContents = """
+ {
+ "result": {"expression": {"foo": "bar"}}
+ }""";
+ assertAccessControlMethodThrowsForResponse(
+ authorizer -> authorizer.getColumnMask(TEST_SECURITY_CONTEXT, tableName, "some_column", VarcharType.VARCHAR),
+ new MockResponse(validJsonButIllegalSchemaResponseContents, 200),
+ OpaQueryException.class,
+ "Failed to deserialize");
+ }
+
+ @Test
+ public void testGetColumnMask()
+ {
+ // Similar note to the test for row level filtering:
+ // This example is a bit strange - an undefined policy would in most cases
+ // result in an access denied situation. However, since this is column masking,
+ // we will accept this as meaning there are no masks to be applied.
+ testGetColumnMask("{}", Optional.empty());
+
+ String nullResponse = """
+ {
+ "result": null
+ }""";
+ testGetColumnMask(nullResponse, Optional.empty());
+
+ String expressionWithoutIdentityResponse = """
+ {
+ "result": {"expression": "expr1"}
+ }""";
+ testGetColumnMask(
+ expressionWithoutIdentityResponse,
+ Optional.of(new OpaViewExpression("expr1", Optional.empty())));
+
+ String expressionWithIdentityResponse = """
+ {
+ "result": {"expression": "expr1", "identity": "some_identity"}
+ }""";
+ testGetColumnMask(
+ expressionWithIdentityResponse,
+ Optional.of(new OpaViewExpression("expr1", Optional.of("some_identity"))));
+ }
+
+ private void testGetColumnMask(String responseContent, Optional expectedExpression)
+ {
+ InstrumentedHttpClient httpClient = createMockHttpClient(OPA_SERVER_COLUMN_MASK_URI, buildValidatingRequestHandler(TEST_IDENTITY, new MockResponse(responseContent, 200)));
+ OpaAccessControl authorizer = createOpaAuthorizer(
+ new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_SERVER_URI)
+ .withColumnMaskingPolicy(OPA_SERVER_COLUMN_MASK_URI)
+ .buildConfig(),
+ httpClient);
+ CatalogSchemaTableName tableName = new CatalogSchemaTableName("some_catalog", "some_schema", "some_table");
+
+ Optional result = authorizer.getColumnMask(TEST_SECURITY_CONTEXT, tableName, "some_column", VarcharType.VARCHAR);
+
+ assertThat(result.isEmpty()).isEqualTo(expectedExpression.isEmpty());
+ assertThat(result.map(viewExpression -> {
+ assertThat(viewExpression.getCatalog()).contains("some_catalog");
+ assertThat(viewExpression.getSchema()).contains("some_schema");
+ return new OpaViewExpression(viewExpression.getExpression(), viewExpression.getSecurityIdentity());
+ })).isEqualTo(expectedExpression);
+
+ String expectedRequest = """
+ {
+ "operation": "GetColumnMask",
+ "resource": {
+ "column": {
+ "catalogName": "some_catalog",
+ "schemaName": "some_schema",
+ "tableName": "some_table",
+ "columnName": "some_column",
+ "columnType": "varchar"
+ }
+ }
+ }""";
+ assertStringRequestsEqual(ImmutableSet.of(expectedRequest), httpClient.getRequests(), "/input/action");
+ }
+
+ @Test
+ public void testGetColumnMaskDoesNothingIfNotConfigured()
+ {
+ InstrumentedHttpClient httpClient = createMockHttpClient(OPA_SERVER_COLUMN_MASK_URI, request -> {throw new AssertionError("Should not have been called");});
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, httpClient);
+ CatalogSchemaTableName tableName = new CatalogSchemaTableName("some_catalog", "some_schema", "some_table");
+
+ Optional result = authorizer.getColumnMask(TEST_SECURITY_CONTEXT, tableName, "some_column", VarcharType.VARCHAR);
+ assertThat(result).isEmpty();
+ assertThat(httpClient.getRequests()).isEmpty();
+ }
+
private static void assertAccessControlMethodBehaviour(MethodWrapper method, Set expectedRequests)
{
InstrumentedHttpClient permissiveMockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(TEST_IDENTITY, OK_RESPONSE));
InstrumentedHttpClient restrictiveMockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(TEST_IDENTITY, NO_ACCESS_RESPONSE));
- assertThat(method.isAccessAllowed(createOpaAuthorizer(OPA_SERVER_URI, permissiveMockClient))).isTrue();
- assertThat(method.isAccessAllowed(createOpaAuthorizer(OPA_SERVER_URI, restrictiveMockClient))).isFalse();
+ assertThat(method.isAccessAllowed(createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, permissiveMockClient))).isTrue();
+ assertThat(method.isAccessAllowed(createOpaAuthorizer(OPA_CONFIG_WITH_ONLY_ALLOW, restrictiveMockClient))).isFalse();
assertThat(permissiveMockClient.getRequests()).containsExactlyInAnyOrderElementsOf(restrictiveMockClient.getRequests());
assertStringRequestsEqual(expectedRequests, permissiveMockClient.getRequests(), "/input/action");
- assertAccessControlMethodThrowsForIllegalResponses(method);
+ assertAccessControlMethodThrowsForIllegalResponses(method::isAccessAllowed);
}
- private static void assertAccessControlMethodThrowsForIllegalResponses(MethodWrapper methodToTest)
+ private static void assertAccessControlMethodThrowsForIllegalResponses(Consumer methodToTest)
{
assertAccessControlMethodThrowsForResponse(methodToTest, UNDEFINED_RESPONSE, OpaQueryException.OpaServerError.PolicyNotFound.class, "did not return a value");
assertAccessControlMethodThrowsForResponse(methodToTest, BAD_REQUEST_RESPONSE, OpaQueryException.OpaServerError.class, "returned status 400");
@@ -1087,15 +1301,21 @@ private static void assertAccessControlMethodThrowsForIllegalResponses(MethodWra
}
private static void assertAccessControlMethodThrowsForResponse(
- MethodWrapper methodToTest,
+ Consumer methodToTest,
MockResponse response,
Class extends Throwable> expectedException,
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(TEST_IDENTITY, response));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
-
- assertThatThrownBy(() -> methodToTest.isAccessAllowed(authorizer))
+ OpaAccessControl authorizer = createOpaAuthorizer(
+ new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_SERVER_URI)
+ .withRowFiltersPolicy(OPA_SERVER_URI)
+ .withColumnMaskingPolicy(OPA_SERVER_URI)
+ .buildConfig(),
+ mockClient);
+
+ assertThatThrownBy(() -> methodToTest.accept(authorizer))
.isInstanceOf(expectedException)
.hasMessageContaining(expectedErrorMessage);
}
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlDataFilteringSystem.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlDataFilteringSystem.java
new file mode 100644
index 0000000000000..1cced9f230a8c
--- /dev/null
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlDataFilteringSystem.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.opa;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import io.trino.connector.MockConnectorFactory;
+import io.trino.connector.MockConnectorPlugin;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.VarcharType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+
+@Testcontainers
+@TestInstance(PER_CLASS)
+public class TestOpaAccessControlDataFilteringSystem
+{
+ @Container
+ private static final OpaContainer OPA_CONTAINER = new OpaContainer();
+ private static final String OPA_ALLOW_POLICY_NAME = "allow";
+ private static final String OPA_ROW_LEVEL_FILTERING_POLICY_NAME = "rowFilters";
+ private static final String OPA_COLUMN_MASKING_POLICY_NAME = "columnMask";
+ private static final String SAMPLE_ROW_LEVEL_FILTERING_POLICY = """
+ package trino
+ import future.keywords.in
+ import future.keywords.if
+ import future.keywords.contains
+
+ default allow := true
+
+ table_resource := input.action.resource.table
+ is_admin {
+ input.context.identity.user == "admin"
+ }
+
+ rowFilters contains {"expression": "user_type <> 'customer'"} if {
+ not is_admin
+ table_resource.catalogName == "sample_catalog"
+ table_resource.schemaName == "sample_schema"
+ table_resource.tableName == "restricted_table"
+ }""";
+ private static final String SAMPLE_COLUMN_MASKING_POLICY = """
+ package trino
+ import future.keywords.in
+ import future.keywords.if
+ import future.keywords.contains
+
+ default allow := true
+
+ column_resource := input.action.resource.column
+ is_admin {
+ input.context.identity.user == "admin"
+ }
+
+ columnMask := {"expression": "NULL"} if {
+ not is_admin
+ column_resource.catalogName == "sample_catalog"
+ column_resource.schemaName == "sample_schema"
+ column_resource.tableName == "restricted_table"
+ column_resource.columnName == "user_phone"
+ }
+
+ columnMask := {"expression": "'****' || substring(user_name, -3)"} if {
+ not is_admin
+ column_resource.catalogName == "sample_catalog"
+ column_resource.schemaName == "sample_schema"
+ column_resource.tableName == "restricted_table"
+ column_resource.columnName == "user_name"
+ }
+ """;
+
+ private static final Set DUMMY_CUSTOMERS_IN_TABLE = ImmutableSet.of("customer_one", "customer_two");
+ private static final Set DUMMY_INTERNAL_USERS_IN_TABLE = ImmutableSet.of("some_internal_user");
+ private static final Set ALL_DUMMY_USERS_IN_TABLE = ImmutableSet.builder()
+ .addAll(DUMMY_INTERNAL_USERS_IN_TABLE)
+ .addAll(DUMMY_CUSTOMERS_IN_TABLE)
+ .build();
+
+ private DistributedQueryRunnerHelper runner;
+
+
+ @AfterEach
+ public void teardown()
+ {
+ if (runner != null) {
+ runner.teardown();
+ }
+ }
+
+ @Test
+ public void testRowFilteringEnabled()
+ throws Exception
+ {
+ setupTrinoWithOpa(
+ new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_ALLOW_POLICY_NAME))
+ .withRowFiltersPolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_ROW_LEVEL_FILTERING_POLICY_NAME))
+ .buildConfig());
+ OPA_CONTAINER.submitPolicy(SAMPLE_ROW_LEVEL_FILTERING_POLICY);
+ String restrictedTableQuery = "SELECT user_name FROM sample_catalog.sample_schema.restricted_table";
+ String unrestrictedTableQuery = "SELECT user_name FROM sample_catalog.sample_schema.unrestricted_table";
+ assertResultsForUser("admin", restrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+ assertResultsForUser("admin", unrestrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+
+ assertResultsForUser("bob", unrestrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+ assertResultsForUser("bob", restrictedTableQuery, DUMMY_INTERNAL_USERS_IN_TABLE);
+ }
+
+ @Test
+ public void testRowFilteringDisabledDoesNothing()
+ throws Exception
+ {
+ setupTrinoWithOpa(
+ new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_ALLOW_POLICY_NAME))
+ .buildConfig());
+ OPA_CONTAINER.submitPolicy(SAMPLE_ROW_LEVEL_FILTERING_POLICY);
+ String restrictedTableQuery = "SELECT user_name FROM sample_catalog.sample_schema.restricted_table";
+ String unrestrictedTableQuery = "SELECT user_name FROM sample_catalog.sample_schema.unrestricted_table";
+ assertResultsForUser("admin", restrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+ assertResultsForUser("admin", unrestrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+
+ assertResultsForUser("bob", unrestrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+ assertResultsForUser("bob", restrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+ }
+
+ @Test
+ public void testColumnMasking()
+ throws Exception
+ {
+ setupTrinoWithOpa(
+ new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_ALLOW_POLICY_NAME))
+ .withColumnMaskingPolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_COLUMN_MASKING_POLICY_NAME))
+ .buildConfig());
+ OPA_CONTAINER.submitPolicy(SAMPLE_COLUMN_MASKING_POLICY);
+
+ String userNamesInUnrestrictedTableQuery = "SELECT user_name FROM sample_catalog.sample_schema.unrestricted_table";
+ String userNamesInRestrictedTableQuery = "SELECT user_name FROM sample_catalog.sample_schema.restricted_table";
+ // No masking is applied to the unrestricted table
+ assertResultsForUser("admin", userNamesInUnrestrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+ assertResultsForUser("bob", userNamesInUnrestrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+
+ // No masking is applied for "admin" even in the restricted table
+ assertResultsForUser("admin", userNamesInRestrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+
+ // "bob" can only see the last 3 characters of user names for the restricted table
+ Set expectedMaskedUserNames = ALL_DUMMY_USERS_IN_TABLE.stream().map(userName -> "****" + userName.substring(userName.length() - 3)).collect(toImmutableSet());
+ assertResultsForUser("bob", userNamesInRestrictedTableQuery, expectedMaskedUserNames);
+
+ String phoneNumbersInUnrestrictedTableQuery = "SELECT user_phone FROM sample_catalog.sample_schema.unrestricted_table";
+ String phoneNumbersInRestrictedTableQuery = "SELECT user_phone FROM sample_catalog.sample_schema.restricted_table";
+
+ // Phone numbers are derived by hashing the name of the user
+ Set allExpectedPhoneNumbers = ALL_DUMMY_USERS_IN_TABLE.stream().map(userName -> String.valueOf(userName.hashCode())).collect(toImmutableSet());
+
+ // No masking is applied to the unrestricted table
+ assertResultsForUser("admin", phoneNumbersInUnrestrictedTableQuery, allExpectedPhoneNumbers);
+ assertResultsForUser("bob", phoneNumbersInUnrestrictedTableQuery, allExpectedPhoneNumbers);
+
+ // No masking is applied for "admin" even in the restricted table
+ assertResultsForUser("admin", phoneNumbersInRestrictedTableQuery, allExpectedPhoneNumbers);
+ // "bob" cannot see any phone numbers in the restricted table
+ assertResultsForUser("bob", phoneNumbersInRestrictedTableQuery, ImmutableSet.of(""));
+ }
+
+ @Test
+ public void testColumnMaskingDisabledDoesNothing()
+ throws Exception
+ {
+ setupTrinoWithOpa(
+ new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_ALLOW_POLICY_NAME))
+ .buildConfig());
+ OPA_CONTAINER.submitPolicy(SAMPLE_COLUMN_MASKING_POLICY);
+ String restrictedTableQuery = "SELECT user_name FROM sample_catalog.sample_schema.restricted_table";
+ String unrestrictedTableQuery = "SELECT user_name FROM sample_catalog.sample_schema.unrestricted_table";
+ assertResultsForUser("admin", restrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+ assertResultsForUser("admin", unrestrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+
+ assertResultsForUser("bob", unrestrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+ assertResultsForUser("bob", restrictedTableQuery, ALL_DUMMY_USERS_IN_TABLE);
+ }
+
+ @Test
+ public void testColumnMaskingAndRowFiltering()
+ throws Exception
+ {
+ setupTrinoWithOpa(
+ new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_ALLOW_POLICY_NAME))
+ .withColumnMaskingPolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_COLUMN_MASKING_POLICY_NAME))
+ .withRowFiltersPolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_ROW_LEVEL_FILTERING_POLICY_NAME))
+ .buildConfig());
+ // Simpler policy than the previous tests:
+ // Admin has no restrictions
+ // Any other user can only see rows where "user_type" is not "customer"
+ // And cannot see any data for field "user_name"
+ String policy = """
+ package trino
+ import future.keywords.in
+ import future.keywords.if
+ import future.keywords.contains
+
+ default allow := true
+
+ is_admin {
+ input.context.identity.user == "admin"
+ }
+
+ table_resource := input.action.resource.table
+ column_resource := input.action.resource.column
+
+ rowFilters contains {"expression": "user_type <> 'customer'"} if {
+ not is_admin
+ }
+ columnMask := {"expression": "NULL"} if {
+ not is_admin
+ column_resource.columnName == "user_name"
+ }""";
+ OPA_CONTAINER.submitPolicy(policy);
+
+ String selectUserNameData = "SELECT user_name FROM sample_catalog.sample_schema.restricted_table";
+ String selectUserTypeData = "SELECT user_type FROM sample_catalog.sample_schema.restricted_table";
+ Set expectedUserTypes = ImmutableSet.of("internal_user", "customer");
+
+ assertResultsForUser("admin", selectUserNameData, ALL_DUMMY_USERS_IN_TABLE);
+ assertResultsForUser("admin", selectUserTypeData, expectedUserTypes);
+
+ assertResultsForUser("bob", selectUserNameData, ImmutableSet.of(""));
+ assertResultsForUser("bob", selectUserTypeData, ImmutableSet.of("internal_user"));
+ }
+
+ private void assertResultsForUser(String asUser, String query, Set expectedResults)
+ {
+ assertThat(runner.querySetOfStrings(asUser, query)).containsExactlyInAnyOrderElementsOf(expectedResults);
+ }
+
+ private void setupTrinoWithOpa(Map opaConfig)
+ throws Exception
+ {
+ this.runner = DistributedQueryRunnerHelper.withOpaConfig(opaConfig);
+ MockConnectorFactory connectorFactory = MockConnectorFactory.builder()
+ .withListSchemaNames(session -> ImmutableList.of("sample_schema"))
+ .withListTables((session, schema) -> ImmutableList.builder()
+ .add("restricted_table")
+ .add("unrestricted_table")
+ .build())
+ .withGetColumns(schemaTableName -> ImmutableList.builder()
+ .add(ColumnMetadata.builder().setName("user_type").setType(VarcharType.VARCHAR).build())
+ .add(ColumnMetadata.builder().setName("user_name").setType(VarcharType.VARCHAR).build())
+ .add(ColumnMetadata.builder().setName("user_phone").setType(IntegerType.INTEGER).build())
+ .build())
+ .withData(schemaTableName -> ImmutableList.>builder()
+ .addAll(DUMMY_CUSTOMERS_IN_TABLE.stream().map(customer -> ImmutableList.of("customer", customer, customer.hashCode())).collect(toImmutableSet()))
+ .addAll(DUMMY_INTERNAL_USERS_IN_TABLE.stream().map(internalUser -> ImmutableList.of("internal_user", internalUser, internalUser.hashCode())).collect(toImmutableSet()))
+ .build())
+ .build();
+
+ runner.getBaseQueryRunner().installPlugin(new MockConnectorPlugin(connectorFactory));
+ runner.getBaseQueryRunner().createCatalog("sample_catalog", "mock");
+ }
+}
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlFiltering.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlFiltering.java
index bd97a034e52b6..3ad662efceca5 100644
--- a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlFiltering.java
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlFiltering.java
@@ -49,6 +49,7 @@
public class TestOpaAccessControlFiltering
{
private static final URI OPA_SERVER_URI = URI.create("http://my-uri/");
+ private static final Map OPA_CONFIG = new TestHelpers.OpaConfigBuilder().withBasePolicy(OPA_SERVER_URI).buildConfig();
private final Identity requestingIdentity = Identity.ofUser("source-user");
private final SystemSecurityContext requestingSecurityContext = systemSecurityContextFromIdentity(requestingIdentity);
@@ -56,7 +57,7 @@ public class TestOpaAccessControlFiltering
public void testFilterViewQueryOwnedBy()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildHandler("/input/action/resource/user/user", "user-one"));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Identity userOne = Identity.ofUser("user-one");
Identity userTwo = Identity.ofUser("user-two");
@@ -98,7 +99,7 @@ public void testFilterViewQueryOwnedBy()
public void testFilterCatalogs()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildHandler("/input/action/resource/catalog/name", "catalog_two"));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Set requestedCatalogs = ImmutableSet.of("catalog_one", "catalog_two");
Set result = authorizer.filterCatalogs(
@@ -135,7 +136,7 @@ public void testFilterCatalogs()
public void testFilterSchemas()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildHandler("/input/action/resource/schema/schemaName", "schema_one"));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Set requestedSchemas = ImmutableSet.of("schema_one", "schema_two");
@@ -171,7 +172,7 @@ public void testFilterTables()
.add(new SchemaTableName("schema_two", "table_two"))
.build();
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildHandler("/input/action/resource/table/tableName", "table_one"));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Set result = authorizer.filterTables(requestingSecurityContext, "my_catalog", tables);
assertThat(result).containsExactlyInAnyOrderElementsOf(tables.stream().filter(table -> table.getTableName().equals("table_one")).collect(toImmutableSet()));
@@ -212,7 +213,7 @@ public void testFilterColumns()
.build();
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildHandler("/input/action/resource/table/columns/0", columnsToAllow));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Map> result = authorizer.filterColumns(requestingSecurityContext, "my_catalog", requestedColumns);
@@ -245,7 +246,7 @@ public void testFilterColumns()
public void testEmptyFilterColumns()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, request -> OK_RESPONSE);
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
SchemaTableName someTable = SchemaTableName.schemaTableName("my_schema", "my_table");
Map> requestedColumns = ImmutableMap.of(someTable, ImmutableSet.of());
@@ -267,7 +268,7 @@ public void testFilterFunctions()
Set requestedFunctions = ImmutableSet.of(functionOne, functionTwo);
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildHandler("/input/action/resource/function/functionName", "function_two"));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Set result = authorizer.filterFunctions(
requestingSecurityContext,
@@ -297,7 +298,7 @@ public void testEmptyRequests(
BiFunction callable)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, request -> OK_RESPONSE);
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Collection> result = callable.apply(authorizer, requestingSecurityContext);
assertThat(result).isEmpty();
@@ -313,7 +314,7 @@ public void testIllegalResponseThrows(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
assertThatThrownBy(() -> callable.apply(authorizer, requestingSecurityContext))
.isInstanceOf(expectedException)
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlSystem.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlSystem.java
index b174efd29e0a0..2e38f0a33a3ae 100644
--- a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlSystem.java
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaAccessControlSystem.java
@@ -13,12 +13,8 @@
*/
package io.trino.plugin.opa;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import io.trino.Session;
import io.trino.plugin.blackhole.BlackHolePlugin;
-import io.trino.spi.security.Identity;
-import io.trino.testing.DistributedQueryRunner;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
@@ -29,26 +25,15 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.util.Optional;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
-import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.opa.FunctionalHelpers.Pair;
-import static io.trino.testing.TestingSession.testSessionBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
@@ -57,14 +42,12 @@
@TestInstance(PER_CLASS)
public class TestOpaAccessControlSystem
{
- private URI opaServerUri;
- private DistributedQueryRunner runner;
+ private DistributedQueryRunnerHelper runner;
- private static final int OPA_PORT = 8181;
+ private static final String OPA_ALLOW_POLICY_NAME = "allow";
+ private static final String OPA_BATCH_ALLOW_POLICY_NAME = "batchAllow";
@Container
- private static final GenericContainer> OPA_CONTAINER = new GenericContainer<>(DockerImageName.parse("openpolicyagent/opa:latest-rootless"))
- .withCommand("run", "--server", "--addr", ":%d".formatted(OPA_PORT))
- .withExposedPorts(OPA_PORT);
+ private static final OpaContainer OPA_CONTAINER = new OpaContainer();
@Nested
@TestInstance(PER_CLASS)
@@ -75,15 +58,15 @@ class UnbatchedAuthorizerTests
public void setupTrino()
throws Exception
{
- setupTrinoWithOpa("v1/data/trino/allow", Optional.empty());
+ setupTrinoWithOpa(new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_ALLOW_POLICY_NAME))
+ .buildConfig());
}
@AfterAll
public void teardown()
{
- if (runner != null) {
- runner.close();
- }
+ runner.teardown();
}
@ParameterizedTest(name = "{index}: {0}")
@@ -91,7 +74,7 @@ public void teardown()
public void testAllowsQueryAndFilters(String userName, Set expectedCatalogs)
throws IOException, InterruptedException
{
- submitPolicy("""
+ OPA_CONTAINER.submitPolicy("""
package trino
import future.keywords.in
import future.keywords.if
@@ -117,7 +100,7 @@ public void testAllowsQueryAndFilters(String userName, Set expectedCatal
input.action.resource.catalog.name == "catalog_one"
}
""");
- Set catalogs = querySetOfStrings(user(userName), "SHOW CATALOGS");
+ Set catalogs = runner.querySetOfStrings(userName, "SHOW CATALOGS");
assertThat(catalogs).containsExactlyInAnyOrderElementsOf(expectedCatalogs);
}
@@ -125,7 +108,7 @@ public void testAllowsQueryAndFilters(String userName, Set expectedCatal
public void testShouldDenyQueryIfDirected()
throws IOException, InterruptedException
{
- submitPolicy("""
+ OPA_CONTAINER.submitPolicy("""
package trino
import future.keywords.in
default allow = false
@@ -134,11 +117,11 @@ public void testShouldDenyQueryIfDirected()
input.context.identity.user in ["someone", "admin"]
}
""");
- assertThatThrownBy(() -> runner.execute(user("bob"), "SHOW CATALOGS"))
+ assertThatThrownBy(() -> runner.querySetOfStrings("bob", "SHOW CATALOGS"))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Access Denied");
// smoke test: we can still query if we are the right user
- runner.execute(user("admin"), "SHOW CATALOGS");
+ runner.querySetOfStrings("admin", "SHOW CATALOGS");
}
}
@@ -151,15 +134,16 @@ class BatchedAuthorizerTests
public void setupTrino()
throws Exception
{
- setupTrinoWithOpa("v1/data/trino/allow", Optional.of("v1/data/trino/batchAllow"));
+ setupTrinoWithOpa(new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_ALLOW_POLICY_NAME))
+ .withBatchPolicy(OPA_CONTAINER.getOpaUriForPolicyPath(OPA_BATCH_ALLOW_POLICY_NAME))
+ .buildConfig());
}
@AfterAll
public void teardown()
{
- if (runner != null) {
- runner.close();
- }
+ runner.teardown();
}
@ParameterizedTest(name = "{index}: {0}")
@@ -167,7 +151,7 @@ public void teardown()
public void testFilterOutItemsBatch(String userName, Set expectedCatalogs)
throws IOException, InterruptedException
{
- submitPolicy("""
+ OPA_CONTAINER.submitPolicy("""
package trino
import future.keywords.in
import future.keywords.if
@@ -201,7 +185,7 @@ public void testFilterOutItemsBatch(String userName, Set expectedCatalog
is_admin
}
""");
- Set catalogs = querySetOfStrings(user(userName), "SHOW CATALOGS");
+ Set catalogs = runner.querySetOfStrings(userName, "SHOW CATALOGS");
assertThat(catalogs).containsExactlyInAnyOrderElementsOf(expectedCatalogs);
}
@@ -209,12 +193,12 @@ public void testFilterOutItemsBatch(String userName, Set expectedCatalog
public void testDenyUnbatchedQuery()
throws IOException, InterruptedException
{
- submitPolicy("""
+ OPA_CONTAINER.submitPolicy("""
package trino
import future.keywords.in
default allow = false
""");
- assertThatThrownBy(() -> runner.execute(user("bob"), "SELECT version()"))
+ assertThatThrownBy(() -> runner.querySetOfStrings("bob", "SELECT version()"))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Access Denied");
}
@@ -223,7 +207,7 @@ public void testDenyUnbatchedQuery()
public void testAllowUnbatchedQuery()
throws IOException, InterruptedException
{
- submitPolicy("""
+ OPA_CONTAINER.submitPolicy("""
package trino
import future.keywords.in
default allow = false
@@ -232,86 +216,18 @@ public void testAllowUnbatchedQuery()
input.action.operation in ["ImpersonateUser", "ExecuteFunction", "AccessCatalog", "ExecuteQuery"]
}
""");
- Set version = querySetOfStrings(user("bob"), "SELECT version()");
+ Set version = runner.querySetOfStrings("bob", "SELECT version()");
assertThat(version).isNotEmpty();
}
}
- private void ensureOpaUp()
- throws IOException, InterruptedException
- {
- assertThat(OPA_CONTAINER.isRunning()).isTrue();
- InetSocketAddress opaSocket = new InetSocketAddress(OPA_CONTAINER.getHost(), OPA_CONTAINER.getMappedPort(OPA_PORT));
- String opaEndpoint = String.format("%s:%d", opaSocket.getHostString(), opaSocket.getPort());
- awaitSocketOpen(opaSocket, 100, 200);
- this.opaServerUri = URI.create(String.format("http://%s/", opaEndpoint));
- }
-
- private void setupTrinoWithOpa(String basePolicyRelativeUri, Optional batchPolicyRelativeUri)
+ private void setupTrinoWithOpa(Map opaConfig)
throws Exception
{
- ensureOpaUp();
- ImmutableMap.Builder opaConfigBuilder = ImmutableMap.builder();
- opaConfigBuilder.put("opa.policy.uri", opaServerUri.resolve(basePolicyRelativeUri).toString());
- batchPolicyRelativeUri.ifPresent(relativeUri -> opaConfigBuilder.put("opa.policy.batched-uri", opaServerUri.resolve(relativeUri).toString()));
- this.runner = DistributedQueryRunner.builder(testSessionBuilder().build())
- .setSystemAccessControl(new OpaAccessControlFactory().create(opaConfigBuilder.buildOrThrow()))
- .setNodeCount(1)
- .build();
- runner.installPlugin(new BlackHolePlugin());
- runner.createCatalog("catalog_one", "blackhole");
- runner.createCatalog("catalog_two", "blackhole");
- }
-
- private static void awaitSocketOpen(InetSocketAddress addr, int attempts, int timeoutMs)
- throws IOException, InterruptedException
- {
- for (int i = 0; i < attempts; ++i) {
- try (Socket socket = new Socket()) {
- socket.connect(addr, timeoutMs);
- return;
- }
- catch (SocketTimeoutException e) {
- // ignored
- }
- catch (IOException e) {
- Thread.sleep(timeoutMs);
- }
- }
- throw new SocketTimeoutException("Timed out waiting for addr %s to be available (%d attempts made with a %d ms wait)".formatted(addr, attempts, timeoutMs));
- }
-
- private static String stringOfLines(String... lines)
- {
- StringBuilder out = new StringBuilder();
- for (String line : lines) {
- out.append(line);
- out.append("\r\n");
- }
- return out.toString();
- }
-
- private void submitPolicy(String... policyLines)
- throws IOException, InterruptedException
- {
- HttpClient httpClient = HttpClient.newHttpClient();
- HttpResponse policyResponse =
- httpClient.send(
- HttpRequest.newBuilder(opaServerUri.resolve("v1/policies/trino"))
- .PUT(HttpRequest.BodyPublishers.ofString(stringOfLines(policyLines)))
- .header("Content-Type", "text/plain").build(),
- HttpResponse.BodyHandlers.ofString());
- assertThat(policyResponse.statusCode()).withFailMessage("Failed to submit policy: %s", policyResponse.body()).isEqualTo(200);
- }
-
- private Session user(String user)
- {
- return testSessionBuilder().setIdentity(Identity.ofUser(user)).build();
- }
-
- private Set querySetOfStrings(Session session, String query)
- {
- return runner.execute(session, query).getMaterializedRows().stream().map(row -> row.getField(0).toString()).collect(toImmutableSet());
+ this.runner = DistributedQueryRunnerHelper.withOpaConfig(opaConfig);
+ runner.getBaseQueryRunner().installPlugin(new BlackHolePlugin());
+ runner.getBaseQueryRunner().createCatalog("catalog_one", "blackhole");
+ runner.getBaseQueryRunner().createCatalog("catalog_two", "blackhole");
}
private static Stream filterSchemaTests()
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaBatchAccessControlFiltering.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaBatchAccessControlFiltering.java
index 6bfa78c190d55..3f63e94cc29cf 100644
--- a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaBatchAccessControlFiltering.java
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaBatchAccessControlFiltering.java
@@ -52,7 +52,11 @@
public class TestOpaBatchAccessControlFiltering
{
private static final URI OPA_SERVER_URI = URI.create("http://my-uri/");
- private static final URI OPA_BATCH_SERVER_URI = URI.create("http://my-uri/batchAllow");
+ private static final URI OPA_BATCH_SERVER_URI = URI.create("http://my-batch-uri/");
+ private static final Map OPA_CONFIG = new TestHelpers.OpaConfigBuilder()
+ .withBasePolicy(OPA_SERVER_URI)
+ .withBatchPolicy(OPA_BATCH_SERVER_URI)
+ .buildConfig();
private final Identity requestingIdentity = Identity.ofUser("source-user");
private final SystemSecurityContext requestingSecurityContext = systemSecurityContextFromIdentity(requestingIdentity);
@@ -63,7 +67,7 @@ public void testFilterViewQueryOwnedBy(
List expectedItems)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_BATCH_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, response));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Identity identityOne = Identity.ofUser("user-one");
Identity identityTwo = Identity.ofUser("user-two");
@@ -107,7 +111,7 @@ public void testFilterCatalogs(
List expectedItems)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_BATCH_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, response));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
List requestedCatalogs = ImmutableList.of("catalog_one", "catalog_two", "catalog_three");
@@ -147,7 +151,7 @@ public void testFilterSchemas(
List expectedItems)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_BATCH_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, response));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
List requestedSchemas = ImmutableList.of("schema_one", "schema_two", "schema_three");
Set result = authorizer.filterSchemas(
@@ -190,7 +194,7 @@ public void testFilterTables(
List expectedItems)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_BATCH_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, response));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
List tables = ImmutableList.builder()
.add(new SchemaTableName("schema_one", "table_one"))
.add(new SchemaTableName("schema_one", "table_two"))
@@ -264,7 +268,7 @@ public void testFilterColumns()
};
return new MockResponse(responseContents, 200);
}));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Map> result = authorizer.filterColumns(
requestingSecurityContext,
"my_catalog",
@@ -302,7 +306,7 @@ public void testFilterFunctions(
List expectedItems)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_BATCH_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, response));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
List requestedFunctions = ImmutableList.builder()
.add(new SchemaFunctionName("my_schema", "function_one"))
.add(new SchemaFunctionName("my_schema", "function_two"))
@@ -349,7 +353,7 @@ public void testFilterFunctions(
public void testEmptyFilterColumns()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_BATCH_SERVER_URI, request -> OK_RESPONSE);
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
SchemaTableName tableOne = SchemaTableName.schemaTableName("my_schema", "table_one");
SchemaTableName tableTwo = SchemaTableName.schemaTableName("my_schema", "table_two");
@@ -372,7 +376,7 @@ public void testEmptyRequests(
BiFunction callable)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_BATCH_SERVER_URI, request -> OK_RESPONSE);
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
Collection result = callable.apply(authorizer, requestingSecurityContext);
assertThat(result).isEmpty();
@@ -388,7 +392,7 @@ public void testIllegalResponseThrows(
String expectedErrorMessage)
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_BATCH_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, failureResponse));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
assertThatThrownBy(() -> callable.apply(authorizer, requestingSecurityContext))
.isInstanceOf(expectedException)
@@ -400,7 +404,7 @@ public void testIllegalResponseThrows(
public void testResponseOutOfBoundsThrows()
{
InstrumentedHttpClient mockClient = createMockHttpClient(OPA_BATCH_SERVER_URI, buildValidatingRequestHandler(requestingIdentity, 200, "{\"result\": [0, 1, 2]}"));
- OpaAccessControl authorizer = createOpaAuthorizer(OPA_SERVER_URI, OPA_BATCH_SERVER_URI, mockClient);
+ OpaAccessControl authorizer = createOpaAuthorizer(OPA_CONFIG, mockClient);
assertThatThrownBy(() -> authorizer.filterCatalogs(requestingSecurityContext, ImmutableSet.of("catalog_one", "catalog_two")))
.isInstanceOf(OpaQueryException.QueryFailed.class);
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaConfig.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaConfig.java
index 37184bdefd3e8..766b043234125 100644
--- a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaConfig.java
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaConfig.java
@@ -31,6 +31,8 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(OpaConfig.class)
.setOpaUri(null)
.setOpaBatchUri(null)
+ .setOpaRowFiltersUri(null)
+ .setOpaColumnMaskingUri(null)
.setLogRequests(false)
.setLogResponses(false)
.setAllowPermissioningOperations(false));
@@ -42,6 +44,8 @@ public void testExplicitPropertyMappings()
Map properties = ImmutableMap.builder()
.put("opa.policy.uri", "https://opa.example.com")
.put("opa.policy.batched-uri", "https://opa-batch.example.com")
+ .put("opa.policy.row-filters-uri", "https://opa-row-filtering.example.com")
+ .put("opa.policy.column-masking-uri", "https://opa-column-masking.example.com")
.put("opa.log-requests", "true")
.put("opa.log-responses", "true")
.put("opa.allow-permissioning-operations", "true")
@@ -50,6 +54,8 @@ public void testExplicitPropertyMappings()
OpaConfig expected = new OpaConfig()
.setOpaUri(URI.create("https://opa.example.com"))
.setOpaBatchUri(URI.create("https://opa-batch.example.com"))
+ .setOpaRowFiltersUri(URI.create("https://opa-row-filtering.example.com"))
+ .setOpaColumnMaskingUri(URI.create("https://opa-column-masking.example.com"))
.setLogRequests(true)
.setLogResponses(true)
.setAllowPermissioningOperations(true);
diff --git a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaResponseDecoding.java b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaResponseDecoding.java
index 63b7948f06cc8..b40bd9213125c 100644
--- a/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaResponseDecoding.java
+++ b/plugin/trino-opa/src/test/java/io/trino/plugin/opa/TestOpaResponseDecoding.java
@@ -16,15 +16,23 @@
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.trino.plugin.opa.schema.OpaBatchQueryResult;
+import io.trino.plugin.opa.schema.OpaColumnMaskQueryResult;
import io.trino.plugin.opa.schema.OpaQueryResult;
+import io.trino.plugin.opa.schema.OpaRowFiltersQueryResult;
+import io.trino.plugin.opa.schema.OpaViewExpression;
import org.junit.jupiter.api.Test;
+import java.util.Optional;
+
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class TestOpaResponseDecoding
{
private final JsonCodec responseCodec = new JsonCodecFactory().jsonCodec(OpaQueryResult.class);
private final JsonCodec batchResponseCodec = new JsonCodecFactory().jsonCodec(OpaBatchQueryResult.class);
+ private final JsonCodec rowFilteringResponseCodec = new JsonCodecFactory().jsonCodec(OpaRowFiltersQueryResult.class);
+ private final JsonCodec columnMaskingResponseCodec = new JsonCodecFactory().jsonCodec(OpaColumnMaskQueryResult.class);
@Test
public void testCanDeserializeOpaSingleResponse()
@@ -82,13 +90,19 @@ public void testUndefinedDecisionSingleResponseTreatedAsDeny()
}
@Test
- public void testEmptyOrUndefinedResponses()
+ public void testIllegalResponseThrows()
+ {
+ testIllegalResponseDecodingThrows("{\"result\": \"foo\"}", responseCodec);
+ }
+
+ @Test
+ public void testBatchEmptyOrUndefinedResponses()
{
- testEmptyOrUndefinedResponses("{}");
- testEmptyOrUndefinedResponses("{\"result\": []}");
+ testBatchEmptyOrUndefinedResponses("{}");
+ testBatchEmptyOrUndefinedResponses("{\"result\": []}");
}
- private void testEmptyOrUndefinedResponses(String response)
+ private void testBatchEmptyOrUndefinedResponses(String response)
{
OpaBatchQueryResult result = this.batchResponseCodec.fromJson(response);
assertThat(result.result()).isEmpty();
@@ -118,6 +132,16 @@ public void testBatchResponseWithItemsAndDecisionId()
assertThat(result.decisionId()).isEqualTo("foobar");
}
+ @Test
+ public void testBatchResponseIllegalResponseThrows()
+ {
+ testIllegalResponseDecodingThrows("""
+ {
+ "result": ["foo"],
+ "decision_id": "foobar"
+ }""", batchResponseCodec);
+ }
+
@Test
public void testBatchResponseWithExtraFields()
{
@@ -131,4 +155,140 @@ public void testBatchResponseWithExtraFields()
assertThat(result.result()).containsExactly(1, 2, 3);
assertThat(result.decisionId()).isEqualTo("foobar");
}
+
+ @Test
+ public void testRowFilteringEmptyOrUndefinedResponses()
+ {
+ testRowFilteringEmptyOrUndefinedResponses("{}");
+ testRowFilteringEmptyOrUndefinedResponses("{\"result\": []}");
+ }
+
+ private void testRowFilteringEmptyOrUndefinedResponses(String response)
+ {
+ OpaRowFiltersQueryResult result = this.rowFilteringResponseCodec.fromJson(response);
+ assertThat(result.result()).isEmpty();
+ assertThat(result.decisionId()).isNull();
+ }
+
+ @Test
+ public void testRowFilteringResponseWithItemsNoDecisionId()
+ {
+ OpaRowFiltersQueryResult result = this.rowFilteringResponseCodec.fromJson("""
+ {
+ "result": [
+ {"expression": "foo"},
+ {"expression": "bar", "identity": "some_identity"}
+ ]
+ }""");
+ assertThat(result.result()).containsExactlyInAnyOrder(
+ new OpaViewExpression("foo", Optional.empty()),
+ new OpaViewExpression("bar", Optional.of("some_identity")));
+ assertThat(result.decisionId()).isNull();
+ }
+
+ @Test
+ public void testRowFilteringResponseWithItemsAndDecisionId()
+ {
+ OpaRowFiltersQueryResult result = this.rowFilteringResponseCodec.fromJson("""
+ {
+ "result": [{"expression": "test_expression"}],
+ "decision_id": "some_id"
+ }""");
+ assertThat(result.result()).containsExactly(new OpaViewExpression("test_expression", Optional.empty()));
+ assertThat(result.decisionId()).isEqualTo("some_id");
+ }
+
+ @Test
+ public void testRowFilteringResponseWithExtraFields()
+ {
+ OpaRowFiltersQueryResult result = this.rowFilteringResponseCodec.fromJson("""
+ {
+ "result": [{"expression": "test_expression"}],
+ "decision_id": "foobar",
+ "someInfo": "foo",
+ "andAnObject": {}
+ }""");
+ assertThat(result.result()).containsExactly(new OpaViewExpression("test_expression", Optional.empty()));
+ assertThat(result.decisionId()).isEqualTo("foobar");
+ }
+
+ @Test
+ public void testRowFilteringResponseIllegalResponseThrows()
+ {
+ testIllegalResponseDecodingThrows("""
+ {
+ "result": ["foo"]
+ }""", rowFilteringResponseCodec);
+ }
+
+ @Test
+ public void testColumnMaskingEmptyOrUndefinedResponse()
+ {
+ OpaColumnMaskQueryResult emptyResult = columnMaskingResponseCodec.fromJson("{}");
+ assertThat(emptyResult.result()).isEmpty();
+ assertThat(emptyResult.decisionId()).isNull();
+ OpaColumnMaskQueryResult undefinedResult = columnMaskingResponseCodec.fromJson("{\"result\": null}");
+ assertThat(undefinedResult.result()).isEmpty();
+ assertThat(undefinedResult.decisionId()).isNull();
+ }
+
+ @Test
+ public void testColumnMaskingResponsesWithNoDecisionId()
+ {
+ OpaColumnMaskQueryResult result = this.columnMaskingResponseCodec.fromJson("""
+ {
+ "result": {"expression": "test_expression"}
+ }""");
+ assertThat(result.result()).contains(new OpaViewExpression("test_expression", Optional.empty()));
+ assertThat(result.decisionId()).isNull();
+ }
+
+ @Test
+ public void testColumnMaskingResponsesWithDecisionId()
+ {
+ OpaColumnMaskQueryResult resultWithExpression = this.columnMaskingResponseCodec.fromJson("""
+ {
+ "result": {"expression": "test_expression"},
+ "decision_id": "foobar"
+ }""");
+ OpaColumnMaskQueryResult resultWithExpressionAndIdentity = this.columnMaskingResponseCodec.fromJson("""
+ {
+ "result": {"expression": "test_expression", "identity": "some_identity"},
+ "decision_id": "foobar"
+ }""");
+ assertThat(resultWithExpression.result()).contains(new OpaViewExpression("test_expression", Optional.empty()));
+ assertThat(resultWithExpressionAndIdentity.result()).contains(new OpaViewExpression("test_expression", Optional.of("some_identity")));
+ assertThat(resultWithExpression.decisionId()).isEqualTo("foobar");
+ assertThat(resultWithExpressionAndIdentity.decisionId()).isEqualTo("foobar");
+ }
+
+ @Test
+ public void testColumnMaskingResponseWithExtraFields()
+ {
+ OpaColumnMaskQueryResult result = this.columnMaskingResponseCodec.fromJson("""
+ {
+ "result": {"expression": "test_expression"},
+ "decision_id": "foobar",
+ "someInfo": "foo",
+ "andAnObject": {}
+ }""");
+ assertThat(result.result()).contains(new OpaViewExpression("test_expression", Optional.empty()));
+ assertThat(result.decisionId()).isEqualTo("foobar");
+ }
+
+ @Test
+ public void testColumnMaskingResponseIllegalResponseThrows()
+ {
+ testIllegalResponseDecodingThrows("""
+ {
+ "result": {"foo": "bar"}
+ }""", columnMaskingResponseCodec);
+ }
+
+ private void testIllegalResponseDecodingThrows(String rawResponse, JsonCodec codec)
+ {
+ assertThatThrownBy(() -> codec.fromJson(rawResponse))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid JSON");
+ }
}