Remove DataChangeListener protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index c39c80021c456b20565bb214da63ee52e515f29d..70bdb2050e8ce27d616a23cc7af25afb6b5f9162 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static akka.pattern.Patterns.ask;
-import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
@@ -303,13 +302,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onWrappedShardResponse(WrappedShardResponse message) {
         if (message.getResponse() instanceof RemoveServerReply) {
-            onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());
+            onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
+                    message.getLeaderPath());
         }
     }
 
-    private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) {
-        shardReplicaOperationsInProgress.remove(shardName);
-        originalSender.tell(new Status.Success(null), self());
+    private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
+            String leaderPath) {
+        shardReplicaOperationsInProgress.remove(shardId);
+
+        LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+                    shardId.getShardName());
+            originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+        } else {
+            LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+                    persistenceId(), shardId, replyMsg.getStatus());
+
+            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
+                    leaderPath, shardId);
+            originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+        }
     }
 
     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
@@ -356,7 +371,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
                 } else {
                     // SUCCESS
-                    self().tell(new WrappedShardResponse(shardName, response), sender);
+                    self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
@@ -369,8 +384,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
             return;
         } else if(shardInformation.getActor() != null) {
-            LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
-            shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
+            shardInformation.getActor().tell(new Shutdown(), self());
         }
         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
         persistShardList();
@@ -1299,7 +1314,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final ShardIdentifier shardId;
         private final String shardName;
         private ActorRef actor;
-        private ActorPath actorPath;
         private final Map<String, String> initialPeerAddresses;
         private Optional<DataTree> localShardDataTree;
         private boolean leaderAvailable = false;
@@ -1347,13 +1361,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return actor;
         }
 
-        ActorPath getActorPath() {
-            return actorPath;
-        }
-
         void setActor(ActorRef actor) {
             this.actor = actor;
-            this.actorPath = actor.path();
         }
 
         ShardIdentifier getShardId() {
@@ -1612,7 +1621,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private CountDownLatch waitTillReadyCountdownLatch;
         private PrimaryShardInfoFutureCache primaryShardInfoCache;
         private DatastoreSnapshot restoreFromSnapshot;
-
         private volatile boolean sealed;
 
         @SuppressWarnings("unchecked")
@@ -1834,21 +1842,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * The WrappedShardResponse class wraps a response from a Shard.
      */
     private static class WrappedShardResponse {
-        private final String shardName;
+        private final ShardIdentifier shardId;
         private final Object response;
+        private final String leaderPath;
 
-        private WrappedShardResponse(String shardName, Object response) {
-            this.shardName = shardName;
+        private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+            this.shardId = shardId;
             this.response = response;
+            this.leaderPath = leaderPath;
         }
 
-        String getShardName() {
-            return shardName;
+        ShardIdentifier getShardId() {
+            return shardId;
         }
 
         Object getResponse() {
             return response;
         }
+
+        String getLeaderPath() {
+            return leaderPath;
+        }
     }
 }