BUG 2817 : Handle ServerRemoved message in Shard/ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 7dcc52028cc28f7b3211565478be382845b3b2f5..616f56c466bbac02194460dca1d07b4a57b2569f 100644 (file)
@@ -46,6 +46,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
@@ -85,6 +86,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -138,7 +140,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
-    private final String id;
+    private final String persistenceId;
 
     /**
      */
@@ -154,7 +156,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.primaryShardInfoCache = builder.primaryShardInfoCache;
         this.restoreFromSnapshot = builder.restoreFromSnapshot;
 
-        id = "shard-manager-" + type;
+        String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
+        persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
 
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
 
@@ -212,19 +215,44 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onCreateShard((CreateShard)message);
         } else if(message instanceof AddShardReplica){
             onAddShardReplica((AddShardReplica)message);
+        } else if(message instanceof ForwardedAddServerReply) {
+            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+                    msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerFailure) {
+            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+            onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
+        } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
+            ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
+            addShard(msg.shardName, msg.primaryFound, getSender());
         } else if(message instanceof RemoveShardReplica){
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
+        } else if(message instanceof ServerRemoved){
+            onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof SaveSnapshotSuccess) {
-            LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+            LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
         } else if (message instanceof SaveSnapshotFailure) {
             LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
                 persistenceId(), ((SaveSnapshotFailure)message).cause());
         } else {
             unknownMessage(message);
         }
+    }
 
+    private void onShardReplicaRemoved(ServerRemoved message) {
+        final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+        final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+        if(shardInformation == null) {
+            LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
+            return;
+        } else if(shardInformation.getActor() != null) {
+            LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
+            shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+        }
+        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+        persistShardList();
     }
 
     private void onGetSnapshot() {
@@ -818,7 +846,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public String persistenceId() {
-        return id;
+        return persistenceId;
     }
 
     @VisibleForTesting
@@ -840,7 +868,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
-        LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
+        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
 
         // verify the shard with the specified name is present in the cluster configuration
         if (!(this.configuration.isShardConfigured(shardName))) {
@@ -873,8 +901,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                         String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
                 } else {
                     if(response instanceof RemotePrimaryShardFound) {
-                        RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
-                        addShard (shardName, message, sender);
+                        self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
+                                (RemotePrimaryShardFound)response), sender);
                     } else if(response instanceof LocalPrimaryShardFound) {
                         sendLocalReplicaAlreadyExistsReply(shardName, sender);
                     } else {
@@ -936,16 +964,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object addServerResponse) {
-                shardReplicaOperationsInProgress.remove(shardName);
                 if (failure != null) {
                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
 
-                    onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed",
-                            response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure);
+                    String msg = String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName);
+                    self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                 } else {
-                    AddServerReply reply = (AddServerReply)addServerResponse;
-                    onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure);
+                    self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
+                            response.getPrimaryPath(), removeShardOnFailure), sender);
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
@@ -953,6 +981,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
             boolean removeShardOnFailure) {
+        shardReplicaOperationsInProgress.remove(shardName);
+
         if(removeShardOnFailure) {
             ShardInformation shardInfo = localShards.remove(shardName);
             if (shardInfo.getActor() != null) {
@@ -967,6 +997,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
             String leaderPath, boolean removeShardOnFailure) {
         String shardName = shardInfo.getShardName();
+        shardReplicaOperationsInProgress.remove(shardName);
+
         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
@@ -1054,6 +1086,46 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private static class ForwardedAddServerPrimaryShardFound {
+        String shardName;
+        RemotePrimaryShardFound primaryFound;
+
+        ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
+            this.shardName = shardName;
+            this.primaryFound = primaryFound;
+        }
+    }
+
+    private static class ForwardedAddServerReply {
+        ShardInformation shardInfo;
+        AddServerReply addServerReply;
+        String leaderPath;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+                boolean removeShardOnFailure) {
+            this.shardInfo = shardInfo;
+            this.addServerReply = addServerReply;
+            this.leaderPath = leaderPath;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
+    private static class ForwardedAddServerFailure {
+        String shardName;
+        String failureMessage;
+        Throwable failure;
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+                boolean removeShardOnFailure) {
+            this.shardName = shardName;
+            this.failureMessage = failureMessage;
+            this.failure = failure;
+            this.removeShardOnFailure = removeShardOnFailure;
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
@@ -1102,6 +1174,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardName;
         }
 
+        @Nullable
         ActorRef getActor(){
             return actor;
         }