Remove RaftReplicator and move hearbeat logic to the leader 53/8953/2
authorMoiz Raja <moraja@cisco.com>
Fri, 11 Jul 2014 20:30:28 +0000 (13:30 -0700)
committerMoiz Raja <moraja@cisco.com>
Sun, 20 Jul 2014 17:40:36 +0000 (10:40 -0700)
This makes more sense. The RaftReplicator does not seem to add any value.

Change-Id: Id3cf8ecbd2493b35c1a32382d65876795a78fa30
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftReplicator.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendHeartBeat.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftReplicatorTest.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/DoNothingActor.java [new file with mode: 0644]

index a78b890..554461a 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 
 import java.util.concurrent.atomic.AtomicLong;
@@ -72,7 +73,12 @@ public interface RaftActorContext {
     AtomicLong getLastApplied();
 
     /**
-     *
+     * @return A representation of the log
      */
     ReplicatedLog getReplicatedLog();
+
+    /**
+     * @return The ActorSystem associated with this context
+     */
+    ActorSystem getActorSystem();
 }
index 9ec8ddd..1fdc3c6 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
 
@@ -75,4 +76,8 @@ public class RaftActorContextImpl implements RaftActorContext{
     @Override public ReplicatedLog getReplicatedLog() {
         return replicatedLog;
     }
+
+    @Override public ActorSystem getActorSystem() {
+        return context.system();
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftReplicator.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftReplicator.java
deleted file mode 100644 (file)
index 73decee..0000000
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.japi.Creator;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A RaftReplicator is responsible for replicating messages to any one follower.
- * Once it gets a message for replication it should keep trying to replicate it
- * to the remote follower indefinitely.
- * <p>
- * Any new messages that are sent to this actor while it is replicating a
- * message may need to be stashed till the current message has been successfully
- * replicated. When a message is successfully replicated the RaftReplicator
- * needs to inform the RaftActor of it.
- */
-public class RaftReplicator extends UntypedActor {
-
-    /**
-     * The interval at which a heart beat message will be sent to the remote
-     * RaftActor
-     *
-     * Since this is set to 100 milliseconds the Election timeout should be
-     * at least 200 milliseconds
-     *
-     */
-    private static final FiniteDuration HEART_BEAT_INTERVAL =
-        new FiniteDuration(100, TimeUnit.MILLISECONDS);
-
-    /**
-     * The state of the follower as known to this replicator
-     */
-    private final FollowerLogInformation followerLogInformation;
-
-    /**
-     * The local RaftActor that created this replicator so that it could
-     * replicate messages to the follower
-     */
-    private final ActorRef leader;
-
-
-    /**
-     * The remote RaftActor to whom the messages need to be replicated
-     */
-    private ActorSelection follower;
-
-    private Cancellable heartbeatCancel = null;
-
-    public RaftReplicator(FollowerLogInformation followerLogInformation,
-        ActorRef leader) {
-
-        this.followerLogInformation = followerLogInformation;
-        this.leader = leader;
-        this.follower = getContext().actorSelection(followerLogInformation.getId());
-
-        // Immediately schedule a heartbeat
-        // Upon election: send initial empty AppendEntries RPCs
-        // (heartbeat) to each server; repeat during idle periods to
-        // prevent election timeouts (§5.2)
-        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
-    }
-
-    private void scheduleHeartBeat(FiniteDuration interval) {
-        if(heartbeatCancel != null && ! heartbeatCancel.isCancelled()){
-            heartbeatCancel.cancel();
-        }
-
-        // Schedule a heartbeat. When the scheduler triggers the replicator
-        // will let the RaftActor (leader) know that a new heartbeat needs to be sent
-        // Scheduling the heartbeat only once here because heartbeats do not
-        // need to be sent if there are other messages being sent to the remote
-        // actor.
-        heartbeatCancel =
-            getContext().system().scheduler().scheduleOnce(interval,
-                leader, new SendHeartBeat(), getContext().dispatcher(), getSelf());
-    }
-
-
-
-    @Override public void onReceive(Object message) throws Exception {
-        scheduleHeartBeat(HEART_BEAT_INTERVAL);
-        follower.forward(message, getContext());
-    }
-
-    public static Props props(final FollowerLogInformation followerLogInformation,
-        final ActorRef leader) {
-        return Props.create(new Creator<RaftReplicator>() {
-
-            @Override public RaftReplicator create() throws Exception {
-                return new RaftReplicator(followerLogInformation, leader);
-            }
-        });
-    }
-}
index 4adf8d0..6c3eee5 100644 (file)
@@ -9,26 +9,30 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftReplicator;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The behavior of a RaftActor when it is in the Leader state
- * <p>
+ * <p/>
  * Leaders:
  * <ul>
  * <li> Upon election: send initial empty AppendEntries RPCs
@@ -50,28 +54,48 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class Leader extends AbstractRaftActorBehavior {
 
+    /**
+     * The interval at which a heart beat message will be sent to the remote
+     * RaftActor
+     * <p/>
+     * Since this is set to 100 milliseconds the Election timeout should be
+     * at least 200 milliseconds
+     */
+    private static final FiniteDuration HEART_BEAT_INTERVAL =
+        new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
     private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
 
-    public Leader(RaftActorContext context, List<String> followers){
+    private final Map<String, FollowerLogInformation> followerToLog =
+        new HashMap();
+
+    private final Map<String, ActorSelection> followerToActor = new HashMap<>();
+
+    private Cancellable heartbeatCancel = null;
+
+    public Leader(RaftActorContext context, List<String> followers) {
         super(context);
 
-        for(String follower : followers) {
+        for (String follower : followers) {
 
-            ActorRef replicator = context.actorOf(
-                RaftReplicator.props(
-                    new FollowerLogInformationImpl(follower,
-                        new AtomicLong(0),
-                        new AtomicLong(0)),
-                    context.getActor()
-                )
-            );
+            FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(follower,
+                    new AtomicLong(0),
+                    new AtomicLong(0));
 
-            // Create a replicator for each follower
-            followerToReplicator.put(follower, replicator);
+            followerToActor.put(follower,
+                context.actorSelection(followerLogInformation.getId()));
+            followerToLog.put(follower, followerLogInformation);
 
         }
 
+        // Immediately schedule a heartbeat
+        // Upon election: send initial empty AppendEntries RPCs
+        // (heartbeat) to each server; repeat during idle periods to
+        // prevent election timeouts (§5.2)
+        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+
+
     }
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
@@ -101,14 +125,37 @@ public class Leader extends AbstractRaftActorBehavior {
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
-        if(message instanceof SendHeartBeat) {
-            sender.tell(new AppendEntries(
-                context.getTermInformation().getCurrentTerm().get() , context.getId(),
-                context.getReplicatedLog().last().getIndex(),
-                context.getReplicatedLog().last().getTerm(),
-                Collections.EMPTY_LIST, context.getCommitIndex().get()), context.getActor());
+        scheduleHeartBeat(HEART_BEAT_INTERVAL);
+
+        if (message instanceof SendHeartBeat) {
+            for (ActorSelection follower : followerToActor.values()) {
+                follower.tell(new AppendEntries(
+                    context.getTermInformation().getCurrentTerm().get(),
+                    context.getId(),
+                    context.getReplicatedLog().last().getIndex(),
+                    context.getReplicatedLog().last().getTerm(),
+                    Collections.EMPTY_LIST, context.getCommitIndex().get()),
+                    context.getActor());
+            }
             return state();
         }
         return super.handleMessage(sender, message);
     }
+
+    private void scheduleHeartBeat(FiniteDuration interval) {
+        if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
+            heartbeatCancel.cancel();
+        }
+
+        // Schedule a heartbeat. When the scheduler triggers the replicator
+        // will let the RaftActor (leader) know that a new heartbeat needs to be sent
+        // Scheduling the heartbeat only once here because heartbeats do not
+        // need to be sent if there are other messages being sent to the remote
+        // actor.
+        heartbeatCancel =
+            context.getActorSystem().scheduler().scheduleOnce(interval,
+                context.getActor(), new SendHeartBeat(),
+                context.getActorSystem().dispatcher(), context.getActor());
+    }
+
 }
index 0d73a11..5048cbb 100644 (file)
@@ -12,8 +12,7 @@ package org.opendaylight.controller.cluster.raft.internal.messages;
  * This messages is sent to the Leader to prompt it to send a heartbeat
  * to it's followers.
  *
- * Typically the RaftReplicator for a specific follower sends this message
- * to the Leader
+ * Typically the Leader to itself on a schedule
  */
 public class SendHeartBeat {
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftReplicatorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftReplicatorTest.java
deleted file mode 100644 (file)
index c354b82..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.opendaylight.controller.cluster.raft;
-
-import akka.testkit.JavaTestKit;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.junit.Assert.assertEquals;
-
-public class RaftReplicatorTest extends AbstractActorTest {
-
-    @Test
-    public void testThatHeartBeatIsGenerated () throws Exception {
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    getSystem().actorOf(RaftReplicator.props(
-                        new FollowerLogInformationImpl("test",
-                            new AtomicLong(100), new AtomicLong(100)),
-                        getRef()));
-
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        protected String match(Object in) {
-                            if (in instanceof SendHeartBeat) {
-                                return "match";
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
-
-                    assertEquals("match", out);
-
-                }
-
-
-            };
-        }};
-    }
-}
index ea417a1..e6bf26c 100644 (file)
@@ -42,10 +42,10 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
     public void testHandlingOfAppendEntriesWithNewerCommitIndex() throws Exception{
         new JavaTestKit(getSystem()) {{
 
-            MockRaftActorContext context =
-                new MockRaftActorContext();
+            RaftActorContext context =
+                createActorContext();
 
-            context.setLastApplied(new AtomicLong(100));
+            ((MockRaftActorContext) context).setLastApplied(new AtomicLong(100));
 
             AppendEntries appendEntries =
                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
@@ -69,7 +69,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
     protected abstract RaftActorBehavior createBehavior(RaftActorContext actorContext);
 
     protected RaftActorBehavior createBehavior(){
-        return createBehavior(new MockRaftActorContext());
+        return createBehavior(createActorContext());
+    }
+
+    protected RaftActorContext createActorContext(){
+        return new MockRaftActorContext();
     }
 
     protected AppendEntries createAppendEntriesWithNewerTerm(){
index d0497f3..5684d66 100644 (file)
@@ -1,5 +1,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import akka.actor.ActorRef;
+import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Test;
@@ -8,6 +10,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -17,15 +20,18 @@ import static org.junit.Assert.assertEquals;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
+    private ActorRef leaderActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+    private ActorRef senderActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+
     @Test
     public void testHandleMessageForUnknownMessage() throws Exception {
         new JavaTestKit(getSystem()) {{
             Leader leader =
-                new Leader(new MockRaftActorContext(), Collections.EMPTY_LIST);
+                new Leader(createActorContext(), Collections.EMPTY_LIST);
 
             // handle message should return the Leader state when it receives an
             // unknown message
-            RaftState state = leader.handleMessage(getRef(), "foo");
+            RaftState state = leader.handleMessage(senderActor, "foo");
             Assert.assertEquals(RaftState.Leader, state);
         }};
     }
@@ -38,12 +44,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             new Within(duration("1 seconds")) {
                 protected void run() {
 
+                    ActorRef followerActor = getTestActor();
+
                     List<String> followers = new ArrayList();
 
-                    followers.add(getTestActor().path().toString());
+                    followers.add(followerActor.path().toString());
 
-                    Leader leader = new Leader(new MockRaftActorContext("test", getSystem(), getTestActor()), followers);
-                    leader.handleMessage(getRef(), new SendHeartBeat());
+                    Leader leader = new Leader(createActorContext(), followers);
+                    leader.handleMessage(senderActor, new SendHeartBeat());
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
@@ -72,4 +80,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
         return new Leader(actorContext, Collections.EMPTY_LIST);
     }
+
+    @Override protected RaftActorContext createActorContext() {
+        return new MockRaftActorContext("test", getSystem(), leaderActor);
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/DoNothingActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/DoNothingActor.java
new file mode 100644 (file)
index 0000000..741c473
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.actor.UntypedActor;
+
+public class DoNothingActor extends UntypedActor{
+    @Override public void onReceive(Object message) throws Exception {
+
+    }
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.