Bug 2187: Timeout the Persist state 93/30493/6
authorTom Pantelis <tpanteli@brocade.com>
Tue, 1 Dec 2015 12:59:13 +0000 (07:59 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 14 Dec 2015 22:16:43 +0000 (22:16 +0000)
On add/remove server, if replication consensus isn't reached in the
Persist state, any pending or future operations will remain queued
until they eventually timeout on the caller side with no response. If
consensus is eventually reached, the pending operations would get
processed even though the caller is gone.

To alleviate this, I added a timer to the Persist state (2 * election
timeout). If it times out, pending operations are failed with a
PRIOR_REQUEST_CONSENSUS_TIMEOUT response. Also future operations are
failed if the timeout occurred.

Change-Id: I83ae528d6bec3fb8f8e3da7c5fd4ca75cfeeb4d5
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java

index 207642a..2a2b501 100644 (file)
@@ -14,14 +14,12 @@ import com.google.common.base.Preconditions;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
-import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
@@ -30,7 +28,6 @@ import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSn
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Handles server configuration related messages for a RaftActor.
@@ -59,8 +56,8 @@ class RaftActorServerConfigurationSupport {
         } else if(message instanceof RemoveServer) {
             onRemoveServer((RemoveServer) message, raftActor, sender);
             return true;
-        } else if (message instanceof FollowerCatchUpTimeout) {
-            currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout) message);
+        } else if (message instanceof ServerOperationTimeout) {
+            currentOperationState.onServerOperationTimeout(raftActor, (ServerOperationTimeout) message);
             return true;
         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
             currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
@@ -77,6 +74,7 @@ class RaftActorServerConfigurationSupport {
     }
 
     private void onRemoveServer(RemoveServer removeServer, RaftActor raftActor, ActorRef sender) {
+        LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
         if(removeServer.getServerId().equals(raftActor.getLeaderId())){
             // Removing current leader is not supported yet
             // TODO: To properly support current leader removal we need to first implement transfer of leadership
@@ -121,7 +119,7 @@ class RaftActorServerConfigurationSupport {
      * </ul>
      */
     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
-        LOG.debug("{}: onAddServer: {}", raftContext.getId(), addServer);
+        LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
 
         onNewOperation(raftActor, new AddServerContext(addServer, sender));
     }
@@ -148,7 +146,7 @@ class RaftActorServerConfigurationSupport {
     private interface OperationState {
         void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);
 
-        void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout);
+        void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout);
 
         void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
 
@@ -165,7 +163,7 @@ class RaftActorServerConfigurationSupport {
     }
 
     /**
-     * Abstract base class for server operation FSM state. Handles common behavior for all states.
+     * Abstract base class for server operation FSM state. Handles common behavior for all states.
      */
     private abstract class AbstractOperationState implements OperationState {
         @Override
@@ -179,8 +177,8 @@ class RaftActorServerConfigurationSupport {
         }
 
         @Override
-        public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
-            LOG.debug("onFollowerCatchupTimeout should not be called in state {}", this);
+        public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+            LOG.debug("onServerOperationTimeout should not be called in state {}", this);
         }
 
         @Override
@@ -204,7 +202,8 @@ class RaftActorServerConfigurationSupport {
 
             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
 
-            currentOperationState = new Persisting(operationContext);
+            currentOperationState = new Persisting(operationContext, newTimer(
+                    new ServerOperationTimeout(operationContext.getServerId())));
 
             sendReply(raftActor, operationContext, ServerChangeStatus.OK);
         }
@@ -225,7 +224,7 @@ class RaftActorServerConfigurationSupport {
             }
         }
 
-        private void sendReply(RaftActor raftActor, ServerOperationContext<?> operationContext,
+        protected void sendReply(RaftActor raftActor, ServerOperationContext<?> operationContext,
                 ServerChangeStatus status) {
             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
 
@@ -233,6 +232,12 @@ class RaftActorServerConfigurationSupport {
                     raftActor.self());
         }
 
+        Cancellable newTimer(Object message) {
+            return raftContext.getActorSystem().scheduler().scheduleOnce(
+                    raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(),
+                            message, raftContext.getActorSystem().dispatcher(), raftContext.getActor());
+        }
+
         @Override
         public String toString() {
             return getClass().getSimpleName();
@@ -259,9 +264,12 @@ class RaftActorServerConfigurationSupport {
      */
     private class Persisting extends AbstractOperationState {
         private final ServerOperationContext<?> operationContext;
+        private final Cancellable timer;
+        private boolean timedOut = false;
 
-        Persisting(ServerOperationContext<?> operationContext) {
+        Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
             this.operationContext = operationContext;
+            this.timer = timer;
         }
 
         @Override
@@ -272,9 +280,34 @@ class RaftActorServerConfigurationSupport {
                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
                         applyState.getReplicatedLogEntry().getData());
 
+                timer.cancel();
                 operationComplete(raftActor, operationContext, null);
             }
         }
+
+        @Override
+        public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+            LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
+                    timeout.getServerId());
+
+            timedOut = true;
+
+            // Fail any pending operations
+            ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
+            while(nextOperation != null) {
+                sendReply(raftActor, nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+                nextOperation = pendingOperationsQueue.poll();
+            }
+        }
+
+        @Override
+        public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+            if(timedOut) {
+                sendReply(raftActor, operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+            } else {
+                super.onNewOperation(raftActor, operationContext);
+            }
+        }
     }
 
     /**
@@ -292,17 +325,13 @@ class RaftActorServerConfigurationSupport {
         }
 
         Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
-            return raftContext.getActorSystem().scheduler().scheduleOnce(
-                    new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
-                            TimeUnit.MILLISECONDS), raftContext.getActor(),
-                            new FollowerCatchUpTimeout(addServerContext.getOperation().getNewServerId()),
-                            raftContext.getActorSystem().dispatcher(), raftContext.getActor());
+            return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
         }
 
-        void handleOnFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
-            String serverId = followerTimeout.getNewServerId();
+        void handleInstallSnapshotTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+            String serverId = timeout.getServerId();
 
-            LOG.debug("{}: onFollowerCatchupTimeout for new server {}", raftContext.getId(), serverId);
+            LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
 
             // cleanup
             raftContext.removePeer(serverId);
@@ -382,11 +411,11 @@ class RaftActorServerConfigurationSupport {
         }
 
         @Override
-        public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
-            handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
+        public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+            handleInstallSnapshotTimeout(raftActor, timeout);
 
             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
-                    followerTimeout.getNewServerId());
+                    timeout.getServerId());
         }
 
         @Override
@@ -447,11 +476,11 @@ class RaftActorServerConfigurationSupport {
         }
 
         @Override
-        public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
-            handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
+        public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+            handleInstallSnapshotTimeout(raftActor, timeout);
 
             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
-                    raftContext.getId(), followerTimeout.getNewServerId());
+                    raftContext.getId(), timeout.getServerId());
         }
     }
 
@@ -488,6 +517,8 @@ class RaftActorServerConfigurationSupport {
         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
 
         abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
+
+        abstract String getServerId();
     }
 
     /**
@@ -512,6 +543,11 @@ class RaftActorServerConfigurationSupport {
         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
 
         }
+
+        @Override
+        String getServerId() {
+            return getOperation().getNewServerId();
+        }
     }
 
     private abstract class RemoveServerState extends AbstractOperationState {
@@ -526,7 +562,6 @@ class RaftActorServerConfigurationSupport {
         public RemoveServerContext getRemoveServerContext() {
             return removeServerContext;
         }
-
     }
 
     private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
@@ -567,5 +602,21 @@ class RaftActorServerConfigurationSupport {
             }
         }
 
+        @Override
+        String getServerId() {
+            return getOperation().getServerId();
+        }
+    }
+
+    static class ServerOperationTimeout {
+        private final String serverId;
+
+        ServerOperationTimeout(String serverId){
+           this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null");
+        }
+
+        String getServerId() {
+            return serverId;
+        }
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java
deleted file mode 100644 (file)
index 365e3a0..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2015 Dell Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.raft.messages;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Local message sent to self when catch-up of a new follower doesn't complete in a timely manner
- */
-
-public class FollowerCatchUpTimeout {
-    private final String newServerId;
-
-    public FollowerCatchUpTimeout(String serverId){
-       this.newServerId = Preconditions.checkNotNull(serverId, "serverId should not be null");
-    }
-    public String getNewServerId() {
-        return newServerId;
-    }
-
-}
index 64a0f66..7616fe6 100644 (file)
@@ -13,11 +13,39 @@ package org.opendaylight.controller.cluster.raft.messages;
  * @author Thomas Pantelis
  */
 public enum ServerChangeStatus {
+    /**
+     * Request successfully completed.
+     */
     OK,
+
+    /**
+     * No leader exists to process the request.
+     */
     NO_LEADER,
+
+    /**
+     * For an AddServer request, the leader timed out trying to install a snapshot on the new server.
+     */
     TIMEOUT,
+
+    /**
+     * For an AddServer request, the server to add already exists.
+     */
     ALREADY_EXISTS,
-    DOES_NOT_EXIST,  // Server with the specified address does not exist
-    NOT_SUPPORTED,   // Some types of RemoveServer for example Removing the current Leader may not be
-                     // supported (at least initially)
+
+    /**
+     * For a RemoveServer request, the server to remove does not exist.
+     */
+    DOES_NOT_EXIST,
+
+    /**
+     * The leader could not process the request due to a prior request that timed out while trying to
+     * achieve replication consensus.
+     */
+    PRIOR_REQUEST_CONSENSUS_TIMEOUT,
+
+    /**
+     * An unsupported request, for example removing the current leader may not be supported (at least initially)
+     */
+    NOT_SUPPORTED,
 }
index da85c67..d6cd982 100644 (file)
@@ -42,7 +42,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
@@ -484,7 +483,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
         leaderActor.tell(commitMsg, leaderActor);
 
-        leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
+        leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor);
 
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
@@ -589,18 +588,49 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
 
-        newFollowerRaftActor.underlyingActor().setDropMessageOfType(AppendEntries.class);
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor);
+
+        // Drop UnInitializedFollowerSnapshotReply initially
+        leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
+
+        MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
+        TestActorRef<MessageCollectorActor> newFollowerCollectorActor =
+                newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID);
+
+        // Drop AppendEntries to the new follower so consensus isn't reached
+        newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
+        // Capture the UnInitializedFollowerSnapshotReply
+        Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class);
+
+        // Send the UnInitializedFollowerSnapshotReply to resume the first request
+        leaderRaftActor.setDropMessageOfType(null);
+        leaderActor.tell(snapshotReply, leaderActor);
+
+        expectFirstMatching(newFollowerCollectorActor, AppendEntries.class);
+
+        // Send a second AddServer
+        leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
+
+        // The first AddServer should succeed with OK even though consensus wasn't reached
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
         assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
 
         // Verify ServerConfigurationPayload entry in leader's log
-
         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
                 votingServer(NEW_SERVER_ID));
+
+        // The second AddServer should fail since consensus wasn't reached for the first
+        addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
+
+        // Re-send the second AddServer - should also fail
+        leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
+        addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
     }
 
     @Test
@@ -774,11 +804,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     }
 
     private TestActorRef<MessageCollectorActor> newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
-        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+        return newCollectorActor(leaderRaftActor, LEADER_ID);
+    }
+
+    private TestActorRef<MessageCollectorActor> newCollectorActor(AbstractMockRaftActor raftActor, String id) {
+        TestActorRef<MessageCollectorActor> collectorActor = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                actorFactory.generateActorId(LEADER_ID + "Collector"));
-        leaderRaftActor.setCollectorActor(leaderCollectorActor);
-        return leaderCollectorActor;
+                actorFactory.generateActorId(id + "Collector"));
+        raftActor.setCollectorActor(collectorActor);
+        return collectorActor;
     }
 
     private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {