Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(fix): Avoid java.util.ConcurrentModificationException #9090

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -53,6 +55,7 @@ public enum AuthorizationMode {
// Maps privilege name to the associated set of policies for fast access.
// Not concurrent data structure because writes are always against the entire thing.
private final Map<String, List<DataHubPolicyInfo>> _policyCache = new HashMap<>(); // Shared Policy Cache.
private final ReadWriteLock _lockPolicyCache = new ReentrantReadWriteLock();

private final ScheduledExecutorService _refreshExecutorService = Executors.newScheduledThreadPool(1);
private final PolicyRefreshRunnable _policyRefreshRunnable;
Expand All @@ -71,7 +74,7 @@ public DataHubAuthorizer(
_systemAuthentication = Objects.requireNonNull(systemAuthentication);
_mode = Objects.requireNonNull(mode);
_policyEngine = new PolicyEngine(systemAuthentication, Objects.requireNonNull(entityClient));
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache);
_policyRefreshRunnable = new PolicyRefreshRunnable(systemAuthentication, new PolicyFetcher(entityClient), _policyCache, _lockPolicyCache);
_refreshExecutorService.scheduleAtFixedRate(_policyRefreshRunnable, delayIntervalSeconds, refreshIntervalSeconds, TimeUnit.SECONDS);
}

Expand All @@ -90,31 +93,41 @@ public AuthorizationResult authorize(@Nonnull final AuthorizationRequest request

Optional<ResolvedEntitySpec> resolvedResourceSpec = request.getResourceSpec().map(_entitySpecResolver::resolve);

// 1. Fetch the policies relevant to the requested privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(request.getPrivilege(), new ArrayList<>());

// 2. Evaluate each policy.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (isRequestGranted(policy, request, resolvedResourceSpec)) {
// Short circuit if policy has granted privileges to this actor.
return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW,
String.format("Granted by policy with type: %s", policy.getType()));
_lockPolicyCache.readLock().lock();
try {
// 1. Fetch the policies relevant to the requested privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(request.getPrivilege(), new ArrayList<>());

// 2. Evaluate each policy.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (isRequestGranted(policy, request, resolvedResourceSpec)) {
// Short circuit if policy has granted privileges to this actor.
return new AuthorizationResult(request, AuthorizationResult.Type.ALLOW,
String.format("Granted by policy with type: %s", policy.getType()));
}
}
return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null);
} finally {
_lockPolicyCache.readLock().unlock();
}
return new AuthorizationResult(request, AuthorizationResult.Type.DENY, null);
}

public List<String> getGrantedPrivileges(final String actor, final Optional<EntitySpec> resourceSpec) {

// 1. Fetch all policies
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(ALL, new ArrayList<>());
_lockPolicyCache.readLock().lock();
try {
// 1. Fetch all policies
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(ALL, new ArrayList<>());

Urn actorUrn = UrnUtils.getUrn(actor);
final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor));
Urn actorUrn = UrnUtils.getUrn(actor);
final ResolvedEntitySpec resolvedActorSpec = _entitySpecResolver.resolve(new EntitySpec(actorUrn.getEntityType(), actor));

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);
Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec);
return _policyEngine.getGrantedPrivileges(policiesToEvaluate, resolvedActorSpec, resolvedResourceSpec);
} finally {
_lockPolicyCache.readLock().unlock();
}
}

/**
Expand All @@ -124,36 +137,42 @@ public List<String> getGrantedPrivileges(final String actor, final Optional<Enti
public AuthorizedActors authorizedActors(
final String privilege,
final Optional<EntitySpec> resourceSpec) {
// Step 1: Find policies granting the privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(privilege, new ArrayList<>());

Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

final List<Urn> authorizedUsers = new ArrayList<>();
final List<Urn> authorizedGroups = new ArrayList<>();
boolean allUsers = false;
boolean allGroups = false;

// Step 2: For each policy, determine whether the resource is a match.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) {
// Policy is not active, skip.
continue;
}
_lockPolicyCache.readLock().lock();
try {
// Step 1: Find policies granting the privilege.
final List<DataHubPolicyInfo> policiesToEvaluate = _policyCache.getOrDefault(privilege, new ArrayList<>());

final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec);
Optional<ResolvedEntitySpec> resolvedResourceSpec = resourceSpec.map(_entitySpecResolver::resolve);

// Step 3: For each matching policy, add actors that are authorized.
authorizedUsers.addAll(matchingActors.getUsers());
authorizedGroups.addAll(matchingActors.getGroups());
if (matchingActors.allUsers()) {
allUsers = true;
}
if (matchingActors.allGroups()) {
allGroups = true;

// Step 2: For each policy, determine whether the resource is a match.
for (DataHubPolicyInfo policy : policiesToEvaluate) {
if (!PoliciesConfig.ACTIVE_POLICY_STATE.equals(policy.getState())) {
// Policy is not active, skip.
continue;
}

final PolicyEngine.PolicyActors matchingActors = _policyEngine.getMatchingActors(policy, resolvedResourceSpec);

// Step 3: For each matching policy, add actors that are authorized.
authorizedUsers.addAll(matchingActors.getUsers());
authorizedGroups.addAll(matchingActors.getGroups());
if (matchingActors.allUsers()) {
allUsers = true;
}
if (matchingActors.allGroups()) {
allGroups = true;
}
}
} finally {
_lockPolicyCache.readLock().unlock();
}

// Step 4: Return all authorized users and groups.
return new AuthorizedActors(privilege, authorizedUsers, authorizedGroups, allUsers, allGroups);
}
Expand Down Expand Up @@ -228,6 +247,7 @@ static class PolicyRefreshRunnable implements Runnable {
private final Authentication _systemAuthentication;
private final PolicyFetcher _policyFetcher;
private final Map<String, List<DataHubPolicyInfo>> _policyCache;
private final ReadWriteLock _lockPolicyCache;

@Override
public void run() {
Expand All @@ -253,10 +273,13 @@ public void run() {
"Failed to retrieve policy urns! Skipping updating policy cache until next refresh. start: {}, count: {}", start, count, e);
return;
}
synchronized (_policyCache) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The synchronize doesn't have to be inside the while loop

_policyCache.clear();
_policyCache.putAll(newCache);
}
}
_lockPolicyCache.writeLock().lock();
try {
_policyCache.clear();
_policyCache.putAll(newCache);
} finally {
_lockPolicyCache.writeLock().unlock();
}
log.debug(String.format("Successfully fetched %s policies.", total));
} catch (Exception e) {
Expand Down
Loading