Fix incorrect remove call in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index 0f6425e..8386e66 100644 (file)
@@ -18,6 +18,7 @@ import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
 import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
@@ -195,6 +196,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
+        } else if (message instanceof ClusterEvent.MemberWeaklyUp){
+            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
         } else if (message instanceof ClusterEvent.MemberExited){
             memberExited((ClusterEvent.MemberExited) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
@@ -304,7 +307,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
             String leaderPath) {
-        shardReplicaOperationsInProgress.remove(shardId);
+        shardReplicaOperationsInProgress.remove(shardId.getShardName());
 
         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
 
@@ -346,7 +349,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
-                    shardReplicaOperationsInProgress.add(shardName);
+                    shardReplicaOperationsInProgress.remove(shardName);
                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
                             primaryPath, shardName);
 
@@ -733,7 +736,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
         MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
@@ -746,7 +749,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void memberExited(ClusterEvent.MemberExited message) {
         MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
         peerAddressResolver.removePeerAddress(memberName);
@@ -759,14 +762,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void memberUp(ClusterEvent.MemberUp message) {
         MemberName memberName = memberToName(message.member());
 
-        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        addPeerAddress(memberName, message.member().address());
+        memberUp(memberName, message.member().address());
+    }
 
+    private void memberUp(MemberName memberName, Address address) {
+        addPeerAddress(memberName, address);
         checkReady();
     }
 
+    private void memberWeaklyUp(MemberWeaklyUp message) {
+        MemberName memberName = memberToName(message.member());
+
+        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        memberUp(memberName, message.member().address());
+    }
+
     private void addPeerAddress(MemberName memberName, Address address) {
         peerAddressResolver.addPeerAddress(memberName, address);
 
@@ -781,7 +796,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
         MemberName memberName = memberToName(message.member());
-        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+        LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         addPeerAddress(memberName, message.member().address());
 
@@ -790,7 +805,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
         MemberName memberName = memberToName(message.member());
-        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+        LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         markMemberUnavailable(memberName);
     }
@@ -1079,12 +1094,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
-                getSelf().tell(new RunnableMessage() {
-                    @Override
-                    public void run() {
-                        addShard(getShardName(), response, getSender());
-                    }
-                }, getTargetActor());
+                getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor());
             }
 
             @Override
@@ -1241,12 +1251,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             private void doRemoveShardReplicaAsync(final String primaryPath) {
-                getSelf().tell(new RunnableMessage() {
-                    @Override
-                    public void run() {
-                        removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender());
-                    }
-                }, getTargetActor());
+                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor());
             }
         });
     }
@@ -1365,12 +1370,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                             String.format("Failed to find local shard %s", shardName), failure)), self());
                 } else {
                     if(response instanceof LocalShardFound) {
-                        getSelf().tell(new RunnableMessage() {
-                            @Override
-                            public void run() {
-                                onLocalShardFound.accept((LocalShardFound) response);
-                            }
-                        }, sender);
+                        getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender);
                     } else if(response instanceof LocalShardNotFound) {
                         String msg = String.format("Local shard %s does not exist", shardName);
                         LOG.debug ("{}: {}", persistenceId, msg);