Add ShutDown message to RaftActor to transfer leader 65/31565/8
authorTom Pantelis <tpanteli@brocade.com>
Wed, 16 Dec 2015 17:37:42 +0000 (12:37 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 21 Dec 2015 15:23:21 +0000 (10:23 -0500)
Added a ShutDown message to gracefully stop a RaftActor. If the leader,
it attempts leadership transfer as follows:

1) Send a LeaderStateChanged message with a null leader Id to the local
RoleChangeNotifier. This will cause the ShardManager to clear it's
cached leader state and primaryInfoCache.

2) Send a LeaderTransitioning message to each follower so each can send
LeaderStateChanged messages to their local RoleChangeNotifiers.

3) Call a protected method, pauseLeader, passing the
RaftActorLeadershipTransferCohort. This allows derived classes to
perform work prior to transferring leadership. The Shard will wait for
current transactions to complete.

4) After pause is complete, the run method on the
RaftActorLeadershipTransferCohort is called which in turn calls
transferLeadership on the Leader.

5) When transfer is complete, send a PoisonPill to self.

If not the leader or has no followers, it just calls pauseLeader an
sends a PoisonPill on completion.

Change-Id: I27fa8a95f260b75516b7e558caea4a1a3255dda3
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/Shutdown.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 3afaad857204b75ebb1fe36ca9383d0fe9a0e76d..6851f6aab9ca4441ab79055c967d14c4c5335cc5 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -47,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,6 +125,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private RaftActorServerConfigurationSupport serverConfigurationSupport;
 
+    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
+
+    private boolean shuttingDown;
+
     public RaftActor(String id, Map<String, String> peerAddresses,
          Optional<ConfigParams> configParams, short payloadVersion) {
 
@@ -253,11 +259,74 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             switchBehavior(((SwitchBehavior) message));
         } else if(message instanceof LeaderTransitioning) {
             onLeaderTransitioning();
+        } else if(message instanceof Shutdown) {
+            onShutDown();
+        } else if(message instanceof Runnable) {
+            ((Runnable)message).run();
         } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
 
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+        LOG.debug("{}: Initiating leader transfer", persistenceId());
+
+        if(leadershipTransferInProgress == null) {
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender());
+            leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
+                @Override
+                public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+                    leadershipTransferInProgress = null;
+                }
+
+                @Override
+                public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+                    leadershipTransferInProgress = null;
+                }
+            });
+
+            leadershipTransferInProgress.addOnComplete(onComplete);
+            leadershipTransferInProgress.init();
+        } else {
+            LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
+            leadershipTransferInProgress.addOnComplete(onComplete);
+        }
+    }
+
+    private void onShutDown() {
+        LOG.debug("{}: onShutDown", persistenceId());
+
+        if(shuttingDown) {
+            return;
+        }
+
+        shuttingDown = true;
+        if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
+            initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+                @Override
+                public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+                    LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
+                    raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+                }
+
+                @Override
+                public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+                    LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
+                    raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+                }
+            });
+        } else if(currentBehavior.state() == RaftState.Leader) {
+            pauseLeader(new Runnable() {
+                @Override
+                public void run() {
+                    self().tell(PoisonPill.getInstance(), self());
+                }
+            });
+        } else {
+            self().tell(PoisonPill.getInstance(), self());
+        }
+    }
+
     private void onLeaderTransitioning() {
         LOG.debug("{}: onLeaderTransitioning", persistenceId());
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
@@ -361,6 +430,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             }
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
+
+            if(leadershipTransferInProgress != null) {
+                leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
+            }
         }
 
         if (roleChangeNotifier.isPresent() &&
@@ -458,6 +531,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return context.getId().equals(currentBehavior.getLeaderId());
     }
 
+    protected boolean isLeaderActive() {
+        return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null;
+    }
+
     /**
      * Derived actor can call getLeader if they need a reference to the Leader.
      * This would be useful for example in forwarding a request to an actor
@@ -638,6 +715,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     protected abstract Optional<ActorRef> getRoleChangeNotifier();
 
+    /**
+     * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
+     * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
+     * work prior to performing the operation. On completion of any work, the run method must be called to
+     * proceed with the given operation.
+     * <p>
+     * The default implementation immediately runs the operation.
+     *
+     * @param operation the operation to run
+     */
+    protected void pauseLeader(Runnable operation) {
+        operation.run();
+    }
+
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
     private String getLeaderAddress(){
index 77678e9f2423a2e8956e33a2483618d3b0d1d4dd..623fa4902ca6fc45ce5d86630ef620afde3b9c10 100644 (file)
  */
 package org.opendaylight.controller.cluster.raft;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
- * A helper class that participates in raft actor leadership transfer. An instance is created upon
+ * A raft actor support class that participates in leadership transfer. An instance is created upon
  * initialization of leadership transfer.
  * <p>
- * NOTE: All methods on this class must be called on the actor's thread dispatcher as they modify internal state.
+ * The transfer process is as follows:
+ * <ol>
+ * <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
+ *     clients that we no longer have a working leader.</li>
+ * <li>Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to
+ *     their local RoleChangeNotifiers.</li>
+ * <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
+ *     instance. This allows derived classes to perform work prior to transferring leadership.</li>
+ * <li>When the pause is complete, the {@link #run} method is called which in turn calls
+ *     {@link Leader#transferLeadership}.</li>
+ * <li>The Leader calls {@link #transferComplete} on successful completion.</li>
+ * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
+ *     possibly complete work that was suspended while we were transferring.</li>
+ * <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
+ * </ol>
+ * <p>
+ * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
+ * internal state.
  *
  * @author Thomas Pantelis
  */
-public abstract class RaftActorLeadershipTransferCohort {
+public class RaftActorLeadershipTransferCohort implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
+
     private final RaftActor raftActor;
+    private final ActorRef replyTo;
+    private Cancellable newLeaderTimer;
+    private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
+    private long newLeaderTimeoutInMillis = 2000;
+    private final Stopwatch transferTimer = Stopwatch.createUnstarted();
 
-    protected RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+    RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
         this.raftActor = raftActor;
+        this.replyTo = replyTo;
+    }
+
+    void init() {
+        RaftActorContext context = raftActor.getRaftActorContext();
+        RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();
+
+        transferTimer.start();
+
+        Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
+        if(roleChangeNotifier.isPresent()) {
+            roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
+                    currentBehavior.getLeaderPayloadVersion()), raftActor.self());
+        }
+
+        LeaderTransitioning leaderTransitioning = new LeaderTransitioning();
+        for(String peerId: context.getPeerIds()) {
+            ActorSelection followerActor = context.getPeerActorSelection(peerId);
+            if(followerActor != null) {
+                followerActor.tell(leaderTransitioning, context.getActor());
+            }
+        }
+
+        raftActor.pauseLeader(this);
     }
 
     /**
-     * This method is invoked to start leadership transfer.
+     * This method is invoked to run the leadership transfer.
      */
-    public void startTransfer() {
+    @Override
+    public void run() {
         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
+        // Sanity check...
         if(behavior instanceof Leader) {
             ((Leader)behavior).transferLeadership(this);
+        } else {
+            LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
+            finish(true);
         }
     }
 
     /**
-     * This method is invoked to abort leadership transfer.
+     * This method is invoked to abort leadership transfer on failure.
      */
     public void abortTransfer() {
-        transferComplete();
+        LOG.debug("{}: leader transfer aborted", raftActor.persistenceId());
+        finish(false);
     }
 
     /**
-     * This method is invoked when leadership transfer is complete.
+     * This method is invoked when leadership transfer was carried out and complete.
      */
-    public abstract void transferComplete();
+    public void transferComplete() {
+        LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
+
+        // We'll give it a little time for the new leader to be elected to give the derived class a
+        // chance to possibly complete work that was suspended while we were transferring. The
+        // RequestVote message from the new leader candidate should cause us to step down as leader
+        // and convert to follower due to higher term. We should then get an AppendEntries heart
+        // beat with the new leader id.
+
+        // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
+        // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
+        // safely run on actor's thread dispatcher.
+        FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
+        newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+                        finish(true);
+                    }
+                }, raftActor.getContext().system().dispatcher(), raftActor.self());
+    }
+
+    void onNewLeader(String newLeader) {
+        if(newLeader != null && newLeaderTimer != null) {
+            LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
+            newLeaderTimer.cancel();
+            finish(true);
+        }
+    }
+
+    private void finish(boolean success) {
+        if(transferTimer.isRunning()) {
+            transferTimer.stop();
+            if(success) {
+                LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
+                        raftActor.getLeaderId(), transferTimer.toString());
+            } else {
+                LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
+                        transferTimer.toString());
+            }
+        }
+
+        for(OnComplete onComplete: onCompleteCallbacks) {
+            if(success) {
+                onComplete.onSuccess(raftActor.self(), replyTo);
+            } else {
+                onComplete.onFailure(raftActor.self(), replyTo);
+            }
+        }
+    }
+
+    void addOnComplete(OnComplete onComplete) {
+        onCompleteCallbacks.add(onComplete);
+    }
+
+    @VisibleForTesting
+    void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
+        this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+    }
+
+    interface OnComplete {
+        void onSuccess(ActorRef raftActorRef, ActorRef replyTo);
+        void onFailure(ActorRef raftActorRef, ActorRef replyTo);
+    }
 }
index 6bbb70ce6b71ebb2ed7077b4962235fbf13f5cdf..21aad966cb2b7dd375b6e4667e952ab4b85891a9 100644 (file)
@@ -91,12 +91,25 @@ public class Leader extends AbstractLeader {
         return returnBehavior;
     }
 
+    /**
+     * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
+     * <ul>
+     * <li>Start a timer (Stopwatch).</li>
+     * <li>Send an initial AppendEntries heartbeat to all followers.</li>
+     * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
+     * <li>If it matches, </li>
+     *   <ul>
+     *   <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
+     *   <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
+     *   <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
+     *   </ul>
+     * <li>Otherwise if the election time out period elapses, notify
+     *     {@link RaftActorLeadershipTransferCohort#abortTtransfer}.</li>
+     * </ul>
+     *
+     * @param leadershipTransferCohort
+     */
     public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
-        if(!context.hasFollowers()) {
-            leadershipTransferCohort.transferComplete();
-            return;
-        }
-
         LOG.debug("{}: Attempting to transfer leadership", logName());
 
         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
@@ -124,7 +137,7 @@ public class Leader extends AbstractLeader {
             LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
 
             // We can't be sure if the follower has applied all its log entries to its state so send an
-            // additional AppendEntries.
+            // additional AppendEntries with the latest commit index.
             sendAppendEntries(0, false);
 
             // Now send an ElectionTimeout to the matching follower to immediately start an election.
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/Shutdown.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/Shutdown.java
new file mode 100644 (file)
index 0000000..930a4b0
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import java.io.Serializable;
+
+/**
+ * Message sent to a raft actor to shutdown gracefully. If it's the leader it will transfer leadership to a
+ * follower. As its last act, the actor self-destructs via a PoisonPill.
+ *
+ * @author Thomas Pantelis
+ */
+public class Shutdown implements Serializable {
+    private static final long serialVersionUID = 1L;
+}
index 30ead98cb4060edefba42a5f4583113523c109f3..bf55fa7aca01e1612e1384e007c34c4582da2308 100644 (file)
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.InvalidActorNameException;
 import akka.actor.PoisonPill;
-import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
@@ -72,16 +71,9 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
 
-        private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
-                TestActorRef<MessageCollectorActor> collectorActor) {
-            super(builder().id(id).peerAddresses(peerAddresses).config(config));
-            this.collectorActor = collectorActor;
-        }
-
-        public static Props props(String id, Map<String, String> peerAddresses, ConfigParams config,
-                TestActorRef<MessageCollectorActor> collectorActor) {
-            return Props.create(TestRaftActor.class, id, peerAddresses, config, collectorActor).
-                    withDispatcher(Dispatchers.DefaultDispatcherId());
+        private TestRaftActor(Builder builder) {
+            super(builder);
+            this.collectorActor = builder.collectorActor;
         }
 
         void startDropMessages(Class<?> msgClass) {
@@ -147,6 +139,23 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         public ActorRef collectorActor() {
             return collectorActor;
         }
+
+        public static Builder newBuilder() {
+            return new Builder();
+        }
+
+        public static class Builder extends AbstractBuilder<Builder, TestRaftActor> {
+            private TestActorRef<MessageCollectorActor> collectorActor;
+
+            public Builder collectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
+                this.collectorActor = collectorActor;
+                return this;
+            }
+
+            private Builder() {
+                super(TestRaftActor.class);
+            }
+        }
     }
 
     protected final Logger testLog = LoggerFactory.getLogger(getClass());
@@ -211,16 +220,19 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
     protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
             ConfigParams configParams) {
-        TestActorRef<MessageCollectorActor> collectorActor = factory.createTestActor(
+        return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(peerAddresses != null ? peerAddresses :
+            Collections.<String, String>emptyMap()).config(configParams));
+    }
+
+    protected TestActorRef<TestRaftActor> newTestRaftActor(String id, TestRaftActor.Builder builder) {
+        builder.collectorActor(factory.<MessageCollectorActor>createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                        factory.generateActorId(id + "-collector"));
+                        factory.generateActorId(id + "-collector"))).id(id);
 
         InvalidActorNameException lastEx = null;
         for(int i = 0; i < 10; i++) {
             try {
-                return factory.createTestActor(TestRaftActor.props(id,
-                        peerAddresses != null ? peerAddresses : Collections.<String, String>emptyMap(),
-                                configParams, collectorActor), id);
+                return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
             } catch (InvalidActorNameException e) {
                 lastEx = e;
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -309,7 +321,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     }
 
     protected String testActorPath(String id){
-        return "akka://test/user" + id;
+        return factory.createTestActorPath(id);
     }
 
     protected void verifyLeadersTrimmedLog(long lastIndex) {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java
new file mode 100644 (file)
index 0000000..3cc330a
--- /dev/null
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static akka.pattern.Patterns.ask;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests leadership transfer end-to-end.
+ *
+ * @author Thomas Pantelis
+ */
+public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+    private TestActorRef<MessageCollectorActor> leaderNotifierActor;
+    private TestActorRef<MessageCollectorActor> follower1NotifierActor;
+    private TestActorRef<MessageCollectorActor> follower2NotifierActor;
+
+    @Test
+    public void testLeaderTransferOnShutDown() throws Throwable {
+        testLog.info("testLeaderTransferOnShutDown starting");
+
+        createRaftActors();
+
+        sendPayloadWithFollower2Lagging();
+
+        sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
+
+        sendShutDown(follower2Actor);
+
+        testLog.info("testLeaderTransferOnShutDown ending");
+    }
+
+    private void sendShutDown(ActorRef actor) throws Exception {
+        testLog.info("sendShutDown for {} starting", actor.path());
+
+        FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+        Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, new Shutdown());
+
+        Boolean stopped = Await.result(stopFuture, duration);
+        assertEquals("Stopped", Boolean.TRUE, stopped);
+
+        testLog.info("sendShutDown for {} ending", actor.path());
+    }
+
+    private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Throwable {
+        testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
+
+        clearMessages(leaderNotifierActor);
+        clearMessages(follower1NotifierActor);
+        clearMessages(follower2NotifierActor);
+
+        FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+        Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown());
+
+        assertNullLeaderIdChange(leaderNotifierActor);
+        assertNullLeaderIdChange(follower1NotifierActor);
+        assertNullLeaderIdChange(follower2NotifierActor);
+
+        verifyRaftState(follower1Actor, RaftState.Leader);
+
+        Boolean stopped = Await.result(stopFuture, duration);
+        assertEquals("Stopped", Boolean.TRUE, stopped);
+
+        follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+        ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
+        assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
+
+        testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
+    }
+
+    private void sendPayloadWithFollower2Lagging() {
+        testLog.info("sendPayloadWithFollower2Lagging starting");
+
+        follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        sendPayloadData(leaderActor, "zero");
+
+        expectFirstMatching(leaderCollectorActor, ApplyState.class);
+        expectFirstMatching(follower1CollectorActor, ApplyState.class);
+
+        testLog.info("sendPayloadWithFollower2Lagging ending");
+    }
+
+    private void createRaftActors() {
+        testLog.info("createRaftActors starting");
+
+        follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(follower1Id + "-notifier"));
+        follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
+                    config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
+
+        follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(follower2Id + "-notifier"));
+        follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())).
+                    config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).
+                put(follower2Id, follower2Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderConfigParams.setElectionTimeoutFactor(3);
+        leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(leaderId + "-notifier"));
+        leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
+                config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
+        waitUntilLeader(leaderActor);
+
+        testLog.info("createRaftActors starting");
+    }
+
+    private void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable {
+        Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
+        Throwable lastError = null;
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            try {
+                OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+                        GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+                assertEquals("getRaftState", expState.toString(), raftState.getRaftState());
+                return;
+            } catch (Exception | AssertionError e) {
+                lastError = e;
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        throw lastError;
+    }
+
+    private void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
+        LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class);
+        assertNull("Expected null leader Id", change.getLeaderId());
+    }
+
+    @Test
+    public void testLeaderTransferAborted() throws Throwable {
+        testLog.info("testLeaderTransferAborted starting");
+
+        createRaftActors();
+
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+        follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        sendShutDown(leaderActor);
+
+        testLog.info("testLeaderTransferOnShutDown ending");
+    }
+
+    @Test
+    public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Throwable {
+        testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
+
+        leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
+        waitUntilLeader(leaderActor);
+
+        sendShutDown(leaderActor);
+
+        testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
+    }
+}
index 38650e834f1eb015a0831a6078dd88bf904a22ba..550504b4006161713ae8d5e166ac9bf169c2417e 100644 (file)
@@ -44,7 +44,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     private final byte[] restoreFromSnapshot;
     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
 
-    protected MockRaftActor(Builder builder) {
+    protected MockRaftActor(AbstractBuilder<?, ?> builder) {
         super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
         state = new ArrayList<>();
         this.actorDelegate = mock(RaftActor.class);
@@ -259,7 +259,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         return new Builder();
     }
 
-    public static class Builder {
+    public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private String id;
         private ConfigParams config;
@@ -268,49 +268,65 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
         private byte[] restoreFromSnapshot;
         private Optional<Boolean> persistent = Optional.absent();
+        private final Class<A> actorClass;
 
-        public Builder id(String id) {
+        protected AbstractBuilder(Class<A> actorClass) {
+            this.actorClass = actorClass;
+        }
+
+        @SuppressWarnings("unchecked")
+        private T self() {
+            return (T) this;
+        }
+
+        public T id(String id) {
             this.id = id;
-            return this;
+            return self();
         }
 
-        public Builder peerAddresses(Map<String, String> peerAddresses) {
+        public T peerAddresses(Map<String, String> peerAddresses) {
             this.peerAddresses = peerAddresses;
-            return this;
+            return self();
         }
 
-        public Builder config(ConfigParams config) {
+        public T config(ConfigParams config) {
             this.config = config;
-            return this;
+            return self();
         }
 
-        public Builder dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
+        public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
             this.dataPersistenceProvider = dataPersistenceProvider;
-            return this;
+            return self();
         }
 
-        public Builder roleChangeNotifier(ActorRef roleChangeNotifier) {
+        public T roleChangeNotifier(ActorRef roleChangeNotifier) {
             this.roleChangeNotifier = roleChangeNotifier;
-            return this;
+            return self();
         }
 
-        public Builder snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
+        public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
             this.snapshotMessageSupport = snapshotMessageSupport;
-            return this;
+            return self();
         }
 
-        public Builder restoreFromSnapshot(byte[] restoreFromSnapshot) {
+        public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
             this.restoreFromSnapshot = restoreFromSnapshot;
-            return this;
+            return self();
         }
 
-        public Builder persistent(Optional<Boolean> persistent) {
+        public T persistent(Optional<Boolean> persistent) {
             this.persistent = persistent;
-            return this;
+            return self();
         }
 
         public Props props() {
-            return Props.create(MockRaftActor.class, this);
+            return Props.create(actorClass, this);
+        }
+    }
+
+    public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
+        private Builder() {
+            super(MockRaftActor.class);
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java
new file mode 100644 (file)
index 0000000..5680a0b
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import akka.dispatch.Dispatchers;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete;
+
+/**
+ * Unit tests for RaftActorLeadershipTransferCohort.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest {
+    private final TestActorFactory factory = new TestActorFactory(getSystem());
+    private MockRaftActor mockRaftActor;
+    private RaftActorLeadershipTransferCohort cohort;
+    private final OnComplete onComplete = mock(OnComplete.class);
+    DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+    @After
+    public void tearDown() {
+        factory.close();
+    }
+
+    private void setup() {
+        String persistenceId = factory.generateActorId("leader-");
+        mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.builder().id(persistenceId).config(
+                config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId).underlyingActor();
+        cohort = new RaftActorLeadershipTransferCohort(mockRaftActor, null);
+        cohort.addOnComplete(onComplete);
+        mockRaftActor.waitForInitializeBehaviorComplete();
+    }
+
+    @Test
+    public void testOnNewLeader() {
+        setup();
+        cohort.setNewLeaderTimeoutInMillis(20000);
+
+        cohort.onNewLeader("new-leader");
+        verify(onComplete, never()).onSuccess(mockRaftActor.self(), null);
+
+        cohort.transferComplete();
+
+        cohort.onNewLeader(null);
+        verify(onComplete, never()).onSuccess(mockRaftActor.self(), null);
+
+        cohort.onNewLeader("new-leader");
+        verify(onComplete).onSuccess(mockRaftActor.self(), null);
+    }
+
+    @Test
+    public void testNewLeaderTimeout() {
+        setup();
+        cohort.setNewLeaderTimeoutInMillis(200);
+        cohort.transferComplete();
+        verify(onComplete, timeout(3000)).onSuccess(mockRaftActor.self(), null);
+    }
+
+    @Test
+    public void testNotLeaderOnRun() {
+        config.setElectionTimeoutFactor(10000);
+        setup();
+        cohort.run();
+        verify(onComplete).onSuccess(mockRaftActor.self(), null);
+    }
+
+    @Test
+    public void testAbortTransfer() {
+        setup();
+        cohort.abortTransfer();
+        verify(onComplete).onFailure(mockRaftActor.self(), null);
+    }
+}
index c39c62c727b601b0dd20c9e59098345604490798..b51f0a70b1dc7ef54da2f837b55354fdffaf02af 100644 (file)
@@ -2094,20 +2094,6 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
     }
 
-    @Test
-    public void testTransferLeadershipWithNoFollowers() {
-        logStart("testTransferLeadershipWithNoFollowers");
-
-        MockRaftActorContext leaderActorContext = createActorContext();
-
-        leader = new Leader(leaderActorContext);
-
-        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
-        leader.transferLeadership(mockTransferCohort);
-
-        verify(mockTransferCohort).transferComplete();
-    }
-
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {