BUG-5280: use MemberName instead of String
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index 4bd0d677968d1e5dfb638a2ee8a661ec88c1718d..aa2c524fe7cf5acd3c0b49fc8f210df013ff6e44 100644 (file)
@@ -16,7 +16,9 @@ import akka.actor.OneForOneStrategy;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
@@ -46,6 +48,7 @@ import java.util.function.Supplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
@@ -659,12 +662,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
-            @Override
-            public Object get() {
-                return new LocalShardFound(shardInformation.getActor());
-            }
-        });
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
     }
 
     private void sendResponse(ShardInformation shardInformation, boolean doWait,
@@ -674,12 +672,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 final ActorRef sender = getSender();
                 final ActorRef self = self();
 
-                Runnable replyRunnable = new Runnable() {
-                    @Override
-                    public void run() {
-                        sender.tell(messageSupplier.get(), self);
-                    }
-                };
+                Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
 
                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
                     new OnShardInitialized(replyRunnable);
@@ -729,8 +722,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
 
+    @VisibleForTesting
+    static MemberName memberToName(final Member member) {
+        return MemberName.forName(member.roles().iterator().next());
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
 
         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
@@ -743,7 +741,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberExited(ClusterEvent.MemberExited message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
 
         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
@@ -756,7 +754,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
 
         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
@@ -766,7 +764,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         checkReady();
     }
 
-    private void addPeerAddress(String memberName, Address address) {
+    private void addPeerAddress(MemberName memberName, Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
         for(ShardInformation info : localShards.values()){
@@ -779,7 +777,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         addPeerAddress(memberName, message.member().address());
@@ -788,16 +786,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
-        String memberName = message.member().roles().iterator().next();
+        MemberName memberName = memberToName(message.member());
         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         markMemberUnavailable(memberName);
     }
 
-    private void markMemberUnavailable(final String memberName) {
-        for(ShardInformation info : localShards.values()){
+    private void markMemberUnavailable(final MemberName memberName) {
+        final String memberStr = memberName.getName();
+        for (ShardInformation info : localShards.values()) {
             String leaderId = info.getLeaderId();
-            if(leaderId != null && leaderId.contains(memberName)) {
+            // XXX: why are we using String#contains() here?
+            if (leaderId != null && leaderId.contains(memberStr)) {
                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
                 info.setLeaderAvailable(false);
 
@@ -808,10 +808,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void markMemberAvailable(final String memberName) {
-        for(ShardInformation info : localShards.values()){
+    private void markMemberAvailable(final MemberName memberName) {
+        final String memberStr = memberName.getName();
+        for (ShardInformation info : localShards.values()) {
             String leaderId = info.getLeaderId();
-            if(leaderId != null && leaderId.contains(memberName)) {
+            // XXX: why are we using String#contains() here?
+            if (leaderId != null && leaderId.contains(memberStr)) {
                 LOG.debug("Marking Leader {} as available.", leaderId);
                 info.setLeaderAvailable(true);
             }
@@ -908,20 +910,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null && info.isActiveMember()) {
-            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
-                @Override
-                public Object get() {
-                    String primaryPath = info.getSerializedLeaderActor();
-                    Object found = canReturnLocalShardState && info.isLeader() ?
-                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
-                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
-
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
-                            }
-
-                            return found;
-                }
+            sendResponse(info, message.isWaitUntilReady(), true, () -> {
+                String primaryPath = info.getSerializedLeaderActor();
+                Object found = canReturnLocalShardState && info.isLeader() ?
+                        new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                            new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                        }
+
+                        return found;
             });
 
             return;
@@ -963,7 +962,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName
      * @return
      */
-    private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+    private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
         return peerAddressResolver.getShardIdentifier(memberName, shardName);
     }
 
@@ -973,7 +972,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      *
      */
     private void createLocalShards() {
-        String memberName = this.cluster.getCurrentMemberName();
+        MemberName memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
@@ -1004,13 +1003,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName
      */
     private Map<String, String> getPeerAddresses(String shardName) {
-        Collection<String> members = configuration.getMembersFromShardName(shardName);
+        Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
         Map<String, String> peerAddresses = new HashMap<>();
 
-        String currentMemberName = this.cluster.getCurrentMemberName();
+        MemberName currentMemberName = this.cluster.getCurrentMemberName();
 
-        for(String memberName : members) {
-            if(!currentMemberName.equals(memberName)) {
+        for (MemberName memberName : members) {
+            if (!currentMemberName.equals(memberName)) {
                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
                 peerAddresses.put(shardId.toString(), address);
@@ -1023,13 +1022,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     public SupervisorStrategy supervisorStrategy() {
 
         return new OneForOneStrategy(10, Duration.create("1 minute"),
-                new Function<Throwable, SupervisorStrategy.Directive>() {
-            @Override
-            public SupervisorStrategy.Directive apply(Throwable t) {
-                LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
-                return SupervisorStrategy.resume();
-            }
-        }
+                (Function<Throwable, Directive>) t -> {
+                    LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+                    return SupervisorStrategy.resume();
+                }
                 );
 
     }
@@ -1259,7 +1255,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
 
-        String currentMember = cluster.getCurrentMemberName();
+        final MemberName currentMember = cluster.getCurrentMemberName();
         Set<String> configuredShardList =
             new HashSet<>(configuration.getMemberShardNames(currentMember));
         for (String shard : currentSnapshot.getShardList()) {