From 19c1afe4e2a3fdcea5e301c83fb3e6d6c27d84d7 Mon Sep 17 00:00:00 2001 From: Cece Mei Date: Tue, 14 Jan 2025 19:35:32 -0800 Subject: [PATCH] Add row-level security filter in query (#17564) This PR adds the ability to attach restrictions (e.x. row filters) to a query, thus restrict row-level data access for given users. Note that this feature is not directly available, and requires additional work in Authorizer. This PR has been tested in a local druid cluster with additional Authorizer changes. Description A query follows these steps: initialize -> authorize -> execute. In the authorize step, the permissions are checked for all the required resources in the query. Before this PR, the authorize step only returns allow or deny access on a table. Granting access to a table means a user can see all data in this table. After this PR, the authorize step can return allow access along with restrictions (i.e. a row filter that must be applied to the table ), which restrict users' data access at row level. For example, customers can only see rows relevant to their company. The authorizeAllResourceActions now returns a AuthorizationResult instead of Access. The main difference between AuthorizationResult and Access is that the former contains a map of table with restriction (i.e. Policy) In the authorize step of QueryLifecycle, it would add the Policy to tables in the datasource tree, transform TableDataSource to RestrictedDataSource. In the execute step, Policy is enforced through RestrictedSegment and RestrictedCursorFactory, the filter inside Policy would be attached to the CursorHolder. --- .../apache/druid/grpc/server/QueryDriver.java | 5 +- .../basic/BasicSecurityResourceFilter.java | 8 +- .../druid/catalog/http/CatalogResource.java | 77 ++- .../dart/controller/http/DartSqlResource.java | 10 +- .../dart/controller/sql/DartQueryMaker.java | 4 + .../druid/msq/rpc/MSQResourceUtils.java | 22 +- .../druid/msq/sql/MSQTaskQueryMaker.java | 4 + .../sql/resources/SqlStatementResource.java | 44 +- .../controller/http/DartSqlResourceTest.java | 63 ++- .../apache/druid/msq/exec/MSQSelectTest.java | 15 + .../indexing/common/task/IndexTaskUtils.java | 13 +- .../overlord/http/OverlordResource.java | 29 +- .../security/SupervisorResourceFilter.java | 8 +- .../http/security/TaskResourceFilter.java | 8 +- .../overlord/sampler/SamplerResource.java | 8 +- .../supervisor/SupervisorResource.java | 31 +- .../SeekableStreamIndexTaskRunner.java | 50 +- .../supervisor/SupervisorResourceTest.java | 4 +- pom.xml | 7 +- .../org/apache/druid/query/DataSource.java | 34 +- .../apache/druid/query/JoinDataSource.java | 8 +- .../java/org/apache/druid/query/Query.java | 10 +- .../druid/query/RestrictedDataSource.java | 215 +++++++++ .../apache/druid/query/TableDataSource.java | 14 + .../druid/query/filter/NotDimFilter.java | 1 - .../metadata/SegmentMetadataQuery.java | 24 +- .../query/policy/NoRestrictionPolicy.java | 70 +++ .../org/apache/druid/query/policy/Policy.java | 45 ++ .../druid/query/policy/RowFilterPolicy.java | 94 ++++ .../segment/BypassRestrictedSegment.java | 68 +++ .../segment/RestrictedCursorFactory.java | 74 +++ .../druid/segment/RestrictedSegment.java | 129 +++++ .../org/apache/druid/segment/Segment.java | 28 +- .../apache/druid/query/DataSourceTest.java | 107 +++++ .../druid/query/JoinDataSourceTest.java | 22 + .../druid/query/RestrictedDataSourceTest.java | 173 +++++++ .../query/policy/NoRestrictionPolicyTest.java | 60 +++ .../query/policy/RowFilterPolicyTest.java | 93 ++++ .../SQLMetadataStorageActionHandler.java | 3 +- .../AbstractSegmentMetadataCache.java | 5 +- .../SegmentMetadataQuerySegmentWalker.java | 20 +- .../druid/segment/realtime/ChatHandlers.java | 12 +- .../apache/druid/server/QueryLifecycle.java | 114 +++-- .../apache/druid/server/QueryResource.java | 14 +- .../http/security/ConfigResourceFilter.java | 8 +- .../security/DatasourceResourceFilter.java | 8 +- .../http/security/RulesResourceFilter.java | 12 +- .../http/security/StateResourceFilter.java | 9 +- .../apache/druid/server/security/Access.java | 83 +++- .../druid/server/security/AuthConfig.java | 4 +- .../server/security/AuthorizationResult.java | 238 ++++++++++ .../server/security/AuthorizationUtils.java | 159 +++---- .../AuthorizationResultTest.java | 118 +++++ .../CoordinatorSegmentMetadataCacheTest.java | 14 +- .../druid/server/QueryLifecycleTest.java | 445 ++++++++++++++++-- .../druid/server/QueryResourceTest.java | 131 +++--- .../security/ForbiddenExceptionTest.java | 32 +- .../apache/druid/sql/AbstractStatement.java | 18 +- .../org/apache/druid/sql/HttpStatement.java | 18 +- .../sql/calcite/planner/DruidPlanner.java | 18 +- .../sql/calcite/planner/PlannerContext.java | 10 +- .../sql/calcite/planner/PlannerFactory.java | 10 +- .../sql/calcite/run/NativeQueryMaker.java | 10 +- .../sql/calcite/schema/SystemSchema.java | 32 +- .../apache/druid/sql/http/SqlResource.java | 10 +- .../sql/avatica/DruidAvaticaHandlerTest.java | 12 + .../sql/calcite/CalciteJoinQueryTest.java | 4 +- .../druid/sql/calcite/CalciteQueryTest.java | 2 + .../sql/calcite/CalciteSelectQueryTest.java | 40 ++ .../BrokerSegmentMetadataCacheTest.java | 123 +++-- .../druid/sql/calcite/util/CalciteTests.java | 58 ++- .../sql/calcite/util/TestDataBuilder.java | 9 + .../druid/sql/http/SqlResourceTest.java | 58 ++- 73 files changed, 2930 insertions(+), 610 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java create mode 100644 processing/src/main/java/org/apache/druid/query/policy/NoRestrictionPolicy.java create mode 100644 processing/src/main/java/org/apache/druid/query/policy/Policy.java create mode 100644 processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java create mode 100644 processing/src/main/java/org/apache/druid/segment/BypassRestrictedSegment.java create mode 100644 processing/src/main/java/org/apache/druid/segment/RestrictedCursorFactory.java create mode 100644 processing/src/main/java/org/apache/druid/segment/RestrictedSegment.java create mode 100644 processing/src/test/java/org/apache/druid/query/RestrictedDataSourceTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/policy/NoRestrictionPolicyTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/policy/RowFilterPolicyTest.java create mode 100644 server/src/main/java/org/apache/druid/server/security/AuthorizationResult.java create mode 100644 server/src/test/java/org/apache/druid/initialization/AuthorizationResultTest.java diff --git a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryDriver.java b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryDriver.java index 096a1439a4fd..ca3aeb4e65ac 100644 --- a/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryDriver.java +++ b/extensions-contrib/grpc-query/src/main/java/org/apache/druid/grpc/server/QueryDriver.java @@ -49,6 +49,7 @@ import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.DirectStatement; import org.apache.druid.sql.DirectStatement.ResultSet; @@ -146,8 +147,8 @@ private QueryResponse runNativeQuery(QueryRequest request, AuthenticationResult final String currThreadName = Thread.currentThread().getName(); try { queryLifecycle.initialize(query); - Access authorizationResult = queryLifecycle.authorize(authResult); - if (!authorizationResult.isAllowed()) { + AuthorizationResult authorizationResult = queryLifecycle.authorize(authResult); + if (!authorizationResult.allowAccessWithNoRestriction()) { throw new ForbiddenException(Access.DEFAULT_ERROR_MESSAGE); } queryResponse = queryLifecycle.execute(); diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/BasicSecurityResourceFilter.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/BasicSecurityResourceFilter.java index bcb4ec053457..eade42320c7d 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/BasicSecurityResourceFilter.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/BasicSecurityResourceFilter.java @@ -23,7 +23,7 @@ import com.sun.jersey.spi.container.ContainerRequest; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.http.security.AbstractResourceFilter; -import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.Resource; @@ -54,17 +54,17 @@ public ContainerRequest filter(ContainerRequest request) getAction(request) ); - final Access authResult = AuthorizationUtils.authorizeResourceAction( + final AuthorizationResult authResult = AuthorizationUtils.authorizeResourceAction( getReq(), resourceAction, getAuthorizerMapper() ); - if (!authResult.isAllowed()) { + if (!authResult.allowAccessWithNoRestriction()) { throw new WebApplicationException( Response.status(Response.Status.FORBIDDEN) .type(MediaType.TEXT_PLAIN) - .entity(StringUtils.format("Access-Check-Result: %s", authResult.toString())) + .entity(StringUtils.format("Access-Check-Result: %s", authResult.getErrorMessage())) .build() ); } diff --git a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java index 2590f57c8b38..c206266f5f12 100644 --- a/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java +++ b/extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java @@ -34,8 +34,8 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -56,7 +56,6 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -108,17 +107,17 @@ public CatalogResource( * * * @param schemaName The name of the Druid schema, which must be writable - * and the user must have at least read access. - * @param tableName The name of the table definition to modify. The user must - * have write access to the table. - * @param spec The new table definition. - * @param version the expected version of an existing table. The version must - * match. If not (or if the table does not exist), returns an error. - * @param overwrite if {@code true}, then overwrites any existing table. - * If {@code false}, then the operation fails if the table already exists. - * Ignored if a version is specified. - * @param req the HTTP request used for authorization. - */ + * and the user must have at least read access. + * @param tableName The name of the table definition to modify. The user must + * have write access to the table. + * @param spec The new table definition. + * @param version the expected version of an existing table. The version must + * match. If not (or if the table does not exist), returns an error. + * @param overwrite if {@code true}, then overwrites any existing table. + * If {@code false}, then the operation fails if the table already exists. + * Ignored if a version is specified. + * @param req the HTTP request used for authorization. + */ @POST @Path("/schemas/{schema}/tables/{name}") @Consumes(MediaType.APPLICATION_JSON) @@ -181,9 +180,9 @@ public Response postTable( * the definition is created before the datasource itself.) * * @param schemaName The Druid schema. The user must have read access. - * @param tableName The name of the table within the schema. The user must have - * read access. - * @param req the HTTP request used for authorization. + * @param tableName The name of the table within the schema. The user must have + * read access. + * @param req the HTTP request used for authorization. * @return the definition for the table, if any. */ @GET @@ -211,8 +210,8 @@ public Response getTable( * for the given schema and table. * * @param schemaName The name of the schema that holds the table. - * @param tableName The name of the table definition to delete. The user must have - * write access. + * @param tableName The name of the table definition to delete. The user must have + * write access. */ @DELETE @Path("/schemas/{schema}/tables/{name}") @@ -247,9 +246,9 @@ public Response deleteTable( * the table spec changed between the time it was retrieve and the edit operation * is submitted. * - * @param schemaName The name of the schema that holds the table. - * @param tableName The name of the table definition to delete. The user must have - * write access. + * @param schemaName The name of the schema that holds the table. + * @param tableName The name of the table definition to delete. The user must have + * write access. * @param editRequest The operation to perform. See the classes for details. */ @POST @@ -281,7 +280,7 @@ public Response editTable( * Retrieves the list of all Druid schema names. * * @param format the format of the response. See the code for the - * available formats + * available formats */ @GET @Path("/schemas") @@ -318,9 +317,9 @@ public Response getSchemas( * the read-only schemas, there will be no table definitions. * * @param schemaName The name of the Druid schema to query. The user must - * have read access. - * @param format the format of the response. See the code for the - * available formats + * have read access. + * @param format the format of the response. See the code for the + * available formats */ @GET @Path("/schemas/{schema}/tables") @@ -360,7 +359,7 @@ public Response getSchemaTables( * table definitions known to the catalog. Used to prime a cache on first access. * After that, the Coordinator will push updates to Brokers. Returns the full * list of table details. - * + *

* It is expected that the number of table definitions will be of small or moderate * size, so no provision is made to handle very large lists. */ @@ -467,9 +466,9 @@ private Response listAllTableMetadata(final HttpServletRequest req) List> tables = new ArrayList<>(); for (SchemaSpec schema : catalog.schemaRegistry().schemas()) { tables.addAll(catalog.tables().tablesInSchema(schema.name()) - .stream() - .map(table -> Pair.of(schema, table)) - .collect(Collectors.toList())); + .stream() + .map(table -> Pair.of(schema, table)) + .collect(Collectors.toList())); } Iterable> filtered = AuthorizationUtils.filterAuthorizedResources( @@ -483,9 +482,9 @@ private Response listAllTableMetadata(final HttpServletRequest req) ); List metadata = Lists.newArrayList(filtered) - .stream() - .map(pair -> pair.rhs) - .collect(Collectors.toList()); + .stream() + .map(pair -> pair.rhs) + .collect(Collectors.toList()); return Response.ok().entity(metadata).build(); } @@ -499,9 +498,9 @@ private Response tableNamesInSchema( req, tables, name -> - Collections.singletonList( - resourceAction(schema, name, Action.READ)), - authorizerMapper + Collections.singletonList( + resourceAction(schema, name, Action.READ)), + authorizerMapper ); return Response.ok().entity(Lists.newArrayList(filtered)).build(); } @@ -581,13 +580,13 @@ private void authorizeTable( private void authorize(String resource, String key, Action action, HttpServletRequest request) { - final Access authResult = authorizeAccess(resource, key, action, request); - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.toString()); + final AuthorizationResult authResult = authorizeAccess(resource, key, action, request); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } } - private Access authorizeAccess(String resource, String key, Action action, HttpServletRequest request) + private AuthorizationResult authorizeAccess(String resource, String key, Action action, HttpServletRequest request) { return AuthorizationUtils.authorizeResourceAction( request, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java index 65d770a29c55..a277d7d126ff 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java @@ -36,9 +36,9 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.ResponseContextConfig; import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.Resource; @@ -144,7 +144,7 @@ public GetQueriesResponse doGetRunningQueries( ) { final AuthenticationResult authenticationResult = AuthorizationUtils.authenticationResultFromRequest(req); - final Access stateReadAccess = AuthorizationUtils.authorizeAllResourceActions( + final AuthorizationResult stateReadAccess = AuthorizationUtils.authorizeAllResourceActions( authenticationResult, Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE, Action.READ)), authorizerMapper @@ -175,7 +175,7 @@ public GetQueriesResponse doGetRunningQueries( queries.sort(Comparator.comparing(DartQueryInfo::getStartTime).thenComparing(DartQueryInfo::getDartQueryId)); final GetQueriesResponse response; - if (stateReadAccess.isAllowed()) { + if (stateReadAccess.allowAccessWithNoRestriction()) { // User can READ STATE, so they can see all running queries, as well as authentication details. response = new GetQueriesResponse(queries); } else { @@ -245,9 +245,9 @@ public Response cancelQuery( return Response.status(Response.Status.ACCEPTED).build(); } - final Access access = authorizeCancellation(req, cancelables); + final AuthorizationResult authResult = authorizeCancellation(req, cancelables); - if (access.isAllowed()) { + if (authResult.allowAccessWithNoRestriction()) { sqlLifecycleManager.removeAll(sqlQueryId, cancelables); // Don't call cancel() on the cancelables. That just cancels native queries, which is useless here. Instead, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java index c35ca39a9f04..4526754cf573 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java @@ -52,6 +52,7 @@ import org.apache.druid.msq.sql.MSQTaskQueryMaker; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.QueryResponse; +import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.calcite.run.QueryMaker; @@ -127,6 +128,9 @@ public DartQueryMaker( @Override public QueryResponse runQuery(DruidQuery druidQuery) { + if (!plannerContext.getAuthorizationResult().allowAccessWithNoRestriction()) { + throw new ForbiddenException(plannerContext.getAuthorizationResult().getErrorMessage()); + } final MSQSpec querySpec = MSQTaskQueryMaker.makeQuerySpec( null, druidQuery, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/MSQResourceUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/MSQResourceUtils.java index 8820b4ead5a0..ade376066138 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/MSQResourceUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/MSQResourceUtils.java @@ -19,7 +19,7 @@ package org.apache.druid.msq.rpc; -import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -41,10 +41,14 @@ public static void authorizeAdminRequest( { final List resourceActions = permissionMapper.getAdminPermissions(); - Access access = AuthorizationUtils.authorizeAllResourceActions(request, resourceActions, authorizerMapper); + AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions( + request, + resourceActions, + authorizerMapper + ); - if (!access.isAllowed()) { - throw new ForbiddenException(access.toString()); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } } @@ -57,10 +61,14 @@ public static void authorizeQueryRequest( { final List resourceActions = permissionMapper.getQueryPermissions(queryId); - Access access = AuthorizationUtils.authorizeAllResourceActions(request, resourceActions, authorizerMapper); + AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions( + request, + resourceActions, + authorizerMapper + ); - if (!access.isAllowed()) { - throw new ForbiddenException(access.toString()); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 5462b9917376..9f69396edcfc 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -54,6 +54,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.QueryResponse; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; @@ -116,6 +117,9 @@ public class MSQTaskQueryMaker implements QueryMaker @Override public QueryResponse runQuery(final DruidQuery druidQuery) { + if (!plannerContext.getAuthorizationResult().allowAccessWithNoRestriction()) { + throw new ForbiddenException(plannerContext.getAuthorizationResult().getErrorMessage()); + } Hook.QUERY_PLAN.run(druidQuery.getQuery()); plannerContext.dispatchHook(DruidHook.NATIVE_PLAN, druidQuery.getQuery()); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index c92bfa955fb6..e26969c67619 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -74,9 +74,9 @@ import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.QueryResponse; -import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -484,7 +484,13 @@ private Response buildTaskResponse(Sequence sequence, AuthenticationRe } String taskId = String.valueOf(firstRow[0]); - Optional statementResult = getStatementStatus(taskId, authenticationResult, true, Action.READ, false); + Optional statementResult = getStatementStatus( + taskId, + authenticationResult, + true, + Action.READ, + false + ); if (statementResult.isPresent()) { return Response.status(Response.Status.OK).entity(statementResult.get()).build(); @@ -585,7 +591,11 @@ private Optional getStatementStatus( } // since we need the controller payload for auth checks. - MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission(queryId, authenticationResult, forAction); + MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission( + queryId, + authenticationResult, + forAction + ); SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus); MSQTaskReportPayload taskReportPayload = null; @@ -640,9 +650,9 @@ private Optional getStatementStatus( * necessary permissions. A user has the necessary permissions if one of the following criteria is satisfied: * 1. The user is the one who submitted the query * 2. The user belongs to a role containing the READ or WRITE permissions over the STATE resource. For endpoints like GET, - * the user should have READ permission for the STATE resource, while for endpoints like DELETE, the user should - * have WRITE permission for the STATE resource. (Note: POST API does not need to check the state permissions since - * the currentUser always equal to the queryUser) + * the user should have READ permission for the STATE resource, while for endpoints like DELETE, the user should + * have WRITE permission for the STATE resource. (Note: POST API does not need to check the state permissions since + * the currentUser always equal to the queryUser) */ private MSQControllerTask getMSQControllerTaskAndCheckPermission( String queryId, @@ -665,21 +675,21 @@ private MSQControllerTask getMSQControllerTaskAndCheckPermission( return msqControllerTask; } - Access access = AuthorizationUtils.authorizeAllResourceActions( + AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions( authenticationResult, Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE, forAction)), authorizerMapper ); - if (access.isAllowed()) { - return msqControllerTask; + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(StringUtils.format( + "The current user[%s] cannot view query id[%s] since the query is owned by another user", + currentUser, + queryId + )); } - throw new ForbiddenException(StringUtils.format( - "The current user[%s] cannot view query id[%s] since the query is owned by another user", - currentUser, - queryId - )); + return msqControllerTask; } /** @@ -990,7 +1000,11 @@ private T contactOverlord(final ListenableFuture future, String queryId) private static DruidException queryNotFoundException(String queryId) { - return NotFound.exception("Query [%s] was not found. The query details are no longer present or might not be of the type [%s]. Verify that the id is correct.", queryId, MSQControllerTask.TYPE); + return NotFound.exception( + "Query [%s] was not found. The query details are no longer present or might not be of the type [%s]. Verify that the id is correct.", + queryId, + MSQControllerTask.TYPE + ); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java index 981f96fbe2a8..10b5f20e4187 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java @@ -509,6 +509,61 @@ public void test_doPost_regularUser_forbidden() ); } + @Test + public void test_doPost_regularUser_restricted_throwsForbidden() + { + final MockAsyncContext asyncContext = new MockAsyncContext(); + final MockHttpServletResponse asyncResponse = new MockHttpServletResponse(); + asyncContext.response = asyncResponse; + + Mockito.when(httpServletRequest.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) + .thenReturn(makeAuthenticationResult(REGULAR_USER_NAME)); + Mockito.when(httpServletRequest.startAsync()) + .thenReturn(asyncContext); + + final SqlQuery sqlQuery = new SqlQuery( + StringUtils.format("SELECT * FROM \"%s\"", CalciteTests.RESTRICTED_DATASOURCE), + ResultFormat.ARRAY, + false, + false, + false, + Collections.emptyMap(), + Collections.emptyList() + ); + + ForbiddenException e = Assertions.assertThrows( + ForbiddenException.class, + () -> sqlResource.doPost(sqlQuery, httpServletRequest) + ); + Assertions.assertEquals("Unauthorized", e.getMessage()); + } + + @Test + public void test_doPost_superUser_restricted_throwsServerError() + { + final MockAsyncContext asyncContext = new MockAsyncContext(); + final MockHttpServletResponse asyncResponse = new MockHttpServletResponse(); + asyncContext.response = asyncResponse; + + Mockito.when(httpServletRequest.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) + .thenReturn(makeAuthenticationResult(CalciteTests.TEST_SUPERUSER_NAME)); + Mockito.when(httpServletRequest.startAsync()) + .thenReturn(asyncContext); + + final SqlQuery sqlQuery = new SqlQuery( + StringUtils.format("SELECT * FROM \"%s\"", CalciteTests.RESTRICTED_DATASOURCE), + ResultFormat.ARRAY, + false, + false, + false, + Collections.emptyMap(), + Collections.emptyList() + ); + Assertions.assertNull(sqlResource.doPost(sqlQuery, httpServletRequest)); + // Super user can run a dart query, but we don't support it yet. + Assertions.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), asyncResponse.getStatus()); + } + @Test public void test_doPost_regularUser_runtimeError() throws IOException { @@ -571,7 +626,9 @@ public void test_doPost_regularUser_fullReport() throws Exception final List> reportMaps = objectMapper.readValue( asyncResponse.baos.toByteArray(), - new TypeReference<>() {} + new TypeReference<>() + { + } ); Assertions.assertEquals(1, reportMaps.size()); @@ -610,7 +667,9 @@ public void test_doPost_regularUser_runtimeError_fullReport() throws Exception final List> reportMaps = objectMapper.readValue( asyncResponse.baos.toByteArray(), - new TypeReference<>() {} + new TypeReference<>() + { + } ); Assertions.assertEquals(1, reportMaps.size()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 39df97297162..130de0f60aaf 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -74,6 +74,7 @@ import org.apache.druid.segment.join.JoinType; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.external.ExternalDataSource; import org.apache.druid.sql.calcite.filtration.Filtration; @@ -795,6 +796,20 @@ public void testSelectWithGroupByLimit(String contextName, Map c } + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void testSelectRestricted(String contextName, Map context) + { + testSelectQuery() + .setSql("select count(*) from druid.restrictedDatasource_m1_is_6") + .setQueryContext(context) + .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( + CoreMatchers.instanceOf(ForbiddenException.class), + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Unauthorized")) + )) + .verifyExecutionError(); + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testSelectLookup(String contextName, Map context) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java index 79a3e8993a8c..1d0f2622ce30 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java @@ -27,8 +27,8 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -67,7 +67,7 @@ public static List getReportListFromSavedParseExceptions( * * @return authorization result */ - public static Access datasourceAuthorizationCheck( + public static AuthorizationResult datasourceAuthorizationCheck( final HttpServletRequest req, Action action, String datasource, @@ -79,12 +79,11 @@ public static Access datasourceAuthorizationCheck( action ); - Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper); - if (!access.isAllowed()) { - throw new ForbiddenException(access.toString()); + AuthorizationResult authResult = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } - - return access; + return authResult; } public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final Task task) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index fc2b00ad6f58..74656dfdb5f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -66,9 +66,9 @@ import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.http.security.DatasourceResourceFilter; import org.apache.druid.server.http.security.StateResourceFilter; -import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -177,14 +177,13 @@ public Response taskPost( .build(); } - Access authResult = AuthorizationUtils.authorizeAllResourceActions( + AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions( req, resourceActions, authorizerMapper ); - - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.getMessage()); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } return asLeaderWith( @@ -609,16 +608,17 @@ public Response getTasks( new Resource(dataSource, ResourceType.DATASOURCE), Action.READ ); - final Access authResult = AuthorizationUtils.authorizeResourceAction( + final AuthorizationResult authResult = AuthorizationUtils.authorizeResourceAction( req, resourceAction, authorizerMapper ); - if (!authResult.isAllowed()) { + + if (!authResult.allowAccessWithNoRestriction()) { throw new WebApplicationException( Response.status(Response.Status.FORBIDDEN) .type(MediaType.TEXT_PLAIN) - .entity(StringUtils.format("Access-Check-Result: %s", authResult.toString())) + .entity(StringUtils.format("Access-Check-Result: %s", authResult.getErrorMessage())) .build() ); } @@ -654,7 +654,7 @@ public Response killPendingSegments( { final Interval deleteInterval = Intervals.of(deleteIntervalString); // check auth for dataSource - final Access authResult = AuthorizationUtils.authorizeAllResourceActions( + final AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions( request, ImmutableList.of( new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.READ), @@ -663,8 +663,8 @@ public Response killPendingSegments( authorizerMapper ); - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.getMessage()); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } if (overlord.isLeader()) { @@ -678,7 +678,12 @@ public Response killPendingSegments( .build(); } catch (Exception e) { - log.warn(e, "Failed to delete pending segments for datasource[%s] and interval[%s].", dataSource, deleteInterval); + log.warn( + e, + "Failed to delete pending segments for datasource[%s] and interval[%s].", + dataSource, + deleteInterval + ); return Response.status(Status.INTERNAL_SERVER_ERROR) .entity(ImmutableMap.of("error", e.getMessage())) .build(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilter.java index c4be66719913..0265d20c1dee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilter.java @@ -30,8 +30,8 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.http.security.AbstractResourceFilter; -import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -97,14 +97,14 @@ public boolean apply(PathSegment input) AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR : AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR; - Access authResult = AuthorizationUtils.authorizeAllResourceActions( + AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions( getReq(), Iterables.transform(spec.getDataSources(), resourceActionFunction), getAuthorizerMapper() ); - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.toString()); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } return request; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java index a9f66ce30e72..2d23c443125d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java @@ -29,7 +29,7 @@ import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.http.security.AbstractResourceFilter; -import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -92,14 +92,14 @@ public ContainerRequest filter(ContainerRequest request) getAction(request) ); - final Access authResult = AuthorizationUtils.authorizeResourceAction( + final AuthorizationResult authResult = AuthorizationUtils.authorizeResourceAction( getReq(), resourceAction, getAuthorizerMapper() ); - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.toString()); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } return request; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResource.java index 75618ddae42a..9c2bf31a18d5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResource.java @@ -23,9 +23,9 @@ import com.google.inject.Inject; import org.apache.druid.client.indexing.SamplerResponse; import org.apache.druid.client.indexing.SamplerSpec; -import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -72,14 +72,14 @@ public SamplerResponse post(final SamplerSpec sampler, @Context final HttpServle resourceActions.addAll(sampler.getInputSourceResources()); } - Access authResult = AuthorizationUtils.authorizeAllResourceActions( + AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions( req, resourceActions, authorizerMapper ); - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.getMessage()); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } return sampler.sample(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 130f617d59d1..3190835c3e67 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -41,9 +41,9 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -142,14 +142,14 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe .build(); } - Access authResult = AuthorizationUtils.authorizeAllResourceActions( + AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions( req, resourceActions, authorizerMapper ); - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.toString()); + if (!authResult.allowAccessWithNoRestriction()) { + throw new ForbiddenException(authResult.getErrorMessage()); } manager.createOrUpdateAndStartSupervisor(spec); @@ -410,13 +410,16 @@ public Response shutdown(@PathParam("id") final String id) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(SupervisorResourceFilter.class) - public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull final HandoffTaskGroupsRequest handoffTaskGroupsRequest) + public Response handoffTaskGroups( + @PathParam("id") final String id, + @Nonnull final HandoffTaskGroupsRequest handoffTaskGroupsRequest + ) { List taskGroupIds = handoffTaskGroupsRequest.getTaskGroupIds(); if (CollectionUtils.isNullOrEmpty(taskGroupIds)) { return Response.status(Response.Status.BAD_REQUEST) - .entity(ImmutableMap.of("error", "List of task groups to handoff can't be empty")) - .build(); + .entity(ImmutableMap.of("error", "List of task groups to handoff can't be empty")) + .build(); } return asLeaderWithSupervisorManager( @@ -426,14 +429,20 @@ public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull fin return Response.ok().build(); } else { return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id))) - .build(); + .entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id))) + .build(); } } catch (NotImplementedException e) { return Response.status(Response.Status.BAD_REQUEST) - .entity(ImmutableMap.of("error", StringUtils.format("Supervisor [%s] does not support early handoff", id))) - .build(); + .entity(ImmutableMap.of( + "error", + StringUtils.format( + "Supervisor [%s] does not support early handoff", + id + ) + )) + .build(); } } ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index fddd8f8e4536..b593b841ed44 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -89,8 +89,8 @@ import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; import org.apache.druid.segment.realtime.appenderator.StreamAppenderator; import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver; -import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CollectionUtils; @@ -144,7 +144,8 @@ * @param Sequence Number Type */ @SuppressWarnings("CheckReturnValue") -public abstract class SeekableStreamIndexTaskRunner implements ChatHandler +public abstract class SeekableStreamIndexTaskRunner + implements ChatHandler { private static final String CTX_KEY_LOOKUP_TIER = "lookupTier"; @@ -278,12 +279,11 @@ public SeekableStreamIndexTaskRunner( rejectionPeriodUpdaterExec = Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d"); if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) { - rejectionPeriodUpdaterExec - .scheduleWithFixedDelay( - this::refreshMinMaxMessageTime, - ioConfig.getRefreshRejectionPeriodsInMinutes(), - ioConfig.getRefreshRejectionPeriodsInMinutes(), - TimeUnit.MINUTES); + rejectionPeriodUpdaterExec.scheduleWithFixedDelay(this::refreshMinMaxMessageTime, + ioConfig.getRefreshRejectionPeriodsInMinutes(), + ioConfig.getRefreshRejectionPeriodsInMinutes(), + TimeUnit.MINUTES + ); } resetNextCheckpointTime(); } @@ -759,10 +759,18 @@ public void onFailure(Throwable t) if (System.currentTimeMillis() > nextCheckpointTime) { sequenceToCheckpoint = getLastSequenceMetadata(); - log.info("Next checkpoint time, updating sequenceToCheckpoint, SequenceToCheckpoint: [%s]", sequenceToCheckpoint); + log.info( + "Next checkpoint time, updating sequenceToCheckpoint, SequenceToCheckpoint: [%s]", + sequenceToCheckpoint + ); } if (pushTriggeringAddResult != null) { - log.info("Hit the row limit updating sequenceToCheckpoint, SequenceToCheckpoint: [%s], rowInSegment: [%s], TotalRows: [%s]", sequenceToCheckpoint, pushTriggeringAddResult.getNumRowsInSegment(), pushTriggeringAddResult.getTotalNumRowsInAppenderator()); + log.info( + "Hit the row limit updating sequenceToCheckpoint, SequenceToCheckpoint: [%s], rowInSegment: [%s], TotalRows: [%s]", + sequenceToCheckpoint, + pushTriggeringAddResult.getNumRowsInSegment(), + pushTriggeringAddResult.getTotalNumRowsInAppenderator() + ); } if (sequenceToCheckpoint != null && stillReading) { @@ -1128,14 +1136,14 @@ private synchronized void persistSequences() throws IOException /** * Return a map of reports for the task. - * + *

* A successfull task should always have a null errorMsg. Segments availability is inherently confirmed * if the task was succesful. - * + *

* A falied task should always have a non-null errorMsg. Segment availability is never confirmed if the task * was not successful. * - * @param errorMsg Nullable error message for the task. null if task succeeded. + * @param errorMsg Nullable error message for the task. null if task succeeded. * @param handoffWaitMs Milliseconds waited for segments to be handed off. * @return Map of reports for the task. */ @@ -1446,7 +1454,7 @@ protected void sendResetRequestAndWait( * * @return authorization result */ - private Access authorizationCheck(final HttpServletRequest req, Action action) + private AuthorizationResult authorizationCheck(final HttpServletRequest req, Action action) { return IndexTaskUtils.datasourceAuthorizationCheck(req, action, task.getDataSource(), authorizerMapper); } @@ -2023,9 +2031,7 @@ private boolean verifyRecordInRange( * * @param toolbox task toolbox * @param checkpointsString the json-serialized checkpoint string - * * @return checkpoint - * * @throws IOException jsonProcessingException */ @Nullable @@ -2039,7 +2045,6 @@ protected abstract TreeMap> ge * This is what would become the start offsets of the next reader, if we stopped reading now. * * @param sequenceNumber the sequence number that has already been processed - * * @return next sequence number to be stored */ protected abstract SequenceOffsetType getNextStartOffset(SequenceOffsetType sequenceNumber); @@ -2049,7 +2054,6 @@ protected abstract TreeMap> ge * * @param mapper json objectMapper * @param object metadata - * * @return SeekableStreamEndSequenceNumbers */ protected abstract SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata( @@ -2063,9 +2067,7 @@ protected abstract SeekableStreamEndSequenceNumbers createDataSourceMetadata( @@ -2089,7 +2090,6 @@ protected abstract SeekableStreamDataSourceMetadata createSequenceNumber(SequenceOffsetType sequenceNumber); @@ -2117,7 +2117,11 @@ private void refreshMinMaxMessageTime() minMessageTime = minMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); - log.info(StringUtils.format("Updated min and max messsage times to %s and %s respectively.", minMessageTime, maxMessageTime)); + log.info(StringUtils.format( + "Updated min and max messsage times to %s and %s respectively.", + minMessageTime, + maxMessageTime + )); } public boolean withinMinMaxRecordTime(final InputRow row) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 1fd7af69e123..f07c6c13ab88 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -114,13 +114,13 @@ public Authorizer getAuthorizer(String name) } else { if (resource.getType().equals(ResourceType.DATASOURCE)) { if (resource.getName().equals("datasource2")) { - return new Access(false, "not authorized."); + return Access.deny("not authorized."); } else { return Access.OK; } } else if (resource.getType().equals(ResourceType.EXTERNAL)) { if (resource.getName().equals("test")) { - return new Access(false, "not authorized."); + return Access.deny("not authorized."); } else { return Access.OK; } diff --git a/pom.xml b/pom.xml index c67df8127a03..36618adacdd8 100644 --- a/pom.xml +++ b/pom.xml @@ -1647,12 +1647,7 @@ check - - org.codehaus.mojo.signature - - java18 - 1.0 - + diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index 360c339627f9..7c5f52d08fee 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -23,12 +23,16 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.planning.PreJoinableClause; +import org.apache.druid.query.policy.Policy; import org.apache.druid.segment.SegmentReference; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.stream.Collectors; /** * Represents a source... of data... for a query. Analogous to the "FROM" clause in SQL. @@ -43,7 +47,8 @@ @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"), @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable"), @JsonSubTypes.Type(value = UnnestDataSource.class, name = "unnest"), - @JsonSubTypes.Type(value = FilteredDataSource.class, name = "filter") + @JsonSubTypes.Type(value = FilteredDataSource.class, name = "filter"), + @JsonSubTypes.Type(value = RestrictedDataSource.class, name = "restrict") }) public interface DataSource { @@ -88,11 +93,11 @@ public interface DataSource /** * Returns true if this datasource can be the base datasource of query processing. - * + *

* Base datasources drive query processing. If the base datasource is {@link TableDataSource}, for example, queries * are processed in parallel on data servers. If the base datasource is {@link InlineDataSource}, queries are * processed on the Broker. See {@link DataSourceAnalysis#getBaseDataSource()} for further discussion. - * + *

* Datasources that are *not* concrete must be pre-processed in some way before they can be processed by the main * query stack. For example, {@link QueryDataSource} must be executed first and substituted with its results. * @@ -118,6 +123,29 @@ public interface DataSource */ DataSource withUpdatedDataSource(DataSource newSource); + /** + * Returns the query with an updated datasource based on the policy restrictions on tables. + *

+ * If this datasource contains no table, no changes should occur. + * + * @param policyMap a mapping of table names to policy restrictions. A missing key is different from an empty value: + *