This makes more sense. The RaftReplicator does not seem to add any value.
Change-Id: Id3cf8ecbd2493b35c1a32382d65876795a78fa30
Signed-off-by: Moiz Raja <moraja@cisco.com>
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
import akka.actor.Props;
import java.util.concurrent.atomic.AtomicLong;
AtomicLong getLastApplied();
/**
- *
+ * @return A representation of the log
*/
ReplicatedLog getReplicatedLog();
+
+ /**
+ * @return The ActorSystem associated with this context
+ */
+ ActorSystem getActorSystem();
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActorContext;
@Override public ReplicatedLog getReplicatedLog() {
return replicatedLog;
}
+
+ @Override public ActorSystem getActorSystem() {
+ return context.system();
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.japi.Creator;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A RaftReplicator is responsible for replicating messages to any one follower.
- * Once it gets a message for replication it should keep trying to replicate it
- * to the remote follower indefinitely.
- * <p>
- * Any new messages that are sent to this actor while it is replicating a
- * message may need to be stashed till the current message has been successfully
- * replicated. When a message is successfully replicated the RaftReplicator
- * needs to inform the RaftActor of it.
- */
-public class RaftReplicator extends UntypedActor {
-
- /**
- * The interval at which a heart beat message will be sent to the remote
- * RaftActor
- *
- * Since this is set to 100 milliseconds the Election timeout should be
- * at least 200 milliseconds
- *
- */
- private static final FiniteDuration HEART_BEAT_INTERVAL =
- new FiniteDuration(100, TimeUnit.MILLISECONDS);
-
- /**
- * The state of the follower as known to this replicator
- */
- private final FollowerLogInformation followerLogInformation;
-
- /**
- * The local RaftActor that created this replicator so that it could
- * replicate messages to the follower
- */
- private final ActorRef leader;
-
-
- /**
- * The remote RaftActor to whom the messages need to be replicated
- */
- private ActorSelection follower;
-
- private Cancellable heartbeatCancel = null;
-
- public RaftReplicator(FollowerLogInformation followerLogInformation,
- ActorRef leader) {
-
- this.followerLogInformation = followerLogInformation;
- this.leader = leader;
- this.follower = getContext().actorSelection(followerLogInformation.getId());
-
- // Immediately schedule a heartbeat
- // Upon election: send initial empty AppendEntries RPCs
- // (heartbeat) to each server; repeat during idle periods to
- // prevent election timeouts (§5.2)
- scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
- }
-
- private void scheduleHeartBeat(FiniteDuration interval) {
- if(heartbeatCancel != null && ! heartbeatCancel.isCancelled()){
- heartbeatCancel.cancel();
- }
-
- // Schedule a heartbeat. When the scheduler triggers the replicator
- // will let the RaftActor (leader) know that a new heartbeat needs to be sent
- // Scheduling the heartbeat only once here because heartbeats do not
- // need to be sent if there are other messages being sent to the remote
- // actor.
- heartbeatCancel =
- getContext().system().scheduler().scheduleOnce(interval,
- leader, new SendHeartBeat(), getContext().dispatcher(), getSelf());
- }
-
-
-
- @Override public void onReceive(Object message) throws Exception {
- scheduleHeartBeat(HEART_BEAT_INTERVAL);
- follower.forward(message, getContext());
- }
-
- public static Props props(final FollowerLogInformation followerLogInformation,
- final ActorRef leader) {
- return Props.create(new Creator<RaftReplicator>() {
-
- @Override public RaftReplicator create() throws Exception {
- return new RaftReplicator(followerLogInformation, leader);
- }
- });
- }
-}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftReplicator;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* The behavior of a RaftActor when it is in the Leader state
- * <p>
+ * <p/>
* Leaders:
* <ul>
* <li> Upon election: send initial empty AppendEntries RPCs
*/
public class Leader extends AbstractRaftActorBehavior {
+ /**
+ * The interval at which a heart beat message will be sent to the remote
+ * RaftActor
+ * <p/>
+ * Since this is set to 100 milliseconds the Election timeout should be
+ * at least 200 milliseconds
+ */
+ private static final FiniteDuration HEART_BEAT_INTERVAL =
+ new FiniteDuration(100, TimeUnit.MILLISECONDS);
private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
- public Leader(RaftActorContext context, List<String> followers){
+ private final Map<String, FollowerLogInformation> followerToLog =
+ new HashMap();
+
+ private final Map<String, ActorSelection> followerToActor = new HashMap<>();
+
+ private Cancellable heartbeatCancel = null;
+
+ public Leader(RaftActorContext context, List<String> followers) {
super(context);
- for(String follower : followers) {
+ for (String follower : followers) {
- ActorRef replicator = context.actorOf(
- RaftReplicator.props(
- new FollowerLogInformationImpl(follower,
- new AtomicLong(0),
- new AtomicLong(0)),
- context.getActor()
- )
- );
+ FollowerLogInformation followerLogInformation =
+ new FollowerLogInformationImpl(follower,
+ new AtomicLong(0),
+ new AtomicLong(0));
- // Create a replicator for each follower
- followerToReplicator.put(follower, replicator);
+ followerToActor.put(follower,
+ context.actorSelection(followerLogInformation.getId()));
+ followerToLog.put(follower, followerLogInformation);
}
+ // Immediately schedule a heartbeat
+ // Upon election: send initial empty AppendEntries RPCs
+ // (heartbeat) to each server; repeat during idle periods to
+ // prevent election timeouts (§5.2)
+ scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+
+
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
@Override public RaftState handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
- if(message instanceof SendHeartBeat) {
- sender.tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm().get() , context.getId(),
- context.getReplicatedLog().last().getIndex(),
- context.getReplicatedLog().last().getTerm(),
- Collections.EMPTY_LIST, context.getCommitIndex().get()), context.getActor());
+ scheduleHeartBeat(HEART_BEAT_INTERVAL);
+
+ if (message instanceof SendHeartBeat) {
+ for (ActorSelection follower : followerToActor.values()) {
+ follower.tell(new AppendEntries(
+ context.getTermInformation().getCurrentTerm().get(),
+ context.getId(),
+ context.getReplicatedLog().last().getIndex(),
+ context.getReplicatedLog().last().getTerm(),
+ Collections.EMPTY_LIST, context.getCommitIndex().get()),
+ context.getActor());
+ }
return state();
}
return super.handleMessage(sender, message);
}
+
+ private void scheduleHeartBeat(FiniteDuration interval) {
+ if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
+ heartbeatCancel.cancel();
+ }
+
+ // Schedule a heartbeat. When the scheduler triggers the replicator
+ // will let the RaftActor (leader) know that a new heartbeat needs to be sent
+ // Scheduling the heartbeat only once here because heartbeats do not
+ // need to be sent if there are other messages being sent to the remote
+ // actor.
+ heartbeatCancel =
+ context.getActorSystem().scheduler().scheduleOnce(interval,
+ context.getActor(), new SendHeartBeat(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
}
* This messages is sent to the Leader to prompt it to send a heartbeat
* to it's followers.
*
- * Typically the RaftReplicator for a specific follower sends this message
- * to the Leader
+ * Typically the Leader to itself on a schedule
*/
public class SendHeartBeat {
}
}
};
}
+
+ @Override public ActorSystem getActorSystem() {
+ return this.system;
+ }
}
+++ /dev/null
-package org.opendaylight.controller.cluster.raft;
-
-import akka.testkit.JavaTestKit;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.junit.Assert.assertEquals;
-
-public class RaftReplicatorTest extends AbstractActorTest {
-
- @Test
- public void testThatHeartBeatIsGenerated () throws Exception {
- new JavaTestKit(getSystem()) {{
-
- new Within(duration("1 seconds")) {
- protected void run() {
-
- getSystem().actorOf(RaftReplicator.props(
- new FollowerLogInformationImpl("test",
- new AtomicLong(100), new AtomicLong(100)),
- getRef()));
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- protected String match(Object in) {
- if (in instanceof SendHeartBeat) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
-
- }
-
-
- };
- }};
- }
-}
public void testHandlingOfAppendEntriesWithNewerCommitIndex() throws Exception{
new JavaTestKit(getSystem()) {{
- MockRaftActorContext context =
- new MockRaftActorContext();
+ RaftActorContext context =
+ createActorContext();
- context.setLastApplied(new AtomicLong(100));
+ ((MockRaftActorContext) context).setLastApplied(new AtomicLong(100));
AppendEntries appendEntries =
new AppendEntries(100, "leader-1", 0, 0, null, 101);
protected abstract RaftActorBehavior createBehavior(RaftActorContext actorContext);
protected RaftActorBehavior createBehavior(){
- return createBehavior(new MockRaftActorContext());
+ return createBehavior(createActorContext());
+ }
+
+ protected RaftActorContext createActorContext(){
+ return new MockRaftActorContext();
}
protected AppendEntries createAppendEntriesWithNewerTerm(){
package org.opendaylight.controller.cluster.raft.behaviors;
+import akka.actor.ActorRef;
+import akka.actor.Props;
import akka.testkit.JavaTestKit;
import junit.framework.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import java.util.ArrayList;
import java.util.Collections;
public class LeaderTest extends AbstractRaftActorBehaviorTest {
+ private ActorRef leaderActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+ private ActorRef senderActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+
@Test
public void testHandleMessageForUnknownMessage() throws Exception {
new JavaTestKit(getSystem()) {{
Leader leader =
- new Leader(new MockRaftActorContext(), Collections.EMPTY_LIST);
+ new Leader(createActorContext(), Collections.EMPTY_LIST);
// handle message should return the Leader state when it receives an
// unknown message
- RaftState state = leader.handleMessage(getRef(), "foo");
+ RaftState state = leader.handleMessage(senderActor, "foo");
Assert.assertEquals(RaftState.Leader, state);
}};
}
new Within(duration("1 seconds")) {
protected void run() {
+ ActorRef followerActor = getTestActor();
+
List<String> followers = new ArrayList();
- followers.add(getTestActor().path().toString());
+ followers.add(followerActor.path().toString());
- Leader leader = new Leader(new MockRaftActorContext("test", getSystem(), getTestActor()), followers);
- leader.handleMessage(getRef(), new SendHeartBeat());
+ Leader leader = new Leader(createActorContext(), followers);
+ leader.handleMessage(senderActor, new SendHeartBeat());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
@Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
return new Leader(actorContext, Collections.EMPTY_LIST);
}
+
+ @Override protected RaftActorContext createActorContext() {
+ return new MockRaftActorContext("test", getSystem(), leaderActor);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.actor.UntypedActor;
+
+public class DoNothingActor extends UntypedActor{
+ @Override public void onReceive(Object message) throws Exception {
+
+ }
+}