From 0032979e6c27ffdc879eabc9bb9dee2ca75ee2d8 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Fri, 11 Jul 2014 13:30:28 -0700 Subject: [PATCH] Remove RaftReplicator and move hearbeat logic to the leader This makes more sense. The RaftReplicator does not seem to add any value. Change-Id: Id3cf8ecbd2493b35c1a32382d65876795a78fa30 Signed-off-by: Moiz Raja --- .../cluster/raft/RaftActorContext.java | 8 +- .../cluster/raft/RaftActorContextImpl.java | 5 + .../cluster/raft/RaftReplicator.java | 109 ------------------ .../cluster/raft/behaviors/Leader.java | 87 ++++++++++---- .../raft/internal/messages/SendHeartBeat.java | 3 +- .../cluster/raft/MockRaftActorContext.java | 4 + .../cluster/raft/RaftReplicatorTest.java | 44 ------- .../AbstractRaftActorBehaviorTest.java | 12 +- .../cluster/raft/behaviors/LeaderTest.java | 22 +++- .../cluster/raft/utils/DoNothingActor.java | 17 +++ 10 files changed, 126 insertions(+), 185 deletions(-) delete mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftReplicator.java delete mode 100644 opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftReplicatorTest.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/DoNothingActor.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index a78b890950..554461a76c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Props; import java.util.concurrent.atomic.AtomicLong; @@ -72,7 +73,12 @@ public interface RaftActorContext { AtomicLong getLastApplied(); /** - * + * @return A representation of the log */ ReplicatedLog getReplicatedLog(); + + /** + * @return The ActorSystem associated with this context + */ + ActorSystem getActorSystem(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 9ec8dddf6a..1fdc3c628f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActorContext; @@ -75,4 +76,8 @@ public class RaftActorContextImpl implements RaftActorContext{ @Override public ReplicatedLog getReplicatedLog() { return replicatedLog; } + + @Override public ActorSystem getActorSystem() { + return context.system(); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftReplicator.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftReplicator.java deleted file mode 100644 index 73deceeb31..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftReplicator.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.raft; - -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.Cancellable; -import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.japi.Creator; -import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; -import scala.concurrent.duration.FiniteDuration; - -import java.util.concurrent.TimeUnit; - -/** - * A RaftReplicator is responsible for replicating messages to any one follower. - * Once it gets a message for replication it should keep trying to replicate it - * to the remote follower indefinitely. - *

- * Any new messages that are sent to this actor while it is replicating a - * message may need to be stashed till the current message has been successfully - * replicated. When a message is successfully replicated the RaftReplicator - * needs to inform the RaftActor of it. - */ -public class RaftReplicator extends UntypedActor { - - /** - * The interval at which a heart beat message will be sent to the remote - * RaftActor - * - * Since this is set to 100 milliseconds the Election timeout should be - * at least 200 milliseconds - * - */ - private static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(100, TimeUnit.MILLISECONDS); - - /** - * The state of the follower as known to this replicator - */ - private final FollowerLogInformation followerLogInformation; - - /** - * The local RaftActor that created this replicator so that it could - * replicate messages to the follower - */ - private final ActorRef leader; - - - /** - * The remote RaftActor to whom the messages need to be replicated - */ - private ActorSelection follower; - - private Cancellable heartbeatCancel = null; - - public RaftReplicator(FollowerLogInformation followerLogInformation, - ActorRef leader) { - - this.followerLogInformation = followerLogInformation; - this.leader = leader; - this.follower = getContext().actorSelection(followerLogInformation.getId()); - - // Immediately schedule a heartbeat - // Upon election: send initial empty AppendEntries RPCs - // (heartbeat) to each server; repeat during idle periods to - // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); - } - - private void scheduleHeartBeat(FiniteDuration interval) { - if(heartbeatCancel != null && ! heartbeatCancel.isCancelled()){ - heartbeatCancel.cancel(); - } - - // Schedule a heartbeat. When the scheduler triggers the replicator - // will let the RaftActor (leader) know that a new heartbeat needs to be sent - // Scheduling the heartbeat only once here because heartbeats do not - // need to be sent if there are other messages being sent to the remote - // actor. - heartbeatCancel = - getContext().system().scheduler().scheduleOnce(interval, - leader, new SendHeartBeat(), getContext().dispatcher(), getSelf()); - } - - - - @Override public void onReceive(Object message) throws Exception { - scheduleHeartBeat(HEART_BEAT_INTERVAL); - follower.forward(message, getContext()); - } - - public static Props props(final FollowerLogInformation followerLogInformation, - final ActorRef leader) { - return Props.create(new Creator() { - - @Override public RaftReplicator create() throws Exception { - return new RaftReplicator(followerLogInformation, leader); - } - }); - } -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 4adf8d08fd..6c3eee5415 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -9,26 +9,30 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftReplicator; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import scala.concurrent.duration.FiniteDuration; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * The behavior of a RaftActor when it is in the Leader state - *

+ *

* Leaders: *