From 77a8b91c42d76b6cb6a1790b35817c08d2ef5e84 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 1 Dec 2015 07:59:13 -0500 Subject: [PATCH] Bug 2187: Timeout the Persist state On add/remove server, if replication consensus isn't reached in the Persist state, any pending or future operations will remain queued until they eventually timeout on the caller side with no response. If consensus is eventually reached, the pending operations would get processed even though the caller is gone. To alleviate this, I added a timer to the Persist state (2 * election timeout). If it times out, pending operations are failed with a PRIOR_REQUEST_CONSENSUS_TIMEOUT response. Also future operations are failed if the timeout occurred. Change-Id: I83ae528d6bec3fb8f8e3da7c5fd4ca75cfeeb4d5 Signed-off-by: Tom Pantelis --- .../RaftActorServerConfigurationSupport.java | 107 +++++++++++++----- .../raft/messages/FollowerCatchUpTimeout.java | 26 ----- .../raft/messages/ServerChangeStatus.java | 34 +++++- ...ftActorServerConfigurationSupportTest.java | 50 ++++++-- 4 files changed, 152 insertions(+), 65 deletions(-) delete mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 207642a721..2a2b50137c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -14,14 +14,12 @@ import com.google.common.base.Preconditions; import java.util.LinkedList; import java.util.Queue; import java.util.UUID; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; -import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; @@ -30,7 +28,6 @@ import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSn import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; /** * Handles server configuration related messages for a RaftActor. @@ -59,8 +56,8 @@ class RaftActorServerConfigurationSupport { } else if(message instanceof RemoveServer) { onRemoveServer((RemoveServer) message, raftActor, sender); return true; - } else if (message instanceof FollowerCatchUpTimeout) { - currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout) message); + } else if (message instanceof ServerOperationTimeout) { + currentOperationState.onServerOperationTimeout(raftActor, (ServerOperationTimeout) message); return true; } else if (message instanceof UnInitializedFollowerSnapshotReply) { currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor, @@ -77,6 +74,7 @@ class RaftActorServerConfigurationSupport { } private void onRemoveServer(RemoveServer removeServer, RaftActor raftActor, ActorRef sender) { + LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState); if(removeServer.getServerId().equals(raftActor.getLeaderId())){ // Removing current leader is not supported yet // TODO: To properly support current leader removal we need to first implement transfer of leadership @@ -121,7 +119,7 @@ class RaftActorServerConfigurationSupport { * */ private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) { - LOG.debug("{}: onAddServer: {}", raftContext.getId(), addServer); + LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState); onNewOperation(raftActor, new AddServerContext(addServer, sender)); } @@ -148,7 +146,7 @@ class RaftActorServerConfigurationSupport { private interface OperationState { void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext); - void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout); + void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout); void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply); @@ -165,7 +163,7 @@ class RaftActorServerConfigurationSupport { } /** - * Abstract base class for server operation FSM state. Handles common behavior for all states. + * Abstract base class for a server operation FSM state. Handles common behavior for all states. */ private abstract class AbstractOperationState implements OperationState { @Override @@ -179,8 +177,8 @@ class RaftActorServerConfigurationSupport { } @Override - public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) { - LOG.debug("onFollowerCatchupTimeout should not be called in state {}", this); + public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + LOG.debug("onServerOperationTimeout should not be called in state {}", this); } @Override @@ -204,7 +202,8 @@ class RaftActorServerConfigurationSupport { raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); - currentOperationState = new Persisting(operationContext); + currentOperationState = new Persisting(operationContext, newTimer( + new ServerOperationTimeout(operationContext.getServerId()))); sendReply(raftActor, operationContext, ServerChangeStatus.OK); } @@ -225,7 +224,7 @@ class RaftActorServerConfigurationSupport { } } - private void sendReply(RaftActor raftActor, ServerOperationContext operationContext, + protected void sendReply(RaftActor raftActor, ServerOperationContext operationContext, ServerChangeStatus status) { LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation()); @@ -233,6 +232,12 @@ class RaftActorServerConfigurationSupport { raftActor.self()); } + Cancellable newTimer(Object message) { + return raftContext.getActorSystem().scheduler().scheduleOnce( + raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), + message, raftContext.getActorSystem().dispatcher(), raftContext.getActor()); + } + @Override public String toString() { return getClass().getSimpleName(); @@ -259,9 +264,12 @@ class RaftActorServerConfigurationSupport { */ private class Persisting extends AbstractOperationState { private final ServerOperationContext operationContext; + private final Cancellable timer; + private boolean timedOut = false; - Persisting(ServerOperationContext operationContext) { + Persisting(ServerOperationContext operationContext, Cancellable timer) { this.operationContext = operationContext; + this.timer = timer; } @Override @@ -272,9 +280,34 @@ class RaftActorServerConfigurationSupport { LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(), applyState.getReplicatedLogEntry().getData()); + timer.cancel(); operationComplete(raftActor, operationContext, null); } } + + @Override + public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(), + timeout.getServerId()); + + timedOut = true; + + // Fail any pending operations + ServerOperationContext nextOperation = pendingOperationsQueue.poll(); + while(nextOperation != null) { + sendReply(raftActor, nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); + nextOperation = pendingOperationsQueue.poll(); + } + } + + @Override + public void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + if(timedOut) { + sendReply(raftActor, operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); + } else { + super.onNewOperation(raftActor, operationContext); + } + } } /** @@ -292,17 +325,13 @@ class RaftActorServerConfigurationSupport { } Cancellable newInstallSnapshotTimer(RaftActor raftActor) { - return raftContext.getActorSystem().scheduler().scheduleOnce( - new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2), - TimeUnit.MILLISECONDS), raftContext.getActor(), - new FollowerCatchUpTimeout(addServerContext.getOperation().getNewServerId()), - raftContext.getActorSystem().dispatcher(), raftContext.getActor()); + return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId())); } - void handleOnFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) { - String serverId = followerTimeout.getNewServerId(); + void handleInstallSnapshotTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + String serverId = timeout.getServerId(); - LOG.debug("{}: onFollowerCatchupTimeout for new server {}", raftContext.getId(), serverId); + LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId); // cleanup raftContext.removePeer(serverId); @@ -382,11 +411,11 @@ class RaftActorServerConfigurationSupport { } @Override - public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) { - handleOnFollowerCatchupTimeout(raftActor, followerTimeout); + public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + handleInstallSnapshotTimeout(raftActor, timeout); LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), - followerTimeout.getNewServerId()); + timeout.getServerId()); } @Override @@ -447,11 +476,11 @@ class RaftActorServerConfigurationSupport { } @Override - public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) { - handleOnFollowerCatchupTimeout(raftActor, followerTimeout); + public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + handleInstallSnapshotTimeout(raftActor, timeout); LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete", - raftContext.getId(), followerTimeout.getNewServerId()); + raftContext.getId(), timeout.getServerId()); } } @@ -488,6 +517,8 @@ class RaftActorServerConfigurationSupport { abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support); abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus); + + abstract String getServerId(); } /** @@ -512,6 +543,11 @@ class RaftActorServerConfigurationSupport { void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) { } + + @Override + String getServerId() { + return getOperation().getNewServerId(); + } } private abstract class RemoveServerState extends AbstractOperationState { @@ -526,7 +562,6 @@ class RaftActorServerConfigurationSupport { public RemoveServerContext getRemoveServerContext() { return removeServerContext; } - } private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{ @@ -567,5 +602,21 @@ class RaftActorServerConfigurationSupport { } } + @Override + String getServerId() { + return getOperation().getServerId(); + } + } + + static class ServerOperationTimeout { + private final String serverId; + + ServerOperationTimeout(String serverId){ + this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null"); + } + + String getServerId() { + return serverId; + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java deleted file mode 100644 index 365e3a0b3c..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (c) 2015 Dell Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.raft.messages; - -import com.google.common.base.Preconditions; - -/** - * Local message sent to self when catch-up of a new follower doesn't complete in a timely manner - */ - -public class FollowerCatchUpTimeout { - private final String newServerId; - - public FollowerCatchUpTimeout(String serverId){ - this.newServerId = Preconditions.checkNotNull(serverId, "serverId should not be null"); - } - public String getNewServerId() { - return newServerId; - } - -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java index 64a0f66fc2..7616fe6743 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java @@ -13,11 +13,39 @@ package org.opendaylight.controller.cluster.raft.messages; * @author Thomas Pantelis */ public enum ServerChangeStatus { + /** + * Request successfully completed. + */ OK, + + /** + * No leader exists to process the request. + */ NO_LEADER, + + /** + * For an AddServer request, the leader timed out trying to install a snapshot on the new server. + */ TIMEOUT, + + /** + * For an AddServer request, the server to add already exists. + */ ALREADY_EXISTS, - DOES_NOT_EXIST, // Server with the specified address does not exist - NOT_SUPPORTED, // Some types of RemoveServer for example Removing the current Leader may not be - // supported (at least initially) + + /** + * For a RemoveServer request, the server to remove does not exist. + */ + DOES_NOT_EXIST, + + /** + * The leader could not process the request due to a prior request that timed out while trying to + * achieve replication consensus. + */ + PRIOR_REQUEST_CONSENSUS_TIMEOUT, + + /** + * An unsupported request, for example removing the current leader may not be supported (at least initially) + */ + NOT_SUPPORTED, } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index da85c67c06..d6cd98218f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -42,7 +42,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; -import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; @@ -484,7 +483,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader leaderActor.tell(commitMsg, leaderActor); - leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor); + leaderActor.tell(new RaftActorServerConfigurationSupport.ServerOperationTimeout(NEW_SERVER_ID), leaderActor); AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); @@ -589,18 +588,49 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); - newFollowerRaftActor.underlyingActor().setDropMessageOfType(AppendEntries.class); + TestActorRef leaderCollectorActor = newLeaderCollectorActor(leaderRaftActor); + + // Drop UnInitializedFollowerSnapshotReply initially + leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class); + + MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor(); + TestActorRef newFollowerCollectorActor = + newCollectorActor(newFollowerRaftActorInstance, NEW_SERVER_ID); + + // Drop AppendEntries to the new follower so consensus isn't reached + newFollowerRaftActorInstance.setDropMessageOfType(AppendEntries.class); leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + // Capture the UnInitializedFollowerSnapshotReply + Object snapshotReply = expectFirstMatching(leaderCollectorActor, UnInitializedFollowerSnapshotReply.class); + + // Send the UnInitializedFollowerSnapshotReply to resume the first request + leaderRaftActor.setDropMessageOfType(null); + leaderActor.tell(snapshotReply, leaderActor); + + expectFirstMatching(newFollowerCollectorActor, AppendEntries.class); + + // Send a second AddServer + leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef()); + + // The first AddServer should succeed with OK even though consensus wasn't reached AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); // Verify ServerConfigurationPayload entry in leader's log - verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID), votingServer(NEW_SERVER_ID)); + + // The second AddServer should fail since consensus wasn't reached for the first + addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus()); + + // Re-send the second AddServer - should also fail + leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef()); + addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus()); } @Test @@ -774,11 +804,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { } private TestActorRef newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) { - TestActorRef leaderCollectorActor = actorFactory.createTestActor( + return newCollectorActor(leaderRaftActor, LEADER_ID); + } + + private TestActorRef newCollectorActor(AbstractMockRaftActor raftActor, String id) { + TestActorRef collectorActor = actorFactory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), - actorFactory.generateActorId(LEADER_ID + "Collector")); - leaderRaftActor.setCollectorActor(leaderCollectorActor); - return leaderCollectorActor; + actorFactory.generateActorId(id + "Collector")); + raftActor.setCollectorActor(collectorActor); + return collectorActor; } private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) { -- 2.36.6