Cleanup OwnerSupervisor a bit 21/97321/3
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 24 Aug 2021 21:37:13 +0000 (23:37 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 25 Aug 2021 09:39:28 +0000 (09:39 +0000)
Use streaming to reduce funky copy/remove/remove cycle. This way
we perform minimal needed materialization.

Change-Id: I0cf25fc7bb57e2d053cb4368cdd4ac8905995e42
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java

index 5210f43dc66cbb9ade382f301cd8c42a2ca9844f..e56ed59f3cfcd7f8cc521c0b3bb50eb557e44330 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.cluster.Member;
 import akka.cluster.ddata.LWWRegister;
 import akka.cluster.ddata.LWWRegisterKey;
@@ -33,13 +34,13 @@ import com.google.common.collect.Sets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
@@ -234,7 +235,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
             return;
         }
 
-        final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
+        final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Set.of());
         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
 
         LOG.debug("currently present candidates: {}", currentlyPresent);
@@ -297,7 +298,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
-        return currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(candidate);
+        return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
     }
 
     private void assignOwnerFor(final DOMEntity entity) {
@@ -413,21 +414,21 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     private Set<String> getActiveMembers() {
-        final Set<String> members = new HashSet<>();
-        cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
-        // filter out unreachable
-        members.removeAll(cluster.state().getUnreachable().stream()
-                .map(OwnerSupervisor::extractRole)
-                .collect(Collectors.toSet()));
-
-        // filter out members not from our datacenter
-        cluster.state().getMembers().forEach(member -> {
-            if (!member.roles().contains(dataCenter)) {
-                members.remove(extractRole(member));
-            }
-        });
-
-        return members;
+        final CurrentClusterState clusterState = cluster.state();
+        final Set<String> unreachableRoles = clusterState.getUnreachable().stream()
+            .map(OwnerSupervisor::extractRole)
+            .collect(Collectors.toSet());
+
+        return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
+            // We are evaluating the set of roles for each member
+            .map(Member::getRoles)
+            // Filter out any members which do not share our dataCenter
+            .filter(roles -> roles.contains(dataCenter))
+            // Find first legal role
+            .map(OwnerSupervisor::extractRole)
+            // filter out unreachable roles
+            .filter(role -> !unreachableRoles.contains(role))
+            .collect(Collectors.toSet());
     }
 
     private static DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
@@ -443,7 +444,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
     }
 
-    private String extractDatacenterRole(final Member member) {
+    private static String extractDatacenterRole(final Member member) {
         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
     }