From 7ff48b37aaea165ba3c3cb6f9f9f742ea2e37654 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Fri, 3 Nov 2023 10:23:37 -0500 Subject: [PATCH] refactor(policy): refactor policy locking, no functional difference (#9163) --- .../authorization/DataHubAuthorizer.java | 111 +++++++++--------- 1 file changed, 55 insertions(+), 56 deletions(-) diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java index e30fb93109915..f8b28f6c182a7 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authorization/DataHubAuthorizer.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nonnull; @@ -55,7 +56,8 @@ public enum AuthorizationMode { // Maps privilege name to the associated set of policies for fast access. // Not concurrent data structure because writes are always against the entire thing. private final Map> _policyCache = new HashMap<>(); // Shared Policy Cache. - private final ReadWriteLock _lockPolicyCache = new ReentrantReadWriteLock(); + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); private final ScheduledExecutorService _refreshExecutorService = Executors.newScheduledThreadPool(1); private final PolicyRefreshRunnable _policyRefreshRunnable; @@ -74,7 +76,7 @@ public DataHubAuthorizer( _systemAuthentication = Objects.requireNonNull(systemAuthentication); _mode = Objects.requireNonNull(mode); _policyEngine = new PolicyEngine(systemAuthentication, Objects.requireNonNull(entityClient)); - _policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, _lockPolicyCache); + _policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, readWriteLock.writeLock()); _refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS); } @@ -93,41 +95,30 @@ public AuthorizationResult authorize(@Nonnull final AuthorizationRequest request Optional resolvedResourceSpec = request.getResourceSpec().map(_entitySpecResolver::resolve); - _lockPolicyCache.readLock().lock(); - try { - // 1. Fetch the policies relevant to the requested privilege. - final List policiesToEvaluate = _policyCache.getOrDefault(request.getPrivilege(), new ArrayList<>()); - - // 2. Evaluate each policy. - for (DataHubPolicyInfo policy : policiesToEvaluate) { - if (isRequestGranted(policy, request, resolvedResourceSpec)) { - // Short circuit if policy has granted privileges to this actor. - return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW, - String.format("Granted by policy with type: %s", policy.getType())); - } + // 1. Fetch the policies relevant to the requested privilege. + final List policiesToEvaluate = getOrDefault(request.getPrivilege(), new ArrayList<>()); + + // 2. Evaluate each policy. + for (DataHubPolicyInfo policy : policiesToEvaluate) { + if (isRequestGranted(policy, request, resolvedResourceSpec)) { + // Short circuit if policy has granted privileges to this actor. + return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW, + String.format("Granted by policy with type: %s", policy.getType())); } - return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null); - } finally { - _lockPolicyCache.readLock().unlock(); } + return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null); } public List getGrantedPrivileges(final String actor, final Optional resourceSpec) { + // 1. Fetch all policies + final List policiesToEvaluate = getOrDefault(ALL, new ArrayList<>()); - _lockPolicyCache.readLock().lock(); - try { - // 1. Fetch all policies - final List policiesToEvaluate = _policyCache.getOrDefault(ALL, new ArrayList<>()); - - Urn actorUrn = UrnUtils.getUrn(actor); - final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor)); + Urn actorUrn = UrnUtils.getUrn(actor); + final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor)); - Optional resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve); + Optional resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve); - return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec); - } finally { - _lockPolicyCache.readLock().unlock(); - } + return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec); } /** @@ -143,36 +134,31 @@ public AuthorizedActors authorizedActors( boolean allUsers = false; boolean allGroups = false; - _lockPolicyCache.readLock().lock(); - try { - // Step 1: Find policies granting the privilege. - final List policiesToEvaluate = _policyCache.getOrDefault(privilege, new ArrayList<>()); - - Optional resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve); + // Step 1: Find policies granting the privilege. + final List policiesToEvaluate = getOrDefault(privilege, new ArrayList<>()); + Optional resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve); - // Step 2: For each policy, determine whether the resource is a match. - for (DataHubPolicyInfo policy : policiesToEvaluate) { - if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) { - // Policy is not active, skip. - continue; - } + // Step 2: For each policy, determine whether the resource is a match. + for (DataHubPolicyInfo policy : policiesToEvaluate) { + if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) { + // Policy is not active, skip. + continue; + } - final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec); + final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec); - // Step 3: For each matching policy, add actors that are authorized. - authorizedUsers.addAll(matchingActors.getUsers()); - authorizedGroups.addAll(matchingActors.getGroups()); - if (matchingActors.allUsers()) { - allUsers = true; - } - if (matchingActors.allGroups()) { - allGroups = true; - } + // Step 3: For each matching policy, add actors that are authorized. + authorizedUsers.addAll(matchingActors.getUsers()); + authorizedGroups.addAll(matchingActors.getGroups()); + if (matchingActors.allUsers()) { + allUsers = true; + } + if (matchingActors.allGroups()) { + allGroups = true; } - } finally { - _lockPolicyCache.readLock().unlock(); } + // Step 4: Return all authorized users and groups. return new AuthorizedActors(privilege, authorizedUsers, authorizedGroups, allUsers, allGroups); } @@ -234,6 +220,16 @@ private Optional getUrnFromRequestActor(String actor) { } } + private List getOrDefault(String key, List defaultValue) { + readLock.lock(); + try { + return _policyCache.getOrDefault(key, defaultValue); + } finally { + // To unlock the acquired read thread + readLock.unlock(); + } + } + /** * A {@link Runnable} used to periodically fetch a new instance of the policies Cache. * @@ -247,7 +243,7 @@ static class PolicyRefreshRunnable implements Runnable { private final Authentication _systemAuthentication; private final PolicyFetcher _policyFetcher; private final Map> _policyCache; - private final ReadWriteLock _lockPolicyCache; + private final Lock writeLock; @Override public void run() { @@ -274,13 +270,16 @@ public void run() { return; } } - _lockPolicyCache.writeLock().lock(); + + writeLock.lock(); try { _policyCache.clear(); _policyCache.putAll(newCache); } finally { - _lockPolicyCache.writeLock().unlock(); + // To unlock the acquired write thread + writeLock.unlock(); } + log.debug(String.format("Successfully fetched %s policies.", total)); } catch (Exception e) { log.error("Caught exception while loading Policy cache. Will retry on next scheduled attempt.", e);