Make SwitchShardBehavior transport RaftState
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 3e14c5837dfadb2f4878167a74cbc64765d07899..33b3810447532741a30e2e0f7084888646346a6a 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static akka.pattern.Patterns.ask;
-import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
@@ -19,8 +18,10 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
+import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
@@ -89,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
@@ -100,6 +102,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -184,7 +187,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void postStop() {
-        LOG.info("Stopping ShardManager");
+        LOG.info("Stopping ShardManager {}", persistenceId()); // include the persistence id so the log shows which instance stopped
 
         mBean.unregisterMBean(); // unregister this actor's MBean as part of stopping
     }
@@ -243,25 +246,84 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
-        } else if (message instanceof SaveSnapshotSuccess) {
+        } else if(message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
-        } else if (message instanceof SaveSnapshotFailure) {
+        } else if(message instanceof SaveSnapshotFailure) {
             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
                     persistenceId(), ((SaveSnapshotFailure) message).cause());
+        } else if(message instanceof Shutdown) {
+            onShutDown();
         } else {
             unknownMessage(message);
         }
     }
 
+    private void onShutDown() { // Shutdown handler: ask every started local shard to stop gracefully, then stop this actor
+        List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() != null) { // shards without an actor reference have nothing to stop
+                LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
+
+                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2); // per-shard stop timeout: twice the election timeout interval
+                stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE)); // Shutdown.INSTANCE is the stop message delivered to the shard
+            }
+        }
+
+        LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
+
+        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+        Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher); // one future covering all per-shard stop results
+
+        combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
+            @Override
+            public void onComplete(Throwable failure, Iterable<Boolean> results) { // NOTE(review): runs on the dispatcher passed below, outside this actor's message handling - confirm persistenceId() is safe to call here
+                LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId()); // NOTE(review): logged even when failure != null
+
+                self().tell(PoisonPill.getInstance(), self()); // sent whether or not the shards stopped cleanly
+
+                if(failure != null) {
+                    LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
+                } else {
+                    int nfailed = 0;
+                    for(Boolean r: results) {
+                        if(!r) { // a false result is counted as a shard that did not stop gracefully
+                            nfailed++;
+                        }
+                    }
+
+                    if(nfailed > 0) {
+                        LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
+                    }
+                }
+            }
+        }, dispatcher);
+    }
+
     private void onWrappedShardResponse(WrappedShardResponse message) {
         if (message.getResponse() instanceof RemoveServerReply) { // only RemoveServerReply is dispatched; any other wrapped response is ignored
-            onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());
+            onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
+                    message.getLeaderPath()); // leader path is forwarded so a failure reply can name the leader
         }
     }
 
-    private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) {
-        shardReplicaOperationsInProgress.remove(shardName);
-        originalSender.tell(new Status.Success(null), self());
+    private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
+            String leaderPath) {
+        shardReplicaOperationsInProgress.remove(shardId);
+
+        LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+                    shardId.getShardName());
+            originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+        } else {
+            LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+                    persistenceId(), shardId, replyMsg.getStatus());
+
+            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
+                    leaderPath, shardId);
+            originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+        }
     }
 
     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
@@ -308,7 +370,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
                 } else {
                     // SUCCESS
-                    self().tell(new WrappedShardResponse(shardName, response), sender);
+                    self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
@@ -321,8 +383,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
             return;
         } else if(shardInformation.getActor() != null) {
-            LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
-            shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
+            shardInformation.getActor().tell(Shutdown.INSTANCE, self());
         }
         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
         persistShardList();
@@ -687,7 +749,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
 
         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
@@ -700,7 +762,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberExited(ClusterEvent.MemberExited message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
 
         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
@@ -713,7 +775,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
 
         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
@@ -736,7 +798,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         addPeerAddress(memberName, message.member().address());
@@ -745,7 +807,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         markMemberUnavailable(memberName);
@@ -791,7 +853,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         if(shardInformation != null && shardInformation.getActor() != null) {
             shardInformation.getActor().tell(
-                    new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
+                    new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
         } else {
             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
                     message.getShardName(), message.getNewState());
@@ -871,8 +933,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 continue;
             }
 
-            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
-                    shardName, address);
+            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
+                    persistenceId(), shardName, address, visitedAddresses);
 
             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
                     message.isWaitUntilReady(), visitedAddresses), getContext());
@@ -1128,7 +1190,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
+    private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
                                                String leaderPath, ShardIdentifier shardId) {
         Exception failure;
         switch (serverChangeStatus) {
@@ -1154,6 +1216,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+        LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
+
         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
@@ -1211,7 +1275,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
             persistenceId());
-        deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+        deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1, // upper bounds: max long value, and a timestamp one less than the snapshot just saved
+            0, 0)); // presumably minSequenceNr and minTimestamp, i.e. no lower bound - confirm against the Akka version in use
     }
 
     private static class ForwardedAddServerReply {
@@ -1249,7 +1314,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final ShardIdentifier shardId;
         private final String shardName;
         private ActorRef actor;
-        private ActorPath actorPath;
         private final Map<String, String> initialPeerAddresses;
         private Optional<DataTree> localShardDataTree;
         private boolean leaderAvailable = false;
@@ -1297,13 +1361,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return actor;
         }
 
-        ActorPath getActorPath() {
-            return actorPath;
-        }
-
         void setActor(ActorRef actor) {
             this.actor = actor; // stores the reference later returned by getActor()
-            this.actorPath = actor.path();
         }
 
         ShardIdentifier getShardId() {
@@ -1458,6 +1517,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         void setLeaderAvailable(boolean leaderAvailable) {
             this.leaderAvailable = leaderAvailable;
+
+            if(leaderAvailable) { // every call with true runs the shard-initialized callbacks; false only updates the flag
+                notifyOnShardInitializedCallbacks();
+            }
         }
 
         short getLeaderVersion() {
@@ -1558,7 +1621,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private CountDownLatch waitTillReadyCountdownLatch;
         private PrimaryShardInfoFutureCache primaryShardInfoCache;
         private DatastoreSnapshot restoreFromSnapshot;
-
         private volatile boolean sealed;
 
         @SuppressWarnings("unchecked")
@@ -1780,21 +1842,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * The WrappedShardResponse class wraps a response from a Shard.
      */
     private static class WrappedShardResponse {
-        private final String shardName;
+        private final ShardIdentifier shardId; // shard the wrapped response relates to
         private final Object response;
+        private final String leaderPath; // presumably the path of the primary (leader) shard that was asked - the visible caller passes its primaryPath here
 
-        private WrappedShardResponse(String shardName, Object response) {
-            this.shardName = shardName;
+        private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+            this.shardId = shardId;
             this.response = response;
+            this.leaderPath = leaderPath;
         }
 
-        String getShardName() {
-            return shardName;
+        ShardIdentifier getShardId() {
+            return shardId;
         }
 
         Object getResponse() {
             return response;
         }
+
+        String getLeaderPath() {
+            return leaderPath;
+        }
     }
 }