Skip to content

Commit

Permalink
Add row-level security filter in query (#17564)
Browse files Browse the repository at this point in the history
This PR adds the ability to attach restrictions (e.x. row filters) to a query, thus restrict row-level data access for given users. Note that this feature is not directly available, and requires additional work in Authorizer.

This PR has been tested in a local druid cluster with additional Authorizer changes.

Description
A query follows these steps: initialize -> authorize -> execute. In the authorize step, the permissions are checked for all the required resources in the query. Before this PR, the authorize step only returns allow or deny access on a table. Granting access to a table means a user can see all data in this table. After this PR, the authorize step can return allow access along with restrictions (i.e. a row filter that must be applied to the table ), which restrict users' data access at row level. For example, customers can only see rows relevant to their company.

The authorizeAllResourceActions now returns a AuthorizationResult instead of Access. The main difference between AuthorizationResult and Access is that the former contains a map of table with restriction (i.e. Policy)

In the authorize step of QueryLifecycle, it would add the Policy to tables in the datasource tree, transform TableDataSource to RestrictedDataSource. In the execute step, Policy is enforced through RestrictedSegment and RestrictedCursorFactory, the filter inside Policy would be attached to the CursorHolder.
  • Loading branch information
cecemei authored Jan 15, 2025
1 parent 55022db commit 19c1afe
Show file tree
Hide file tree
Showing 73 changed files with 2,930 additions and 610 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationResult;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.DirectStatement.ResultSet;
Expand Down Expand Up @@ -146,8 +147,8 @@ private QueryResponse runNativeQuery(QueryRequest request, AuthenticationResult
final String currThreadName = Thread.currentThread().getName();
try {
queryLifecycle.initialize(query);
Access authorizationResult = queryLifecycle.authorize(authResult);
if (!authorizationResult.isAllowed()) {
AuthorizationResult authorizationResult = queryLifecycle.authorize(authResult);
if (!authorizationResult.allowAccessWithNoRestriction()) {
throw new ForbiddenException(Access.DEFAULT_ERROR_MESSAGE);
}
queryResponse = queryLifecycle.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.sun.jersey.spi.container.ContainerRequest;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.http.security.AbstractResourceFilter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthorizationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
Expand Down Expand Up @@ -54,17 +54,17 @@ public ContainerRequest filter(ContainerRequest request)
getAction(request)
);

final Access authResult = AuthorizationUtils.authorizeResourceAction(
final AuthorizationResult authResult = AuthorizationUtils.authorizeResourceAction(
getReq(),
resourceAction,
getAuthorizerMapper()
);

if (!authResult.isAllowed()) {
if (!authResult.allowAccessWithNoRestriction()) {
throw new WebApplicationException(
Response.status(Response.Status.FORBIDDEN)
.type(MediaType.TEXT_PLAIN)
.entity(StringUtils.format("Access-Check-Result: %s", authResult.toString()))
.entity(StringUtils.format("Access-Check-Result: %s", authResult.getErrorMessage()))
.build()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
Expand All @@ -56,7 +56,6 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -108,17 +107,17 @@ public CatalogResource(
* </ul>
*
* @param schemaName The name of the Druid schema, which must be writable
* and the user must have at least read access.
* @param tableName The name of the table definition to modify. The user must
* have write access to the table.
* @param spec The new table definition.
* @param version the expected version of an existing table. The version must
* match. If not (or if the table does not exist), returns an error.
* @param overwrite if {@code true}, then overwrites any existing table.
* If {@code false}, then the operation fails if the table already exists.
* Ignored if a version is specified.
* @param req the HTTP request used for authorization.
*/
* and the user must have at least read access.
* @param tableName The name of the table definition to modify. The user must
* have write access to the table.
* @param spec The new table definition.
* @param version the expected version of an existing table. The version must
* match. If not (or if the table does not exist), returns an error.
* @param overwrite if {@code true}, then overwrites any existing table.
* If {@code false}, then the operation fails if the table already exists.
* Ignored if a version is specified.
* @param req the HTTP request used for authorization.
*/
@POST
@Path("/schemas/{schema}/tables/{name}")
@Consumes(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -181,9 +180,9 @@ public Response postTable(
* the definition is created before the datasource itself.)
*
* @param schemaName The Druid schema. The user must have read access.
* @param tableName The name of the table within the schema. The user must have
* read access.
* @param req the HTTP request used for authorization.
* @param tableName The name of the table within the schema. The user must have
* read access.
* @param req the HTTP request used for authorization.
* @return the definition for the table, if any.
*/
@GET
Expand Down Expand Up @@ -211,8 +210,8 @@ public Response getTable(
* for the given schema and table.
*
* @param schemaName The name of the schema that holds the table.
* @param tableName The name of the table definition to delete. The user must have
* write access.
* @param tableName The name of the table definition to delete. The user must have
* write access.
*/
@DELETE
@Path("/schemas/{schema}/tables/{name}")
Expand Down Expand Up @@ -247,9 +246,9 @@ public Response deleteTable(
* the table spec changed between the time it was retrieve and the edit operation
* is submitted.
*
* @param schemaName The name of the schema that holds the table.
* @param tableName The name of the table definition to delete. The user must have
* write access.
* @param schemaName The name of the schema that holds the table.
* @param tableName The name of the table definition to delete. The user must have
* write access.
* @param editRequest The operation to perform. See the classes for details.
*/
@POST
Expand Down Expand Up @@ -281,7 +280,7 @@ public Response editTable(
* Retrieves the list of all Druid schema names.
*
* @param format the format of the response. See the code for the
* available formats
* available formats
*/
@GET
@Path("/schemas")
Expand Down Expand Up @@ -318,9 +317,9 @@ public Response getSchemas(
* the read-only schemas, there will be no table definitions.
*
* @param schemaName The name of the Druid schema to query. The user must
* have read access.
* @param format the format of the response. See the code for the
* available formats
* have read access.
* @param format the format of the response. See the code for the
* available formats
*/
@GET
@Path("/schemas/{schema}/tables")
Expand Down Expand Up @@ -360,7 +359,7 @@ public Response getSchemaTables(
* table definitions known to the catalog. Used to prime a cache on first access.
* After that, the Coordinator will push updates to Brokers. Returns the full
* list of table details.
*
* <p>
* It is expected that the number of table definitions will be of small or moderate
* size, so no provision is made to handle very large lists.
*/
Expand Down Expand Up @@ -467,9 +466,9 @@ private Response listAllTableMetadata(final HttpServletRequest req)
List<Pair<SchemaSpec, TableMetadata>> tables = new ArrayList<>();
for (SchemaSpec schema : catalog.schemaRegistry().schemas()) {
tables.addAll(catalog.tables().tablesInSchema(schema.name())
.stream()
.map(table -> Pair.of(schema, table))
.collect(Collectors.toList()));
.stream()
.map(table -> Pair.of(schema, table))
.collect(Collectors.toList()));

}
Iterable<Pair<SchemaSpec, TableMetadata>> filtered = AuthorizationUtils.filterAuthorizedResources(
Expand All @@ -483,9 +482,9 @@ private Response listAllTableMetadata(final HttpServletRequest req)
);

List<TableMetadata> metadata = Lists.newArrayList(filtered)
.stream()
.map(pair -> pair.rhs)
.collect(Collectors.toList());
.stream()
.map(pair -> pair.rhs)
.collect(Collectors.toList());
return Response.ok().entity(metadata).build();
}

Expand All @@ -499,9 +498,9 @@ private Response tableNamesInSchema(
req,
tables,
name ->
Collections.singletonList(
resourceAction(schema, name, Action.READ)),
authorizerMapper
Collections.singletonList(
resourceAction(schema, name, Action.READ)),
authorizerMapper
);
return Response.ok().entity(Lists.newArrayList(filtered)).build();
}
Expand Down Expand Up @@ -581,13 +580,13 @@ private void authorizeTable(

private void authorize(String resource, String key, Action action, HttpServletRequest request)
{
final Access authResult = authorizeAccess(resource, key, action, request);
if (!authResult.isAllowed()) {
throw new ForbiddenException(authResult.toString());
final AuthorizationResult authResult = authorizeAccess(resource, key, action, request);
if (!authResult.allowAccessWithNoRestriction()) {
throw new ForbiddenException(authResult.getErrorMessage());
}
}

private Access authorizeAccess(String resource, String key, Action action, HttpServletRequest request)
private AuthorizationResult authorizeAccess(String resource, String key, Action action, HttpServletRequest request)
{
return AuthorizationUtils.authorizeResourceAction(
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
Expand Down Expand Up @@ -144,7 +144,7 @@ public GetQueriesResponse doGetRunningQueries(
)
{
final AuthenticationResult authenticationResult = AuthorizationUtils.authenticationResultFromRequest(req);
final Access stateReadAccess = AuthorizationUtils.authorizeAllResourceActions(
final AuthorizationResult stateReadAccess = AuthorizationUtils.authorizeAllResourceActions(
authenticationResult,
Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE, Action.READ)),
authorizerMapper
Expand Down Expand Up @@ -175,7 +175,7 @@ public GetQueriesResponse doGetRunningQueries(
queries.sort(Comparator.comparing(DartQueryInfo::getStartTime).thenComparing(DartQueryInfo::getDartQueryId));

final GetQueriesResponse response;
if (stateReadAccess.isAllowed()) {
if (stateReadAccess.allowAccessWithNoRestriction()) {
// User can READ STATE, so they can see all running queries, as well as authentication details.
response = new GetQueriesResponse(queries);
} else {
Expand Down Expand Up @@ -245,9 +245,9 @@ public Response cancelQuery(
return Response.status(Response.Status.ACCEPTED).build();
}

final Access access = authorizeCancellation(req, cancelables);
final AuthorizationResult authResult = authorizeCancellation(req, cancelables);

if (access.isAllowed()) {
if (authResult.allowAccessWithNoRestriction()) {
sqlLifecycleManager.removeAll(sqlQueryId, cancelables);

// Don't call cancel() on the cancelables. That just cancels native queries, which is useless here. Instead,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.QueryMaker;
Expand Down Expand Up @@ -127,6 +128,9 @@ public DartQueryMaker(
@Override
public QueryResponse<Object[]> runQuery(DruidQuery druidQuery)
{
if (!plannerContext.getAuthorizationResult().allowAccessWithNoRestriction()) {
throw new ForbiddenException(plannerContext.getAuthorizationResult().getErrorMessage());
}
final MSQSpec querySpec = MSQTaskQueryMaker.makeQuerySpec(
null,
druidQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.msq.rpc;

import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthorizationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
Expand All @@ -41,10 +41,14 @@ public static void authorizeAdminRequest(
{
final List<ResourceAction> resourceActions = permissionMapper.getAdminPermissions();

Access access = AuthorizationUtils.authorizeAllResourceActions(request, resourceActions, authorizerMapper);
AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions(
request,
resourceActions,
authorizerMapper
);

if (!access.isAllowed()) {
throw new ForbiddenException(access.toString());
if (!authResult.allowAccessWithNoRestriction()) {
throw new ForbiddenException(authResult.getErrorMessage());
}
}

Expand All @@ -57,10 +61,14 @@ public static void authorizeQueryRequest(
{
final List<ResourceAction> resourceActions = permissionMapper.getQueryPermissions(queryId);

Access access = AuthorizationUtils.authorizeAllResourceActions(request, resourceActions, authorizerMapper);
AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions(
request,
resourceActions,
authorizerMapper
);

if (!access.isAllowed()) {
throw new ForbiddenException(access.toString());
if (!authResult.allowAccessWithNoRestriction()) {
throw new ForbiddenException(authResult.getErrorMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
Expand Down Expand Up @@ -116,6 +117,9 @@ public class MSQTaskQueryMaker implements QueryMaker
@Override
public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
if (!plannerContext.getAuthorizationResult().allowAccessWithNoRestriction()) {
throw new ForbiddenException(plannerContext.getAuthorizationResult().getErrorMessage());
}
Hook.QUERY_PLAN.run(druidQuery.getQuery());
plannerContext.dispatchHook(DruidHook.NATIVE_PLAN, druidQuery.getQuery());

Expand Down
Loading

0 comments on commit 19c1afe

Please sign in to comment.