import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import akka.cluster.ddata.LWWRegister;
import akka.cluster.ddata.LWWRegisterKey;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
return;
}
- final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
+ final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Set.of());
final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
LOG.debug("currently present candidates: {}", currentlyPresent);
}
private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
- return currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(candidate);
+ return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
}
private void assignOwnerFor(final DOMEntity entity) {
}
private Set<String> getActiveMembers() {
- final Set<String> members = new HashSet<>();
- cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
- // filter out unreachable
- members.removeAll(cluster.state().getUnreachable().stream()
- .map(OwnerSupervisor::extractRole)
- .collect(Collectors.toSet()));
-
- // filter out members not from our datacenter
- cluster.state().getMembers().forEach(member -> {
- if (!member.roles().contains(dataCenter)) {
- members.remove(extractRole(member));
- }
- });
-
- return members;
+ final CurrentClusterState clusterState = cluster.state();
+ final Set<String> unreachableRoles = clusterState.getUnreachable().stream()
+ .map(OwnerSupervisor::extractRole)
+ .collect(Collectors.toSet());
+
+ return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
+ // We are evaluating the set of roles for each member
+ .map(Member::getRoles)
+ // Filter out any members which do not share our dataCenter
+ .filter(roles -> roles.contains(dataCenter))
+ // Find first legal role
+ .map(OwnerSupervisor::extractRole)
+ // filter out unreachable roles
+ .filter(role -> !unreachableRoles.contains(role))
+ .collect(Collectors.toSet());
}
private static DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
.findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
}
- private String extractDatacenterRole(final Member member) {
+ private static String extractDatacenterRole(final Member member) {
return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
.findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
}