Make SwitchShardBehavior transport RaftState
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index d61e12e1cb2fd2bc97e7f6b34429c22adaf72669..33b3810447532741a30e2e0f7084888646346a6a 100644 (file)
@@ -9,17 +9,19 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static akka.pattern.Patterns.ask;
-import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
+import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
@@ -88,8 +90,11 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -97,6 +102,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -156,7 +162,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.cluster = builder.cluster;
         this.configuration = builder.configuration;
         this.datastoreContextFactory = builder.datastoreContextFactory;
-        this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
+        this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
@@ -181,7 +187,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void postStop() {
-        LOG.info("Stopping ShardManager");
+        LOG.info("Stopping ShardManager {}", persistenceId());
 
         mBean.unregisterMBean();
     }
@@ -232,26 +238,142 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof PrimaryShardFoundForContext) {
             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
             onPrimaryShardFoundContext(primaryShardFoundContext);
-        } else if(message instanceof RemoveShardReplica){
-            onRemoveShardReplica((RemoveShardReplica)message);
+        } else if(message instanceof RemoveShardReplica) {
+            onRemoveShardReplica((RemoveShardReplica) message);
+        } else if(message instanceof WrappedShardResponse){
+            onWrappedShardResponse((WrappedShardResponse) message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
-        } else if (message instanceof SaveSnapshotSuccess) {
+        } else if(message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
-        } else if (message instanceof SaveSnapshotFailure) {
+        } else if(message instanceof SaveSnapshotFailure) {
             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
                     persistenceId(), ((SaveSnapshotFailure) message).cause());
+        } else if(message instanceof Shutdown) {
+            onShutDown();
         } else {
             unknownMessage(message);
         }
     }
 
+    private void onShutDown() {
+        List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() != null) {
+                LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
+
+                FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+                stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
+            }
+        }
+
+        LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
+
+        ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+        Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
+
+        combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
+            @Override
+            public void onComplete(Throwable failure, Iterable<Boolean> results) {
+                LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
+
+                self().tell(PoisonPill.getInstance(), self());
+
+                if(failure != null) {
+                    LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
+                } else {
+                    int nfailed = 0;
+                    for(Boolean r: results) {
+                        if(!r) {
+                            nfailed++;
+                        }
+                    }
+
+                    if(nfailed > 0) {
+                        LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
+                    }
+                }
+            }
+        }, dispatcher);
+    }
+
+    private void onWrappedShardResponse(WrappedShardResponse message) {
+        if (message.getResponse() instanceof RemoveServerReply) {
+            onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
+                    message.getLeaderPath());
+        }
+    }
+
+    private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
+            String leaderPath) {
+        shardReplicaOperationsInProgress.remove(shardId);
+
+        LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+                    shardId.getShardName());
+            originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+        } else {
+            LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+                    persistenceId(), shardId, replyMsg.getStatus());
+
+            Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
+                    leaderPath, shardId);
+            originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+        }
+    }
+
     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
-            addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
+            addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
+                    getSender());
+        } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
+            removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
+                    primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
+        }
+    }
+
+    private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
+            final ActorRef sender) {
+        if(isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
         }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+        final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+        //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+        LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+                primaryPath, shardId);
+
+        Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
+                duration());
+        Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+                new RemoveServer(shardId.toString()), removeServerTimeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+                            primaryPath, shardName);
+
+                    LOG.debug ("{}: {}", persistenceId(), msg, failure);
+
+                    // FAILURE
+                    sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+                } else {
+                    // SUCCESS
+                    self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
     private void onShardReplicaRemoved(ServerRemoved message) {
@@ -261,8 +383,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
             return;
         } else if(shardInformation.getActor() != null) {
-            LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
-            shardInformation.getActor().tell(PoisonPill.getInstance(), self());
+            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
+            shardInformation.getActor().tell(Shutdown.INSTANCE, self());
         }
         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
         persistShardList();
@@ -627,7 +749,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
 
         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
@@ -640,7 +762,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberExited(ClusterEvent.MemberExited message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
 
         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
@@ -653,7 +775,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
 
         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
@@ -676,7 +798,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         addPeerAddress(memberName, message.member().address());
@@ -685,7 +807,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
-        String memberName = message.member().roles().head();
+        String memberName = message.member().roles().iterator().next();
         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
 
         markMemberUnavailable(memberName);
@@ -731,7 +853,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         if(shardInformation != null && shardInformation.getActor() != null) {
             shardInformation.getActor().tell(
-                    new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
+                    new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
         } else {
             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
                     message.getShardName(), message.getNewState());
@@ -797,12 +919,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
+        Collection<String> visitedAddresses;
+        if(message instanceof RemoteFindPrimary) {
+            visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
+        } else {
+            visitedAddresses = new ArrayList<>();
+        }
+
+        visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
+
         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
-            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
-                    shardName, address);
+            if(visitedAddresses.contains(address)) {
+                continue;
+            }
+
+            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
+                    persistenceId(), shardName, address, visitedAddresses);
 
             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
-                    message.isWaitUntilReady()), getContext());
+                    message.isWaitUntilReady(), visitedAddresses), getContext());
             return;
         }
 
@@ -939,7 +1074,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             @Override
             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
-
             }
 
             @Override
@@ -1050,40 +1184,52 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
                     persistenceId(), shardName, replyMsg.getStatus());
 
-            Exception failure;
-            switch (replyMsg.getStatus()) {
-                case TIMEOUT:
-                    failure = new TimeoutException(String.format(
-                            "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
-                            "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
-                            leaderPath, shardName));
-                    break;
-                case NO_LEADER:
-                    failure = createNoShardLeaderException(shardInfo.getShardId());
-                    break;
-                default :
-                    failure = new RuntimeException(String.format(
-                            "AddServer request to leader %s for shard %s failed with status %s",
-                            leaderPath, shardName, replyMsg.getStatus()));
-            }
+            Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
 
             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
         }
     }
 
-    private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
-        String shardName = shardReplicaMsg.getShardName();
-
-        // verify the local shard replica is available in the controller node
-        if (!localShards.containsKey(shardName)) {
-            String msg = String.format("Local shard %s does not", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
-            return;
+    private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
+                                               String leaderPath, ShardIdentifier shardId) {
+        Exception failure;
+        switch (serverChangeStatus) {
+            case TIMEOUT:
+                failure = new TimeoutException(String.format(
+                        "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
+                        "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                        leaderPath, shardId.getShardName()));
+                break;
+            case NO_LEADER:
+                failure = createNoShardLeaderException(shardId);
+                break;
+            case NOT_SUPPORTED:
+                failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
+                        serverChange.getSimpleName(), shardId.getShardName()));
+                break;
+            default :
+                failure = new RuntimeException(String.format(
+                        "%s request to leader %s for shard %s failed with status %s",
+                        serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
         }
-        // call RemoveShard for the shardName
-        getSender().tell(new akka.actor.Status.Success(true), getSelf());
-        return;
+        return failure;
+    }
+
+    private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+        LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+        findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
+                shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
+            @Override
+            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+            }
+
+            @Override
+            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+                getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+            }
+        });
     }
 
     private void persistShardList() {
@@ -1129,7 +1275,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
             persistenceId());
-        deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+        deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
+            0, 0));
     }
 
     private static class ForwardedAddServerReply {
@@ -1167,7 +1314,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final ShardIdentifier shardId;
         private final String shardName;
         private ActorRef actor;
-        private ActorPath actorPath;
         private final Map<String, String> initialPeerAddresses;
         private Optional<DataTree> localShardDataTree;
         private boolean leaderAvailable = false;
@@ -1215,13 +1361,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return actor;
         }
 
-        ActorPath getActorPath() {
-            return actorPath;
-        }
-
         void setActor(ActorRef actor) {
             this.actor = actor;
-            this.actorPath = actor.path();
         }
 
         ShardIdentifier getShardId() {
@@ -1376,6 +1517,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         void setLeaderAvailable(boolean leaderAvailable) {
             this.leaderAvailable = leaderAvailable;
+
+            if(leaderAvailable) {
+                notifyOnShardInitializedCallbacks();
+            }
         }
 
         short getLeaderVersion() {
@@ -1629,10 +1774,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardName;
         }
 
-        public ActorRef getShardManagerActor() {
-            return shardManagerActor;
-        }
-
         @Override
         public void onFailure(Throwable failure) {
             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
@@ -1662,44 +1803,65 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final RemotePrimaryShardFound remotePrimaryShardFound;
         private final LocalPrimaryShardFound localPrimaryShardFound;
 
-        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
+        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
+                @Nonnull Object primaryFoundMessage) {
             this.shardName = Preconditions.checkNotNull(shardName);
             this.contextMessage = Preconditions.checkNotNull(contextMessage);
             Preconditions.checkNotNull(primaryFoundMessage);
-            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
-            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
+            this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
+                    (RemotePrimaryShardFound) primaryFoundMessage : null;
+            this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
+                    (LocalPrimaryShardFound) primaryFoundMessage : null;
         }
 
         @Nonnull
-        public String getPrimaryPath(){
-            if(remotePrimaryShardFound != null){
+        String getPrimaryPath(){
+            if(remotePrimaryShardFound != null) {
                 return remotePrimaryShardFound.getPrimaryPath();
             }
             return localPrimaryShardFound.getPrimaryPath();
         }
 
         @Nonnull
-        public Object getContextMessage() {
+        Object getContextMessage() {
             return contextMessage;
         }
 
         @Nullable
-        public RemotePrimaryShardFound getRemotePrimaryShardFound(){
+        RemotePrimaryShardFound getRemotePrimaryShardFound() {
             return remotePrimaryShardFound;
         }
 
-        @Nullable
-        public LocalPrimaryShardFound getLocalPrimaryShardFound(){
-            return localPrimaryShardFound;
+        @Nonnull
+        String getShardName() {
+            return shardName;
+        }
+    }
+
+    /**
+     * The WrappedShardResponse class wraps a response from a Shard.
+     */
+    private static class WrappedShardResponse {
+        private final ShardIdentifier shardId;
+        private final Object response;
+        private final String leaderPath;
+
+        private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+            this.shardId = shardId;
+            this.response = response;
+            this.leaderPath = leaderPath;
         }
 
-        boolean isPrimaryLocal(){
-            return (remotePrimaryShardFound == null);
+        ShardIdentifier getShardId() {
+            return shardId;
         }
 
-        @Nonnull
-        public String getShardName() {
-            return shardName;
+        Object getResponse() {
+            return response;
+        }
+
+        String getLeaderPath() {
+            return leaderPath;
         }
     }
 }