Skip to content

Commit

Permalink
refactor(policy): refactor policy locking, no functional difference (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Nov 3, 2023
1 parent 148ad1a commit 7ff48b3
Showing 1 changed file with 55 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -55,7 +56,8 @@ public enum AuthorizationMode {
// Maps privilege name to the associated set of policies for fast access.
// Not concurrent data structure because writes are always against the entire thing.
private final Map<String, List<DataHubPolicyInfo>> _policyCache = new HashMap<>(); // Shared Policy Cache.
private final ReadWriteLock _lockPolicyCache = new ReentrantReadWriteLock();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();

private final ScheduledExecutorService _refreshExecutorService = Executors.newScheduledThreadPool(1);
private final PolicyRefreshRunnable _policyRefreshRunnable;
Expand All @@ -74,7 +76,7 @@ public DataHubAuthorizer(
_systemAuthentication = Objects.requireNonNull(systemAuthentication);
_mode = Objects.requireNonNull(mode);
_policyEngine = new PolicyEngine(systemAuthentication, Objects.requireNonNull(entityClient));
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, _lockPolicyCache);
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, readWriteLock.writeLock());
_refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
}

Expand All @@ -93,41 +95,30 @@ public AuthorizationResult authorize(@Nonnull final AuthorizationRequest request

Optional<ResolvedEntitySpec> resolvedResourceSpec = request.getResourceSpec().map(_entitySpecResolver::resolve);

_lockPolicyCache.readLock().lock();
try {
// 1. Fetch the policies relevant to the requested privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(request.getPrivilege(), new ArrayList<>());

// 2. Evaluate each policy.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (isRequestGranted(policy, request, resolvedResourceSpec)) {
// Short circuit if policy has granted privileges to this actor.
return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW,
String.format("Granted by policy with type: %s", policy.getType()));
}
// 1. Fetch the policies relevant to the requested privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = getOrDefault(request.getPrivilege(), new ArrayList<>());

// 2. Evaluate each policy.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (isRequestGranted(policy, request, resolvedResourceSpec)) {
// Short circuit if policy has granted privileges to this actor.
return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW,
String.format("Granted by policy with type: %s", policy.getType()));
}
return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null);
} finally {
_lockPolicyCache.readLock().unlock();
}
return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null);
}

public List<String> getGrantedPrivileges(final String actor, final Optional<EntitySpec> resourceSpec) {
// 1. Fetch all policies
final List<DataHubPolicyInfo> policiesToEvaluate = getOrDefault(ALL, new ArrayList<>());

_lockPolicyCache.readLock().lock();
try {
// 1. Fetch all policies
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(ALL, new ArrayList<>());

Urn actorUrn = UrnUtils.getUrn(actor);
final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor));
Urn actorUrn = UrnUtils.getUrn(actor);
final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor));

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);
Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec);
} finally {
_lockPolicyCache.readLock().unlock();
}
return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec);
}

/**
Expand All @@ -143,36 +134,31 @@ public AuthorizedActors authorizedActors(
boolean allUsers = false;
boolean allGroups = false;

_lockPolicyCache.readLock().lock();
try {
// Step 1: Find policies granting the privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(privilege, new ArrayList<>());

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);
// Step 1: Find policies granting the privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = getOrDefault(privilege, new ArrayList<>());

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

// Step 2: For each policy, determine whether the resource is a match.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) {
// Policy is not active, skip.
continue;
}
// Step 2: For each policy, determine whether the resource is a match.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) {
// Policy is not active, skip.
continue;
}

final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec);
final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec);

// Step 3: For each matching policy, add actors that are authorized.
authorizedUsers.addAll(matchingActors.getUsers());
authorizedGroups.addAll(matchingActors.getGroups());
if (matchingActors.allUsers()) {
allUsers = true;
}
if (matchingActors.allGroups()) {
allGroups = true;
}
// Step 3: For each matching policy, add actors that are authorized.
authorizedUsers.addAll(matchingActors.getUsers());
authorizedGroups.addAll(matchingActors.getGroups());
if (matchingActors.allUsers()) {
allUsers = true;
}
if (matchingActors.allGroups()) {
allGroups = true;
}
} finally {
_lockPolicyCache.readLock().unlock();
}

// Step 4: Return all authorized users and groups.
return new AuthorizedActors(privilege, authorizedUsers, authorizedGroups, allUsers, allGroups);
}
Expand Down Expand Up @@ -234,6 +220,16 @@ private Optional<Urn> getUrnFromRequestActor(String actor) {
}
}

private List<DataHubPolicyInfo> getOrDefault(String key, List<DataHubPolicyInfo> defaultValue) {
readLock.lock();
try {
return _policyCache.getOrDefault(key, defaultValue);
} finally {
// To unlock the acquired read thread
readLock.unlock();
}
}

/**
* A {@link Runnable} used to periodically fetch a new instance of the policies Cache.
*
Expand All @@ -247,7 +243,7 @@ static class PolicyRefreshRunnable implements Runnable {
private final Authentication _systemAuthentication;
private final PolicyFetcher _policyFetcher;
private final Map<String, List<DataHubPolicyInfo>> _policyCache;
private final ReadWriteLock _lockPolicyCache;
private final Lock writeLock;

@Override
public void run() {
Expand All @@ -274,13 +270,16 @@ public void run() {
return;
}
}
_lockPolicyCache.writeLock().lock();

writeLock.lock();
try {
_policyCache.clear();
_policyCache.putAll(newCache);
} finally {
_lockPolicyCache.writeLock().unlock();
// To unlock the acquired write thread
writeLock.unlock();
}

log.debug(String.format("Successfully fetched %s policies.", total));
} catch (Exception e) {
log.error("Caught exception while loading Policy cache. Will retry on next scheduled attempt.", e);
Expand Down

0 comments on commit 7ff48b3

Please sign in to comment.