Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(policy): refactor policy locking, no functional difference #9163

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -55,7 +56,8 @@ public enum AuthorizationMode {
// Maps privilege name to the associated set of policies for fast access.
// Not concurrent data structure because writes are always against the entire thing.
private final Map<String, List<DataHubPolicyInfo>> _policyCache = new HashMap<>(); // Shared Policy Cache.
private final ReadWriteLock _lockPolicyCache = new ReentrantReadWriteLock();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();

private final ScheduledExecutorService _refreshExecutorService = Executors.newScheduledThreadPool(1);
private final PolicyRefreshRunnable _policyRefreshRunnable;
Expand All @@ -74,7 +76,7 @@ public DataHubAuthorizer(
_systemAuthentication = Objects.requireNonNull(systemAuthentication);
_mode = Objects.requireNonNull(mode);
_policyEngine = new PolicyEngine(systemAuthentication, Objects.requireNonNull(entityClient));
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, _lockPolicyCache);
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, readWriteLock.writeLock());
_refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
}

Expand All @@ -93,41 +95,30 @@ public AuthorizationResult authorize(@Nonnull final AuthorizationRequest request

Optional<ResolvedEntitySpec> resolvedResourceSpec = request.getResourceSpec().map(_entitySpecResolver::resolve);

_lockPolicyCache.readLock().lock();
try {
// 1. Fetch the policies relevant to the requested privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(request.getPrivilege(), new ArrayList<>());

// 2. Evaluate each policy.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (isRequestGranted(policy, request, resolvedResourceSpec)) {
// Short circuit if policy has granted privileges to this actor.
return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW,
String.format("Granted by policy with type: %s", policy.getType()));
}
// 1. Fetch the policies relevant to the requested privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = getOrDefault(request.getPrivilege(), new ArrayList<>());

// 2. Evaluate each policy.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (isRequestGranted(policy, request, resolvedResourceSpec)) {
// Short circuit if policy has granted privileges to this actor.
return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW,
String.format("Granted by policy with type: %s", policy.getType()));
}
return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null);
} finally {
_lockPolicyCache.readLock().unlock();
}
return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null);
}

public List<String> getGrantedPrivileges(final String actor, final Optional<EntitySpec> resourceSpec) {
// 1. Fetch all policies
final List<DataHubPolicyInfo> policiesToEvaluate = getOrDefault(ALL, new ArrayList<>());

_lockPolicyCache.readLock().lock();
try {
// 1. Fetch all policies
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(ALL, new ArrayList<>());

Urn actorUrn = UrnUtils.getUrn(actor);
final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor));
Urn actorUrn = UrnUtils.getUrn(actor);
final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor));

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);
Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec);
} finally {
_lockPolicyCache.readLock().unlock();
}
return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec);
}

/**
Expand All @@ -143,36 +134,31 @@ public AuthorizedActors authorizedActors(
boolean allUsers = false;
boolean allGroups = false;

_lockPolicyCache.readLock().lock();
try {
// Step 1: Find policies granting the privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(privilege, new ArrayList<>());

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);
// Step 1: Find policies granting the privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = getOrDefault(privilege, new ArrayList<>());

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

// Step 2: For each policy, determine whether the resource is a match.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) {
// Policy is not active, skip.
continue;
}
// Step 2: For each policy, determine whether the resource is a match.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) {
// Policy is not active, skip.
continue;
}

final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec);
final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec);

// Step 3: For each matching policy, add actors that are authorized.
authorizedUsers.addAll(matchingActors.getUsers());
authorizedGroups.addAll(matchingActors.getGroups());
if (matchingActors.allUsers()) {
allUsers = true;
}
if (matchingActors.allGroups()) {
allGroups = true;
}
// Step 3: For each matching policy, add actors that are authorized.
authorizedUsers.addAll(matchingActors.getUsers());
authorizedGroups.addAll(matchingActors.getGroups());
if (matchingActors.allUsers()) {
allUsers = true;
}
if (matchingActors.allGroups()) {
allGroups = true;
}
} finally {
_lockPolicyCache.readLock().unlock();
}

// Step 4: Return all authorized users and groups.
return new AuthorizedActors(privilege, authorizedUsers, authorizedGroups, allUsers, allGroups);
}
Expand Down Expand Up @@ -234,6 +220,16 @@ private Optional<Urn> getUrnFromRequestActor(String actor) {
}
}

private List<DataHubPolicyInfo> getOrDefault(String key, List<DataHubPolicyInfo> defaultValue) {
readLock.lock();
try {
return _policyCache.getOrDefault(key, defaultValue);
} finally {
// To unlock the acquired read thread
readLock.unlock();
}
}

/**
* A {@link Runnable} used to periodically fetch a new instance of the policies Cache.
*
Expand All @@ -247,7 +243,7 @@ static class PolicyRefreshRunnable implements Runnable {
private final Authentication _systemAuthentication;
private final PolicyFetcher _policyFetcher;
private final Map<String, List<DataHubPolicyInfo>> _policyCache;
private final ReadWriteLock _lockPolicyCache;
private final Lock writeLock;

@Override
public void run() {
Expand All @@ -274,13 +270,16 @@ public void run() {
return;
}
}
_lockPolicyCache.writeLock().lock();

writeLock.lock();
try {
_policyCache.clear();
_policyCache.putAll(newCache);
} finally {
_lockPolicyCache.writeLock().unlock();
// To unlock the acquired write thread
writeLock.unlock();
}

log.debug(String.format("Successfully fetched %s policies.", total));
} catch (Exception e) {
log.error("Caught exception while loading Policy cache. Will retry on next scheduled attempt.", e);
Expand Down
Loading