From: Tony Tkacik Date: Mon, 28 Jul 2014 08:53:03 +0000 (+0000) Subject: Merge "BUG-1381: do not use JavassistUtils.getLock()" X-Git-Tag: release/helium~429 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a449700f10ad3733d64555510d5d6b8012c9deaf;hp=47682b456bcf31e444e374fc48f7d191d33b214c Merge "BUG-1381: do not use JavassistUtils.getLock()" --- diff --git a/opendaylight/md-sal/forwardingrules-manager/pom.xml b/opendaylight/md-sal/forwardingrules-manager/pom.xml index b3096e6478..ed5e192193 100644 --- a/opendaylight/md-sal/forwardingrules-manager/pom.xml +++ b/opendaylight/md-sal/forwardingrules-manager/pom.xml @@ -15,10 +15,6 @@ junit junit - - org.opendaylight.controller - flow-management-compatibility - org.opendaylight.controller sal-binding-api diff --git a/opendaylight/md-sal/sal-akka-raft/pom.xml b/opendaylight/md-sal/sal-akka-raft/pom.xml new file mode 100644 index 0000000000..50442bda5c --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/pom.xml @@ -0,0 +1,135 @@ + + + 4.0.0 + + org.opendaylight.controller + sal-parent + 1.1-SNAPSHOT + + sal-akka-raft + bundle + + + + com.google.guava + guava + + + + com.typesafe.akka + akka-actor_${scala.version} + + + + com.typesafe.akka + akka-cluster_${scala.version} + + + + com.typesafe.akka + akka-persistence-experimental_${scala.version} + + + + com.typesafe.akka + akka-remote_${scala.version} + + + + com.typesafe.akka + akka-testkit_${scala.version} + + + + org.osgi + org.osgi.core + + + + org.scala-lang + scala-library + + + + + junit + junit + test + + + org.mockito + mockito-all + test + + + + org.slf4j + slf4j-simple + ${slf4j.version} + test + + + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + ${project.groupId}.${project.artifactId} + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + org.jacoco + jacoco-maven-plugin + + + org.opendaylight.controller.* + + false + + + + pre-test + + prepare-agent + + + + post-test + + report + + test + + + + + + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + HEAD + https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering + + diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java new file mode 100644 index 0000000000..2560f16588 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.example; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Creator; +import org.opendaylight.controller.cluster.example.messages.KeyValue; +import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; + +public class ClientActor extends UntypedActor { + protected final LoggingAdapter LOG = + Logging.getLogger(getContext().system(), this); + + private final ActorRef target; + + public ClientActor(ActorRef target){ + this.target = target; + } + + public static Props props(final ActorRef target){ + return Props.create(new Creator(){ + + @Override public ClientActor create() throws Exception { + return new ClientActor(target); + } + }); + } + + @Override public void onReceive(Object message) throws Exception { + if(message instanceof KeyValue) { + target.tell(message, getSelf()); + } else if(message instanceof KeyValueSaved){ + LOG.info("KeyValue saved"); + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java new file mode 100644 index 0000000000..8d4d5e48c8 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.example; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.japi.Creator; +import org.opendaylight.controller.cluster.example.messages.KeyValue; +import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; +import org.opendaylight.controller.cluster.example.messages.PrintRole; +import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.raft.RaftActor; + +import java.util.HashMap; +import java.util.Map; + +/** + * A sample actor showing how the RaftActor is to be extended + */ +public class ExampleActor extends RaftActor { + + private final Map state = new HashMap(); + + private long persistIdentifier = 1; + + + public ExampleActor(String id, Map peerAddresses) { + super(id, peerAddresses); + } + + public static Props props(final String id, final Map peerAddresses){ + return Props.create(new Creator(){ + + @Override public ExampleActor create() throws Exception { + return new ExampleActor(id, peerAddresses); + } + }); + } + + @Override public void onReceiveCommand(Object message){ + if(message instanceof KeyValue){ + + if(isLeader()) { + String persistId = Long.toString(persistIdentifier++); + persistData(getSender(), persistId, message); + } else { + if(getLeader() != null) { + getLeader().forward(message, getContext()); + } + } + + } else if (message instanceof PrintState) { + LOG.debug("State of the node:"+getId() + " is="+state.size()); + + } else if (message instanceof PrintRole) { + LOG.debug(getId() + " = " + getRaftState()); + } + super.onReceiveCommand(message); + } + + @Override protected void applyState(ActorRef clientActor, String identifier, + Object data) { + if(data instanceof KeyValue){ + KeyValue kv = (KeyValue) data; + state.put(kv.getKey(), kv.getValue()); + if(clientActor != null) { + clientActor.tell(new KeyValueSaved(), getSelf()); + } + } + } + + @Override protected Object createSnapshot() { + return state; + } + + @Override protected void applySnapshot(Object snapshot) { + state.clear(); + state.putAll((HashMap) snapshot); + } + + @Override public void onReceiveRecover(Object message) { + super.onReceiveRecover(message); + } + + @Override public String persistenceId() { + return getId(); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/LogGenerator.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/LogGenerator.java new file mode 100644 index 0000000000..fbe1447c72 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/LogGenerator.java @@ -0,0 +1,67 @@ +package org.opendaylight.controller.cluster.example; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.example.messages.KeyValue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** + * Created by kramesha on 7/16/14. + */ +public class LogGenerator { + private Map clientToLoggingThread = new HashMap(); + + public void startLoggingForClient(ActorRef client) { + LoggingThread lt = new LoggingThread(client); + clientToLoggingThread.put(client, lt); + Thread t = new Thread(lt); + t.start(); + } + + public void stopLoggingForClient(ActorRef client) { + clientToLoggingThread.get(client).stopLogging(); + clientToLoggingThread.remove(client); + } + + public class LoggingThread implements Runnable { + + private ActorRef clientActor; + private volatile boolean stopLogging = false; + + public LoggingThread(ActorRef clientActor) { + this.clientActor = clientActor; + } + + public void run() { + Random r = new Random(); + while (true) { + if (stopLogging) { + System.out.println("Logging stopped for client:" + clientActor.path()); + break; + } + String key = clientActor.path().name(); + int random = r.nextInt(100); + clientActor.tell(new KeyValue(key+"-key-" + random, "value-" + random), null); + try { + Thread.sleep((random%10) * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public void stopLogging() { + stopLogging = true; + } + + public void startLogging() { + stopLogging = false; + } + + + } + + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java new file mode 100644 index 0000000000..a148ed4009 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.example; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import org.opendaylight.controller.cluster.example.messages.KeyValue; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Main { + private static final ActorSystem actorSystem = ActorSystem.create(); + // Create three example actors + private static Map allPeers = new HashMap<>(); + + static { + allPeers.put("example-1", "akka://default/user/example-1"); + allPeers.put("example-2", "akka://default/user/example-2"); + allPeers.put("example-3", "akka://default/user/example-3"); + } + + public static void main(String[] args) throws Exception{ + ActorRef example1Actor = + actorSystem.actorOf(ExampleActor.props("example-1", + withoutPeer("example-1")), "example-1"); + + ActorRef example2Actor = + actorSystem.actorOf(ExampleActor.props("example-2", + withoutPeer("example-2")), "example-2"); + + ActorRef example3Actor = + actorSystem.actorOf(ExampleActor.props("example-3", + withoutPeer("example-3")), "example-3"); + + + List examples = Arrays.asList(example1Actor, example2Actor, example3Actor); + + ActorRef clientActor = actorSystem.actorOf(ClientActor.props(example1Actor)); + BufferedReader br = + new BufferedReader(new InputStreamReader(System.in)); + + System.out.println("Usage :"); + System.out.println("s <1-3> to start a peer"); + System.out.println("k <1-3> to kill a peer"); + + while(true) { + System.out.print("Enter command (0 to exit):"); + try { + String s = br.readLine(); + String[] split = s.split(" "); + if(split.length > 1) { + String command = split[0]; + String actor = split[1]; + + if ("k".equals(command)) { + int i = Integer.parseInt(actor); + examples.get(i - 1) + .tell(PoisonPill.getInstance(), null); + continue; + } else if ("s".equals(command)) { + int i = Integer.parseInt(actor); + String actorName = "example-" + i; + examples.add(i - 1, + actorSystem.actorOf(ExampleActor.props(actorName, + withoutPeer(actorName)), actorName)); + System.out.println("Created actor : " + actorName); + continue; + } + } + + int i = Integer.parseInt(s); + if(i == 0){ + System.exit(0); + } + clientActor.tell(new KeyValue("key " + i, "value " + i), null); + } catch (NumberFormatException nfe) { + System.err.println("Invalid Format!"); + } + } + } + + private static Map withoutPeer(String peerId) { + Map without = new HashMap<>(allPeers); + without.remove(peerId); + return without; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java new file mode 100644 index 0000000000..c8a7835334 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java @@ -0,0 +1,246 @@ +package org.opendaylight.controller.cluster.example; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.opendaylight.controller.cluster.example.messages.PrintRole; +import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; +import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + + +public class TestDriver { + + private static final ActorSystem actorSystem = ActorSystem.create(); + private static Map allPeers = new HashMap<>(); + private static Map clientActorRefs = new HashMap(); + private static Map actorRefs = new HashMap(); + private static LogGenerator logGenerator = new LogGenerator();; + + /** + * Create nodes, add clients and start logging. + * Commands + * bye + * createNodes:{num} + * addNodes:{num} + * stopNode:{nodeName} + * addClients:{num} + * addClientsToNode:{nodeName, num} + * startLogging + * stopLogging + * startLoggingForClient:{nodeName} + * stopLoggingForClient:{nodeName} + * printNodes + * printState + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + TestDriver td = new TestDriver(); + + System.out.println("Enter command (type bye to exit):"); + + + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + while(true) { + String command = br.readLine(); + if (command.startsWith("bye")) { + System.exit(0); + + } else if (command.startsWith("createNodes")) { + String[] arr = command.split(":"); + int n = Integer.parseInt(arr[1]); + td.createNodes(n); + + } else if (command.startsWith("addNodes")) { + String[] arr = command.split(":"); + int n = Integer.parseInt(arr[1]); + td.addNodes(n); + + } else if (command.startsWith("addClients")) { + String[] arr = command.split(":"); + int n = Integer.parseInt(arr[1]); + td.addClients(n); + + } else if (command.startsWith("addClientsToNode")) { + String[] arr = command.split(":"); + String nodeName = arr[1]; + int n = Integer.parseInt(arr[1]); + td.addClientsToNode(nodeName, n); + + } else if (command.startsWith("stopNode")) { + String[] arr = command.split(":"); + td.stopNode(arr[1]); + + } else if (command.startsWith("startLogging")) { + td.startAllLogging(); + + } else if (command.startsWith("startLoggingForClient")) { + String[] arr = command.split(":"); + td.startLoggingForClient(clientActorRefs.get(arr[1])); + + } else if (command.startsWith("stopLogging")) { + td.stopAllLogging(); + + } else if (command.startsWith("stopLoggingForClient")) { + String[] arr = command.split(":"); + td.stopLoggingForClient(clientActorRefs.get(arr[1])); + + } else if (command.startsWith("printState")) { + td.printState(); + } else if (command.startsWith("printNodes")) { + td.printNodes(); + } + + } + } + + public void createNodes(int num) { + for (int i=0; i < num; i++) { + int rand = getUnusedRandom(num); + allPeers.put("example-"+rand, "akka://default/user/example-"+rand); + } + + for (String s : allPeers.keySet()) { + ActorRef exampleActor = actorSystem.actorOf( + ExampleActor.props(s, withoutPeer(s)), s); + actorRefs.put(s, exampleActor); + System.out.println("Created node:"+s); + + } + } + + // add new nodes , pass in the count + public void addNodes(int num) { + Map newPeers = new HashMap<>(); + for (int i=0; i < num; i++) { + int rand = getUnusedRandom(num); + newPeers.put("example-"+rand, "akka://default/user/example-"+rand); + allPeers.put("example-"+rand, "akka://default/user/example-"+rand); + + } + Map newActorRefs = new HashMap(num); + for (Map.Entry entry : newPeers.entrySet()) { + ActorRef exampleActor = actorSystem.actorOf( + ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey()); + newActorRefs.put(entry.getKey(), exampleActor); + + //now also add these new nodes as peers from the previous nodes + for (ActorRef actor : actorRefs.values()) { + actor.tell(new AddRaftPeer(entry.getKey(), entry.getValue()), null); + } + + System.out.println("Added node:" + entry); + } + + actorRefs.putAll(newActorRefs); + } + + + // add num clients to all nodes in the system + public void addClients(int num) { + for(Map.Entry actorRefEntry : actorRefs.entrySet()) { + for (int i=0; i < num; i++) { + String clientName = "client-" + i + "-" + actorRefEntry.getKey(); + ActorRef clientActor = actorSystem.actorOf( + ClientActor.props(actorRefEntry.getValue()), clientName); + clientActorRefs.put(clientName, clientActor); + System.out.println("Created client-node:" + clientName); + } + } + } + + // add num clients to a node + public void addClientsToNode(String actorName, int num) { + ActorRef actorRef = actorRefs.get(actorName); + for (int i=0; i < num; i++) { + String clientName = "client-" + i + "-" + actorRef; + clientActorRefs.put(clientName, + actorSystem.actorOf(ClientActor.props(actorRef), clientName)); + System.out.println("Added client-node:" + clientName); + } + } + + public void stopNode(String actorName) { + ActorRef actorRef = actorRefs.get(actorName); + String clientName = "client-"+actorName; + if(clientActorRefs.containsKey(clientName)) { + actorSystem.stop(clientActorRefs.get(clientName)); + clientActorRefs.remove(clientName); + } + actorSystem.stop(actorRef); + actorRefs.remove(actorName); + + for (ActorRef actor : actorRefs.values()) { + actor.tell(new RemoveRaftPeer(actorName), null); + } + + allPeers.remove(actorName); + + } + + public void startAllLogging() { + if(!clientActorRefs.isEmpty()) { + for(Map.Entry client : clientActorRefs.entrySet()) { + logGenerator.startLoggingForClient(client.getValue()); + System.out.println("Started logging for client:"+client.getKey()); + } + } else { + System.out.println("There are no clients for any nodes. First create clients using commands- addClients: or addClientsToNode::"); + } + + } + + public void startLoggingForClient(ActorRef client) { + logGenerator.startLoggingForClient(client); + } + + public void stopAllLogging() { + for(Map.Entry client : clientActorRefs.entrySet()) { + logGenerator.stopLoggingForClient(client.getValue()); + } + } + + public void stopLoggingForClient(ActorRef client) { + logGenerator.stopLoggingForClient(client); + } + + public void printState() { + for (ActorRef ref : actorRefs.values()) { + ref.tell(new PrintState(), null); + } + } + + public void printNodes() { + for (ActorRef ref : actorRefs.values()) { + ref.tell(new PrintRole(), null); + } + } + + public ActorRef getLeader() { + return null; + } + + private int getUnusedRandom(int num) { + int rand = -1; + do { + rand = (new Random()).nextInt(num * num); + } while (allPeers.keySet().contains("example-"+rand)); + + return rand; + } + + private static Map withoutPeer(String peerId) { + Map without = new ConcurrentHashMap<>(allPeers); + without.remove(peerId); + + return without; + } +} + diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java new file mode 100644 index 0000000000..00cc09ae30 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.example.messages; + +import java.io.Serializable; + +public class KeyValue implements Serializable{ + private final String key; + private final String value; + + public KeyValue(String key, String value){ + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + @Override public String toString() { + return "KeyValue{" + + "key='" + key + '\'' + + ", value='" + value + '\'' + + '}'; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java new file mode 100644 index 0000000000..e10e5a7b43 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.example.messages; + +public class KeyValueSaved { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintRole.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintRole.java new file mode 100644 index 0000000000..c9d4bfa72c --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintRole.java @@ -0,0 +1,7 @@ +package org.opendaylight.controller.cluster.example.messages; + +/** + * Created by kramesha on 7/17/14. + */ +public class PrintRole { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintState.java new file mode 100644 index 0000000000..dbf863df9a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintState.java @@ -0,0 +1,7 @@ +package org.opendaylight.controller.cluster.example.messages; + +/** + * Created by kramesha on 7/17/14. + */ +public class PrintState { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java new file mode 100644 index 0000000000..4972b348ff --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; + +public interface ClientRequestTracker { + /** + * The client actor who is waiting for a response + * + * @return + */ + ActorRef getClientActor(); + + /** + * + * @return + */ + String getIdentifier(); + + /** + * The index of the log entry which needs to be replicated + * + * @return + */ + long getIndex(); + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java new file mode 100644 index 0000000000..15de6d01a7 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; + +public class ClientRequestTrackerImpl implements ClientRequestTracker { + + private final ActorRef clientActor; + private final String identifier; + private final long logIndex; + + public ClientRequestTrackerImpl(ActorRef clientActor, String identifier, + long logIndex) { + + this.clientActor = clientActor; + + this.identifier = identifier; + + this.logIndex = logIndex; + } + + @Override public ActorRef getClientActor() { + return clientActor; + } + + @Override public long getIndex() { + return logIndex; + } + + public String getIdentifier() { + return identifier; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java new file mode 100644 index 0000000000..9f0d02edb9 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +/** + * ElectionTerm contains information about a RaftActors election term. + *

+ * This information includes the last known current term of the RaftActor + * and which peer was voted for by the RaftActor in that term + *

+ * This class ensures that election term information is persisted + */ +public interface ElectionTerm { + /** + * latest term server has seen (initialized to 0 + * on first boot, increases monotonically) + */ + long getCurrentTerm(); + + /** + * candidateId that received vote in current + * term (or null if none) + */ + String getVotedFor(); + + /** + * To be called mainly when we are recovering in-memory election state from + * persistent storage + * + * @param currentTerm + * @param votedFor + */ + void update(long currentTerm, String votedFor); + + /** + * To be called when we need to update the current term either because we + * received a message from someone with a more up-to-date term or because we + * just voted for someone + *

+ * This information needs to be persisted so that on recovery the replica + * can start itself in the right term and know if it has already voted in + * that term or not + * + * @param currentTerm + * @param votedFor + */ + void updateAndPersist(long currentTerm, String votedFor); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java new file mode 100644 index 0000000000..f3de983538 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * The state of the followers log as known by the Leader + */ +public interface FollowerLogInformation { + + /** + * Increment the value of the nextIndex + * @return + */ + public long incrNextIndex(); + + /** + * Decrement the value of the nextIndex + * @return + */ + public long decrNextIndex(); + + /** + * + * @param nextIndex + */ + void setNextIndex(long nextIndex); + + /** + * Increment the value of the matchIndex + * @return + */ + public long incrMatchIndex(); + + public void setMatchIndex(long matchIndex); + + /** + * The identifier of the follower + * This could simply be the url of the remote actor + */ + public String getId(); + + /** + * for each server, index of the next log entry + * to send to that server (initialized to leader + * last log index + 1) + */ + public AtomicLong getNextIndex(); + + /** + * for each server, index of highest log entry + * known to be replicated on server + * (initialized to 0, increases monotonically) + */ + public AtomicLong getMatchIndex(); + + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java new file mode 100644 index 0000000000..94f9a53a85 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import java.util.concurrent.atomic.AtomicLong; + +public class FollowerLogInformationImpl implements FollowerLogInformation{ + + private final String id; + + private final AtomicLong nextIndex; + + private final AtomicLong matchIndex; + + public FollowerLogInformationImpl(String id, AtomicLong nextIndex, + AtomicLong matchIndex) { + this.id = id; + this.nextIndex = nextIndex; + this.matchIndex = matchIndex; + } + + public long incrNextIndex(){ + return nextIndex.incrementAndGet(); + } + + @Override public long decrNextIndex() { + return nextIndex.decrementAndGet(); + } + + @Override public void setNextIndex(long nextIndex) { + this.nextIndex.set(nextIndex); + } + + public long incrMatchIndex(){ + return matchIndex.incrementAndGet(); + } + + @Override public void setMatchIndex(long matchIndex) { + this.matchIndex.set(matchIndex); + } + + public String getId() { + return id; + } + + public AtomicLong getNextIndex() { + return nextIndex; + } + + public AtomicLong getMatchIndex() { + return matchIndex; + } + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java new file mode 100644 index 0000000000..0ff2341c70 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -0,0 +1,673 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Procedure; +import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; +import akka.persistence.UntypedPersistentActor; +import org.opendaylight.controller.cluster.raft.behaviors.Candidate; +import org.opendaylight.controller.cluster.raft.behaviors.Follower; +import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; +import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; +import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * RaftActor encapsulates a state machine that needs to be kept synchronized + * in a cluster. It implements the RAFT algorithm as described in the paper + * + * In Search of an Understandable Consensus Algorithm + *

+ * RaftActor has 3 states and each state has a certain behavior associated + * with it. A Raft actor can behave as, + *

    + *
  • A Leader
  • + *
  • A Follower (or)
  • + *
  • A Candidate
  • + *
+ *

+ *

+ * A RaftActor MUST be a Leader in order to accept requests from clients to + * change the state of it's encapsulated state machine. Once a RaftActor becomes + * a Leader it is also responsible for ensuring that all followers ultimately + * have the same log and therefore the same state machine as itself. + *

+ *

+ * The current behavior of a RaftActor determines how election for leadership + * is initiated and how peer RaftActors react to request for votes. + *

+ *

+ * Each RaftActor also needs to know the current election term. It uses this + * information for a couple of things. One is to simply figure out who it + * voted for in the last election. Another is to figure out if the message + * it received to update it's state is stale. + *

+ *

+ * The RaftActor uses akka-persistence to store it's replicated log. + * Furthermore through it's behaviors a Raft Actor determines + *

+ *

    + *
  • when a log entry should be persisted
  • + *
  • when a log entry should be applied to the state machine (and)
  • + *
  • when a snapshot should be saved
  • + *
+ */ +public abstract class RaftActor extends UntypedPersistentActor { + protected final LoggingAdapter LOG = + Logging.getLogger(getContext().system(), this); + + /** + * The current state determines the current behavior of a RaftActor + * A Raft Actor always starts off in the Follower State + */ + private RaftActorBehavior currentBehavior; + + /** + * This context should NOT be passed directly to any other actor it is + * only to be consumed by the RaftActorBehaviors + */ + private RaftActorContext context; + + /** + * The in-memory journal + */ + private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); + + + public RaftActor(String id, Map peerAddresses) { + context = new RaftActorContextImpl(this.getSelf(), + this.getContext(), + id, new ElectionTermImpl(), + -1, -1, replicatedLog, peerAddresses, LOG); + } + + @Override public void onReceiveRecover(Object message) { + if (message instanceof SnapshotOffer) { + SnapshotOffer offer = (SnapshotOffer) message; + Snapshot snapshot = (Snapshot) offer.snapshot(); + + // Create a replicated log with the snapshot information + // The replicated log can be used later on to retrieve this snapshot + // when we need to install it on a peer + replicatedLog = new ReplicatedLogImpl(snapshot); + + // Apply the snapshot to the actors state + applySnapshot(snapshot.getState()); + + } else if (message instanceof ReplicatedLogEntry) { + replicatedLog.append((ReplicatedLogEntry) message); + } else if (message instanceof DeleteEntries) { + replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { + LOG.debug( + "Last index in log : " + replicatedLog.lastIndex()); + currentBehavior = switchBehavior(RaftState.Follower); + } + } + + @Override public void onReceiveCommand(Object message) { + if (message instanceof ApplyState){ + ApplyState applyState = (ApplyState) message; + + LOG.debug("Applying state for log index {} data {}", + applyState.getReplicatedLogEntry().getIndex(), + applyState.getReplicatedLogEntry().getData()); + + applyState(applyState.getClientActor(), applyState.getIdentifier(), + applyState.getReplicatedLogEntry().getData()); + } else if(message instanceof ApplySnapshot ) { + applySnapshot(((ApplySnapshot) message).getSnapshot()); + } else if (message instanceof FindLeader) { + getSender().tell( + new FindLeaderReply( + context.getPeerAddress(currentBehavior.getLeaderId())), + getSelf() + ); + } else if (message instanceof SaveSnapshotSuccess) { + SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; + + // TODO: Not sure if we want to be this aggressive with trimming stuff + trimPersistentData(success.metadata().sequenceNr()); + + } else if (message instanceof SaveSnapshotFailure) { + + // TODO: Handle failure in saving the snapshot + // Maybe do retries on failure + + } else if (message instanceof AddRaftPeer){ + + // FIXME : Do not add raft peers like this. + // When adding a new Peer we have to ensure that the a majority of + // the peers know about the new Peer. Doing it this way may cause + // a situation where multiple Leaders may emerge + AddRaftPeer arp = (AddRaftPeer)message; + context.addToPeers(arp.getName(), arp.getAddress()); + + } else if (message instanceof RemoveRaftPeer){ + + RemoveRaftPeer rrp = (RemoveRaftPeer)message; + context.removePeer(rrp.getName()); + + } else { + + RaftState state = + currentBehavior.handleMessage(getSender(), message); + currentBehavior = switchBehavior(state); + } + } + + + + /** + * When a derived RaftActor needs to persist something it must call + * persistData. + * + * @param clientActor + * @param identifier + * @param data + */ + protected void persistData(ActorRef clientActor, String identifier, + Object data) { + + ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( + context.getReplicatedLog().lastIndex() + 1, + context.getTermInformation().getCurrentTerm(), data); + + LOG.debug("Persist data {}", replicatedLogEntry); + + replicatedLog + .appendAndPersist(clientActor, identifier, replicatedLogEntry); + } + + protected String getId() { + return context.getId(); + } + + /** + * Derived actors can call the isLeader method to check if the current + * RaftActor is the Leader or not + * + * @return true it this RaftActor is a Leader false otherwise + */ + protected boolean isLeader() { + return context.getId().equals(currentBehavior.getLeaderId()); + } + + /** + * Derived actor can call getLeader if they need a reference to the Leader. + * This would be useful for example in forwarding a request to an actor + * which is the leader + * + * @return A reference to the leader if known, null otherwise + */ + protected ActorSelection getLeader(){ + String leaderId = currentBehavior.getLeaderId(); + if (leaderId == null) { + return null; + } + String peerAddress = context.getPeerAddress(leaderId); + LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + + peerAddress); + return context.actorSelection(peerAddress); + } + + protected RaftState getRaftState() { + return currentBehavior.state(); + } + + + + /** + * The applyState method will be called by the RaftActor when some data + * needs to be applied to the actor's state + * + * @param clientActor A reference to the client who sent this message. This + * is the same reference that was passed to persistData + * by the derived actor. clientActor may be null when + * the RaftActor is behaving as a follower or during + * recovery. + * @param identifier The identifier of the persisted data. This is also + * the same identifier that was passed to persistData by + * the derived actor. identifier may be null when + * the RaftActor is behaving as a follower or during + * recovery + * @param data A piece of data that was persisted by the persistData call. + * This should NEVER be null. + */ + protected abstract void applyState(ActorRef clientActor, String identifier, + Object data); + + /** + * This method will be called by the RaftActor when a snapshot needs to be + * created. The derived actor should respond with its current state. + *

+ * During recovery the state that is returned by the derived actor will + * be passed back to it by calling the applySnapshot method + * + * @return The current state of the actor + */ + protected abstract Object createSnapshot(); + + /** + * This method will be called by the RaftActor during recovery to + * reconstruct the state of the actor. + *

+ * This method may also be called at any other point during normal + * operations when the derived actor is out of sync with it's peers + * and the only way to bring it in sync is by applying a snapshot + * + * @param snapshot A snapshot of the state of the actor + */ + protected abstract void applySnapshot(Object snapshot); + + private RaftActorBehavior switchBehavior(RaftState state) { + if (currentBehavior != null) { + if (currentBehavior.state() == state) { + return currentBehavior; + } + LOG.info("Switching from state " + currentBehavior.state() + " to " + + state); + + try { + currentBehavior.close(); + } catch (Exception e) { + LOG.error(e, + "Failed to close behavior : " + currentBehavior.state()); + } + + } else { + LOG.info("Switching behavior to " + state); + } + RaftActorBehavior behavior = null; + if (state == RaftState.Candidate) { + behavior = new Candidate(context); + } else if (state == RaftState.Follower) { + behavior = new Follower(context); + } else { + behavior = new Leader(context); + } + return behavior; + } + + private void trimPersistentData(long sequenceNumber) { + // Trim snapshots + // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied + // For now guessing that it is ANDed. + deleteSnapshots(new SnapshotSelectionCriteria( + sequenceNumber - 100000, 43200000)); + + // Trim journal + deleteMessages(sequenceNumber); + } + + + private class ReplicatedLogImpl implements ReplicatedLog { + private final List journal; + private final Object snapshot; + private long snapshotIndex = -1; + private long snapshotTerm = -1; + + public ReplicatedLogImpl(Snapshot snapshot) { + this.snapshot = snapshot.getState(); + this.snapshotIndex = snapshot.getLastAppliedIndex(); + this.snapshotTerm = snapshot.getLastAppliedTerm(); + + this.journal = new ArrayList<>(snapshot.getUnAppliedEntries()); + } + + public ReplicatedLogImpl() { + this.snapshot = null; + this.journal = new ArrayList<>(); + } + + @Override public ReplicatedLogEntry get(long index) { + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + return null; + } + + return journal.get(adjustedIndex); + } + + @Override public ReplicatedLogEntry last() { + if (journal.size() == 0) { + return null; + } + return get(journal.size() - 1); + } + + @Override public long lastIndex() { + if (journal.size() == 0) { + return -1; + } + + return last().getIndex(); + } + + @Override public long lastTerm() { + if (journal.size() == 0) { + return -1; + } + + return last().getTerm(); + } + + + @Override public void removeFrom(long index) { + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + return; + } + + journal.subList(adjustedIndex , journal.size()).clear(); + } + + + @Override public void removeFromAndPersist(long index) { + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + return; + } + + // FIXME: Maybe this should be done after the command is saved + journal.subList(adjustedIndex , journal.size()).clear(); + + persist(new DeleteEntries(adjustedIndex), new Procedure(){ + + @Override public void apply(DeleteEntries param) + throws Exception { + //FIXME : Doing nothing for now + } + }); + + + } + + @Override public void append( + final ReplicatedLogEntry replicatedLogEntry) { + journal.add(replicatedLogEntry); + } + + @Override public List getFrom(long index) { + int adjustedIndex = adjustedIndex(index); + + List entries = new ArrayList<>(100); + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + return entries; + } + + + for (int i = adjustedIndex; + i < journal.size(); i++) { + entries.add(journal.get(i)); + } + return entries; + } + + @Override public void appendAndPersist( + final ReplicatedLogEntry replicatedLogEntry) { + appendAndPersist(null, null, replicatedLogEntry); + } + + public void appendAndPersist(final ActorRef clientActor, + final String identifier, + final ReplicatedLogEntry replicatedLogEntry) { + context.getLogger().debug( + "Append log entry and persist " + replicatedLogEntry); + // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs + journal.add(replicatedLogEntry); + + // When persisting events with persist it is guaranteed that the + // persistent actor will not receive further commands between the + // persist call and the execution(s) of the associated event + // handler. This also holds for multiple persist calls in context + // of a single command. + persist(replicatedLogEntry, + new Procedure() { + public void apply(ReplicatedLogEntry evt) throws Exception { + // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. + if (size() > 100000) { + ReplicatedLogEntry lastAppliedEntry = + get(context.getLastApplied()); + long lastAppliedIndex = -1; + long lastAppliedTerm = -1; + if (lastAppliedEntry != null) { + lastAppliedIndex = lastAppliedEntry.getIndex(); + lastAppliedTerm = lastAppliedEntry.getTerm(); + } + + saveSnapshot(Snapshot.create(createSnapshot(), + getFrom(context.getLastApplied() + 1), + lastIndex(), lastTerm(), lastAppliedIndex, + lastAppliedTerm)); + } + // Send message for replication + if (clientActor != null) { + currentBehavior.handleMessage(getSelf(), + new Replicate(clientActor, identifier, + replicatedLogEntry) + ); + } + } + } + ); + } + + @Override public long size() { + return journal.size() + snapshotIndex + 1; + } + + @Override public boolean isPresent(long index) { + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + return false; + } + return true; + } + + @Override public boolean isInSnapshot(long index) { + return index <= snapshotIndex; + } + + @Override public Object getSnapshot() { + return snapshot; + } + + @Override public long getSnapshotIndex() { + return snapshotIndex; + } + + @Override public long getSnapshotTerm() { + return snapshotTerm; + } + + private int adjustedIndex(long index) { + if(snapshotIndex < 0){ + return (int) index; + } + return (int) (index - snapshotIndex); + } + } + + + private static class ReplicatedLogImplEntry implements ReplicatedLogEntry, + Serializable { + + private final long index; + private final long term; + private final Object payload; + + public ReplicatedLogImplEntry(long index, long term, Object payload) { + + this.index = index; + this.term = term; + this.payload = payload; + } + + @Override public Object getData() { + return payload; + } + + @Override public long getTerm() { + return term; + } + + @Override public long getIndex() { + return index; + } + + @Override public String toString() { + return "Entry{" + + "index=" + index + + ", term=" + term + + '}'; + } + } + + private static class DeleteEntries implements Serializable { + private final int fromIndex; + + + public DeleteEntries(int fromIndex) { + this.fromIndex = fromIndex; + } + + public int getFromIndex() { + return fromIndex; + } + } + + + private static class Snapshot implements Serializable { + private final Object state; + private final List unAppliedEntries; + private final long lastIndex; + private final long lastTerm; + private final long lastAppliedIndex; + private final long lastAppliedTerm; + + private Snapshot(Object state, + List unAppliedEntries, long lastIndex, + long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { + this.state = state; + this.unAppliedEntries = unAppliedEntries; + this.lastIndex = lastIndex; + this.lastTerm = lastTerm; + this.lastAppliedIndex = lastAppliedIndex; + this.lastAppliedTerm = lastAppliedTerm; + } + + + public static Snapshot create(Object state, + List entries, long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm) { + return new Snapshot(state, entries, lastIndex, lastTerm, + lastAppliedIndex, lastAppliedTerm); + } + + public Object getState() { + return state; + } + + public List getUnAppliedEntries() { + return unAppliedEntries; + } + + public long getLastTerm() { + return lastTerm; + } + + public long getLastAppliedIndex() { + return lastAppliedIndex; + } + + public long getLastAppliedTerm() { + return lastAppliedTerm; + } + } + + private class ElectionTermImpl implements ElectionTerm { + /** + * Identifier of the actor whose election term information this is + */ + private long currentTerm = 0; + private String votedFor = null; + + public long getCurrentTerm() { + return currentTerm; + } + + public String getVotedFor() { + return votedFor; + } + + @Override public void update(long currentTerm, String votedFor) { + LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor); + + this.currentTerm = currentTerm; + this.votedFor = votedFor; + } + + @Override + public void updateAndPersist(long currentTerm, String votedFor){ + update(currentTerm, votedFor); + // FIXME : Maybe first persist then update the state + persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){ + + @Override public void apply(UpdateElectionTerm param) + throws Exception { + + } + }); + } + } + + private static class UpdateElectionTerm implements Serializable { + private final long currentTerm; + private final String votedFor; + + public UpdateElectionTerm(long currentTerm, String votedFor) { + this.currentTerm = currentTerm; + this.votedFor = votedFor; + } + + public long getCurrentTerm() { + return currentTerm; + } + + public String getVotedFor() { + return votedFor; + } + } + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java new file mode 100644 index 0000000000..7150ec0e6e --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.event.LoggingAdapter; + +import java.util.Map; + +/** + * The RaftActorContext contains that portion of the RaftActors state that + * needs to be shared with it's behaviors. A RaftActorContext should NEVER be + * used in any actor context outside the RaftActor that constructed it. + */ +public interface RaftActorContext { + /** + * Create a new local actor + * @param props + * @return + */ + ActorRef actorOf(Props props); + + /** + * Create a actor selection + * @param path + * @return + */ + ActorSelection actorSelection(String path); + + /** + * Get the identifier for the RaftActor. This identifier represents the + * name of the actor whose common state is being shared. For example the + * id could be 'inventory' + * @return the identifier + */ + String getId(); + + /** + * A reference to the RaftActor itself. This could be used to send messages + * to the RaftActor + * @return + */ + ActorRef getActor(); + + /** + * Get the ElectionTerm information + * @return + */ + ElectionTerm getTermInformation(); + + /** + * index of highest log entry known to be + * committed (initialized to 0, increases + * monotonically) + * @return + */ + long getCommitIndex(); + + + /** + * + */ + void setCommitIndex(long commitIndex); + + /** + * index of highest log entry applied to state + * machine (initialized to 0, increases + * monotonically) + * @return + */ + long getLastApplied(); + + + /** + * + */ + void setLastApplied(long lastApplied); + + /** + * @return A representation of the log + */ + ReplicatedLog getReplicatedLog(); + + /** + * @return The ActorSystem associated with this context + */ + ActorSystem getActorSystem(); + + /** + * + * @return + */ + LoggingAdapter getLogger(); + + /** + * Get a mapping of peer id's their addresses + * @return + */ + Map getPeerAddresses(); + + /** + * + * @param peerId + * @return + */ + String getPeerAddress(String peerId); + + /** + * Add to actor peers + * @param name + * @param address + */ + void addToPeers(String name, String address); + + /** + * + * @param name + */ + public void removePeer(String name); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java new file mode 100644 index 0000000000..a0f13280c2 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActorContext; +import akka.event.LoggingAdapter; + +import java.util.Map; + +public class RaftActorContextImpl implements RaftActorContext{ + + private final ActorRef actor; + + private final UntypedActorContext context; + + private final String id; + + private final ElectionTerm termInformation; + + private long commitIndex; + + private long lastApplied; + + private final ReplicatedLog replicatedLog; + + private final Map peerAddresses; + + private final LoggingAdapter LOG; + + + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, + String id, + ElectionTerm termInformation, long commitIndex, + long lastApplied, ReplicatedLog replicatedLog, Map peerAddresses, LoggingAdapter logger) { + this.actor = actor; + this.context = context; + this.id = id; + this.termInformation = termInformation; + this.commitIndex = commitIndex; + this.lastApplied = lastApplied; + this.replicatedLog = replicatedLog; + this.peerAddresses = peerAddresses; + this.LOG = logger; + } + + public ActorRef actorOf(Props props){ + return context.actorOf(props); + } + + public ActorSelection actorSelection(String path){ + return context.actorSelection(path); + } + + public String getId() { + return id; + } + + public ActorRef getActor() { + return actor; + } + + public ElectionTerm getTermInformation() { + return termInformation; + } + + public long getCommitIndex() { + return commitIndex; + } + + @Override public void setCommitIndex(long commitIndex) { + this.commitIndex = commitIndex; + } + + public long getLastApplied() { + return lastApplied; + } + + @Override public void setLastApplied(long lastApplied) { + this.lastApplied = lastApplied; + } + + @Override public ReplicatedLog getReplicatedLog() { + return replicatedLog; + } + + @Override public ActorSystem getActorSystem() { + return context.system(); + } + + @Override public LoggingAdapter getLogger() { + return this.LOG; + } + + @Override public Map getPeerAddresses() { + return peerAddresses; + } + + @Override public String getPeerAddress(String peerId) { + return peerAddresses.get(peerId); + } + + @Override public void addToPeers(String name, String address) { + LOG.debug("Kamal--> addToPeer for:"+name); + peerAddresses.put(name, address); + } + + @Override public void removePeer(String name) { + LOG.debug("Kamal--> removePeer for:"+name); + peerAddresses.remove(name); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java new file mode 100644 index 0000000000..65114eb659 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java @@ -0,0 +1,7 @@ +package org.opendaylight.controller.cluster.raft; + +public enum RaftState { + Candidate, + Follower, + Leader +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java new file mode 100644 index 0000000000..b7c8955aad --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import java.util.List; + +/** + * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor + */ +public interface ReplicatedLog { + /** + * Get a replicated log entry at the specified index + * + * @param index the index of the log entry + * @return the ReplicatedLogEntry at index. null if index less than 0 or + * greater than the size of the in-memory journal. + */ + ReplicatedLogEntry get(long index); + + + /** + * Get the last replicated log entry + * + * @return + */ + ReplicatedLogEntry last(); + + /** + * + * @return + */ + long lastIndex(); + + /** + * + * @return + */ + long lastTerm(); + + /** + * To be called when we need to remove entries from the in-memory log. + * This method will remove all entries >= index. This method should be used + * during recovery to appropriately trim the log based on persisted + * information + * + * @param index the index of the log entry + */ + void removeFrom(long index); + + + /** + * To be called when we need to remove entries from the in-memory log and we + * need that information persisted to disk. This method will remove all + * entries >= index. + *

+ * The persisted information would then be used during recovery to properly + * reconstruct the state of the in-memory replicated log + * + * @param index the index of the log entry + */ + void removeFromAndPersist(long index); + + /** + * Append an entry to the log + * @param replicatedLogEntry + */ + void append(ReplicatedLogEntry replicatedLogEntry); + + /** + * + * @param replicatedLogEntry + */ + void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry); + + /** + * + * @param index the index of the log entry + */ + List getFrom(long index); + + + /** + * + * @return + */ + long size(); + + /** + * Checks if the entry at the specified index is present or not + * + * @param index the index of the log entry + * @return true if the entry is present in the in-memory journal + */ + boolean isPresent(long index); + + /** + * Checks if the entry is present in a snapshot + * + * @param index the index of the log entry + * @return true if the entry is in the snapshot. false if the entry is not + * in the snapshot even if the entry may be present in the replicated log + */ + boolean isInSnapshot(long index); + + /** + * Get the snapshot + * + * @return an object representing the snapshot if it exists. null otherwise + */ + Object getSnapshot(); + + /** + * Get the index of the snapshot + * + * @return the index from which the snapshot was created. -1 otherwise. + */ + long getSnapshotIndex(); + + /** + * Get the term of the snapshot + * + * @return the term of the index from which the snapshot was created. -1 + * otherwise + */ + long getSnapshotTerm(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java new file mode 100644 index 0000000000..3bbaa22838 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +/** + * Represents one entry in the replicated log + */ +public interface ReplicatedLogEntry { + /** + * The data stored in that entry + * + * @return + */ + Object getData(); + + /** + * The term stored in that entry + * + * @return + */ + long getTerm(); + + /** + * The index of the entry + * + * @return + */ + long getIndex(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java new file mode 100644 index 0000000000..1d78bb0227 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -0,0 +1,369 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; +import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * Abstract class that represents the behavior of a RaftActor + *

+ * All Servers: + *

    + *
  • If commitIndex > lastApplied: increment lastApplied, apply + * log[lastApplied] to state machine (§5.3) + *
  • If RPC request or response contains term T > currentTerm: + * set currentTerm = T, convert to follower (§5.1) + */ +public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { + + /** + * Information about the RaftActor whose behavior this class represents + */ + protected final RaftActorContext context; + + /** + * The maximum election time variance + */ + private static final int ELECTION_TIME_MAX_VARIANCE = 100; + + /** + * The interval at which a heart beat message will be sent to the remote + * RaftActor + *

    + * Since this is set to 100 milliseconds the Election timeout should be + * at least 200 milliseconds + */ + protected static final FiniteDuration HEART_BEAT_INTERVAL = + new FiniteDuration(100, TimeUnit.MILLISECONDS); + + /** + * The interval in which a new election would get triggered if no leader is found + */ + private static final long ELECTION_TIME_INTERVAL = + HEART_BEAT_INTERVAL.toMillis() * 2; + + /** + * + */ + private Cancellable electionCancel = null; + + /** + * + */ + protected String leaderId = null; + + + protected AbstractRaftActorBehavior(RaftActorContext context) { + this.context = context; + } + + /** + * Derived classes should not directly handle AppendEntries messages it + * should let the base class handle it first. Once the base class handles + * the AppendEntries message and does the common actions that are applicable + * in all RaftState's it will delegate the handling of the AppendEntries + * message to the derived class to do more state specific handling by calling + * this method + * + * @param sender The actor that sent this message + * @param appendEntries The AppendEntries message + * @return + */ + protected abstract RaftState handleAppendEntries(ActorRef sender, + AppendEntries appendEntries); + + + /** + * appendEntries first processes the AppendEntries message and then + * delegates handling to a specific behavior + * + * @param sender + * @param appendEntries + * @return + */ + protected RaftState appendEntries(ActorRef sender, + AppendEntries appendEntries) { + + // 1. Reply false if term < currentTerm (§5.1) + if (appendEntries.getTerm() < currentTerm()) { + context.getLogger().debug( + "Cannot append entries because sender term " + appendEntries + .getTerm() + " is less than " + currentTerm()); + sender.tell( + new AppendEntriesReply(context.getId(), currentTerm(), false, + lastIndex(), lastTerm()), actor() + ); + return state(); + } + + + return handleAppendEntries(sender, appendEntries); + } + + /** + * Derived classes should not directly handle AppendEntriesReply messages it + * should let the base class handle it first. Once the base class handles + * the AppendEntriesReply message and does the common actions that are + * applicable in all RaftState's it will delegate the handling of the + * AppendEntriesReply message to the derived class to do more state specific + * handling by calling this method + * + * @param sender The actor that sent this message + * @param appendEntriesReply The AppendEntriesReply message + * @return + */ + protected abstract RaftState handleAppendEntriesReply(ActorRef sender, + AppendEntriesReply appendEntriesReply); + + /** + * requestVote handles the RequestVote message. This logic is common + * for all behaviors + * + * @param sender + * @param requestVote + * @return + */ + protected RaftState requestVote(ActorRef sender, + RequestVote requestVote) { + + boolean grantVote = false; + + // Reply false if term < currentTerm (§5.1) + if (requestVote.getTerm() < currentTerm()) { + grantVote = false; + + // If votedFor is null or candidateId, and candidate’s log is at + // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) + } else if (votedFor() == null || votedFor() + .equals(requestVote.getCandidateId())) { + + boolean candidateLatest = false; + + // From §5.4.1 + // Raft determines which of two logs is more up-to-date + // by comparing the index and term of the last entries in the + // logs. If the logs have last entries with different terms, then + // the log with the later term is more up-to-date. If the logs + // end with the same term, then whichever log is longer is + // more up-to-date. + if (requestVote.getLastLogTerm() > lastTerm()) { + candidateLatest = true; + } else if ((requestVote.getLastLogTerm() == lastTerm()) + && requestVote.getLastLogIndex() >= lastIndex()) { + candidateLatest = true; + } + + if (candidateLatest) { + grantVote = true; + context.getTermInformation().updateAndPersist(requestVote.getTerm(), + requestVote.getCandidateId()); + } + } + + sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); + + return state(); + } + + /** + * Derived classes should not directly handle RequestVoteReply messages it + * should let the base class handle it first. Once the base class handles + * the RequestVoteReply message and does the common actions that are + * applicable in all RaftState's it will delegate the handling of the + * RequestVoteReply message to the derived class to do more state specific + * handling by calling this method + * + * @param sender The actor that sent this message + * @param requestVoteReply The RequestVoteReply message + * @return + */ + protected abstract RaftState handleRequestVoteReply(ActorRef sender, + RequestVoteReply requestVoteReply); + + /** + * Creates a random election duration + * + * @return + */ + protected FiniteDuration electionDuration() { + long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); + return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, + TimeUnit.MILLISECONDS); + } + + /** + * stop the scheduled election + */ + protected void stopElection() { + if (electionCancel != null && !electionCancel.isCancelled()) { + electionCancel.cancel(); + } + } + + /** + * schedule a new election + * + * @param interval + */ + protected void scheduleElection(FiniteDuration interval) { + stopElection(); + + // Schedule an election. When the scheduler triggers an ElectionTimeout + // message is sent to itself + electionCancel = + context.getActorSystem().scheduler().scheduleOnce(interval, + context.getActor(), new ElectionTimeout(), + context.getActorSystem().dispatcher(), context.getActor()); + } + + /** + * Get the current term + * @return + */ + protected long currentTerm() { + return context.getTermInformation().getCurrentTerm(); + } + + /** + * Get the candidate for whom we voted in the current term + * @return + */ + protected String votedFor() { + return context.getTermInformation().getVotedFor(); + } + + /** + * Get the actor associated with this behavior + * @return + */ + protected ActorRef actor() { + return context.getActor(); + } + + /** + * Get the term from the last entry in the log + * + * @return + */ + protected long lastTerm() { + return context.getReplicatedLog().lastTerm(); + } + + /** + * Get the index from the last entry in the log + * + * @return + */ + protected long lastIndex() { + return context.getReplicatedLog().lastIndex(); + } + + /** + * Find the client request tracker for a specific logIndex + * + * @param logIndex + * @return + */ + protected ClientRequestTracker findClientRequestTracker(long logIndex) { + return null; + } + + /** + * Find the log index from the previous to last entry in the log + * + * @return + */ + protected long prevLogIndex(long index){ + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(index - 1); + if (prevEntry != null) { + return prevEntry.getIndex(); + } + return -1; + } + + /** + * Find the log term from the previous to last entry in the log + * @return + */ + protected long prevLogTerm(long index){ + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(index - 1); + if (prevEntry != null) { + return prevEntry.getTerm(); + } + return -1; + } + + /** + * Apply the provided index to the state machine + * + * @param index a log index that is known to be committed + */ + protected void applyLogToStateMachine(long index) { + // Now maybe we apply to the state machine + for (long i = context.getLastApplied() + 1; + i < index + 1; i++) { + ActorRef clientActor = null; + String identifier = null; + ClientRequestTracker tracker = findClientRequestTracker(i); + + if (tracker != null) { + clientActor = tracker.getClientActor(); + identifier = tracker.getIdentifier(); + } + ReplicatedLogEntry replicatedLogEntry = + context.getReplicatedLog().get(i); + + if (replicatedLogEntry != null) { + actor().tell(new ApplyState(clientActor, identifier, + replicatedLogEntry), actor()); + } else { + context.getLogger().error( + "Missing index " + i + " from log. Cannot apply state."); + } + } + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) + context.setLastApplied(index); + } + + @Override + public RaftState handleMessage(ActorRef sender, Object message) { + if (message instanceof AppendEntries) { + return appendEntries(sender, (AppendEntries) message); + } else if (message instanceof AppendEntriesReply) { + return handleAppendEntriesReply(sender, (AppendEntriesReply) message); + } else if (message instanceof RequestVote) { + return requestVote(sender, (RequestVote) message); + } else if (message instanceof RequestVoteReply) { + return handleRequestVoteReply(sender, (RequestVoteReply) message); + } + return state(); + } + + @Override public String getLeaderId() { + return leaderId; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java new file mode 100644 index 0000000000..ecd4901246 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * The behavior of a RaftActor when it is in the CandidateState + *

    + * Candidates (§5.2): + *

      + *
    • On conversion to candidate, start election: + *
        + *
      • Increment currentTerm + *
      • Vote for self + *
      • Reset election timer + *
      • Send RequestVote RPCs to all other servers + *
      + *
    • If votes received from majority of servers: become leader + *
    • If AppendEntries RPC received from new leader: convert to + * follower + *
    • If election timeout elapses: start new election + *
    + */ +public class Candidate extends AbstractRaftActorBehavior { + + private final Map peerToActor = new HashMap<>(); + + private int voteCount; + + private final int votesRequired; + + public Candidate(RaftActorContext context) { + super(context); + + Collection peerPaths = context.getPeerAddresses().values(); + + for (String peerPath : peerPaths) { + peerToActor.put(peerPath, + context.actorSelection(peerPath)); + } + + context.getLogger().debug("Election:Candidate has following peers:"+peerToActor.keySet()); + if(peerPaths.size() > 0) { + // Votes are required from a majority of the peers including self. + // The votesRequired field therefore stores a calculated value + // of the number of votes required for this candidate to win an + // election based on it's known peers. + // If a peer was added during normal operation and raft replicas + // came to know about them then the new peer would also need to be + // taken into consideration when calculating this value. + // Here are some examples for what the votesRequired would be for n + // peers + // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1 + // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2 + // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3 + int noOfPeers = peerPaths.size(); + int self = 1; + votesRequired = (noOfPeers + self) / 2 + 1; + } else { + votesRequired = 0; + } + + startNewTerm(); + scheduleElection(electionDuration()); + } + + @Override protected RaftState handleAppendEntries(ActorRef sender, + AppendEntries appendEntries) { + + return state(); + } + + @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + AppendEntriesReply appendEntriesReply) { + + return state(); + } + + @Override protected RaftState handleRequestVoteReply(ActorRef sender, + RequestVoteReply requestVoteReply) { + + if (requestVoteReply.isVoteGranted()) { + voteCount++; + } + + if (voteCount >= votesRequired) { + return RaftState.Leader; + } + + return state(); + } + + @Override public RaftState state() { + return RaftState.Candidate; + } + + @Override + public RaftState handleMessage(ActorRef sender, Object message) { + + if (message instanceof RaftRPC) { + RaftRPC rpc = (RaftRPC) message; + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); + return RaftState.Follower; + } + } + + if (message instanceof ElectionTimeout) { + if (votesRequired == 0) { + // If there are no peers then we should be a Leader + // We wait for the election timeout to occur before declare + // ourselves the leader. This gives enough time for a leader + // who we do not know about (as a peer) + // to send a message to the candidate + return RaftState.Leader; + } + startNewTerm(); + scheduleElection(electionDuration()); + return state(); + } + return super.handleMessage(sender, message); + } + + + private void startNewTerm() { + + + // set voteCount back to 1 (that is voting for self) + voteCount = 1; + + // Increment the election term and vote for self + long currentTerm = context.getTermInformation().getCurrentTerm(); + context.getTermInformation().updateAndPersist(currentTerm + 1, context.getId()); + + context.getLogger().debug("Starting new term " + (currentTerm+1)); + + // Request for a vote + // TODO: Retry request for vote if replies do not arrive in a reasonable + // amount of time TBD + for (ActorSelection peerActor : peerToActor.values()) { + peerActor.tell(new RequestVote( + context.getTermInformation().getCurrentTerm(), + context.getId(), + context.getReplicatedLog().lastIndex(), + context.getReplicatedLog().lastTerm()), + context.getActor() + ); + } + + + } + + @Override public void close() throws Exception { + stopElection(); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java new file mode 100644 index 0000000000..532201b26e --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; + +/** + * The behavior of a RaftActor in the Follower state + *

    + *

      + *
    • Respond to RPCs from candidates and leaders + *
    • If election timeout elapses without receiving AppendEntries + * RPC from current leader or granting vote to candidate: + * convert to candidate + *
    + */ +public class Follower extends AbstractRaftActorBehavior { + public Follower(RaftActorContext context) { + super(context); + + scheduleElection(electionDuration()); + } + + @Override protected RaftState handleAppendEntries(ActorRef sender, + AppendEntries appendEntries) { + + // TODO : Refactor this method into a bunch of smaller methods + // to make it easier to read. Before refactoring ensure tests + // cover the code properly + + // 1. Reply false if term < currentTerm (§5.1) + // This is handled in the appendEntries method of the base class + + // If we got here then we do appear to be talking to the leader + leaderId = appendEntries.getLeaderId(); + + // 2. Reply false if log doesn’t contain an entry at prevLogIndex + // whose term matches prevLogTerm (§5.3) + + ReplicatedLogEntry previousEntry = context.getReplicatedLog() + .get(appendEntries.getPrevLogIndex()); + + + boolean outOfSync = true; + + // First check if the logs are in sync or not + if (lastIndex() == -1 + && appendEntries.getPrevLogIndex() != -1) { + + // The follower's log is out of sync because the leader does have + // an entry at prevLogIndex and this follower has no entries in + // it's log. + + context.getLogger().debug( + "The followers log is empty and the senders prevLogIndex is {}", + appendEntries.getPrevLogIndex()); + + } else if (lastIndex() > -1 + && appendEntries.getPrevLogIndex() != -1 + && previousEntry == null) { + + // The follower's log is out of sync because the Leader's + // prevLogIndex entry was not found in it's log + + context.getLogger().debug( + "The log is not empty but the prevLogIndex {} was not found in it", + appendEntries.getPrevLogIndex()); + + } else if (lastIndex() > -1 + && previousEntry != null + && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) { + + // The follower's log is out of sync because the Leader's + // prevLogIndex entry does exist in the follower's log but it has + // a different term in it + + context.getLogger().debug( + "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}" + , previousEntry.getTerm() + , appendEntries.getPrevLogTerm()); + } else { + outOfSync = false; + } + + if (outOfSync) { + // We found that the log was out of sync so just send a negative + // reply and return + sender.tell( + new AppendEntriesReply(context.getId(), currentTerm(), false, + lastIndex(), lastTerm()), actor() + ); + return state(); + } + + if (appendEntries.getEntries() != null + && appendEntries.getEntries().size() > 0) { + context.getLogger().debug( + "Number of entries to be appended = " + appendEntries + .getEntries().size() + ); + + // 3. If an existing entry conflicts with a new one (same index + // but different terms), delete the existing entry and all that + // follow it (§5.3) + int addEntriesFrom = 0; + if (context.getReplicatedLog().size() > 0) { + + // Find the entry up until which the one that is not in the + // follower's log + for (int i = 0; + i < appendEntries.getEntries() + .size(); i++, addEntriesFrom++) { + ReplicatedLogEntry matchEntry = + appendEntries.getEntries().get(i); + ReplicatedLogEntry newEntry = context.getReplicatedLog() + .get(matchEntry.getIndex()); + + if (newEntry == null) { + //newEntry not found in the log + break; + } + + if (newEntry.getTerm() == matchEntry + .getTerm()) { + continue; + } + + context.getLogger().debug( + "Removing entries from log starting at " + + matchEntry.getIndex() + ); + + // Entries do not match so remove all subsequent entries + context.getReplicatedLog() + .removeFromAndPersist(matchEntry.getIndex()); + break; + } + } + + context.getLogger().debug( + "After cleanup entries to be added from = " + (addEntriesFrom + + lastIndex()) + ); + + // 4. Append any new entries not already in the log + for (int i = addEntriesFrom; + i < appendEntries.getEntries().size(); i++) { + + context.getLogger().debug( + "Append entry to log " + appendEntries.getEntries().get(i).getData() + .toString() + ); + context.getReplicatedLog() + .appendAndPersist(appendEntries.getEntries().get(i)); + } + + context.getLogger().debug( + "Log size is now " + context.getReplicatedLog().size()); + } + + + // 5. If leaderCommit > commitIndex, set commitIndex = + // min(leaderCommit, index of last new entry) + + long prevCommitIndex = context.getCommitIndex(); + + context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), + context.getReplicatedLog().lastIndex())); + + if (prevCommitIndex != context.getCommitIndex()) { + context.getLogger() + .debug("Commit index set to " + context.getCommitIndex()); + } + + // If commitIndex > lastApplied: increment lastApplied, apply + // log[lastApplied] to state machine (§5.3) + if (appendEntries.getLeaderCommit() > context.getLastApplied()) { + applyLogToStateMachine(appendEntries.getLeaderCommit()); + } + + sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true, + lastIndex(), lastTerm()), actor()); + + return state(); + } + + @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + AppendEntriesReply appendEntriesReply) { + return state(); + } + + @Override protected RaftState handleRequestVoteReply(ActorRef sender, + RequestVoteReply requestVoteReply) { + return state(); + } + + @Override public RaftState state() { + return RaftState.Follower; + } + + @Override public RaftState handleMessage(ActorRef sender, Object message) { + if (message instanceof RaftRPC) { + RaftRPC rpc = (RaftRPC) message; + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); + } + } + + if (message instanceof ElectionTimeout) { + return RaftState.Candidate; + } else if (message instanceof InstallSnapshot) { + InstallSnapshot snapshot = (InstallSnapshot) message; + actor().tell(new ApplySnapshot(snapshot), actor()); + } + + scheduleElection(electionDuration()); + + return super.handleMessage(sender, message); + } + + @Override public void close() throws Exception { + stopElection(); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java new file mode 100644 index 0000000000..fb8be8b891 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -0,0 +1,407 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; +import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; +import org.opendaylight.controller.cluster.raft.FollowerLogInformation; +import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; +import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; +import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * The behavior of a RaftActor when it is in the Leader state + *

    + * Leaders: + *

      + *
    • Upon election: send initial empty AppendEntries RPCs + * (heartbeat) to each server; repeat during idle periods to + * prevent election timeouts (§5.2) + *
    • If command received from client: append entry to local log, + * respond after entry applied to state machine (§5.3) + *
    • If last log index ≥ nextIndex for a follower: send + * AppendEntries RPC with log entries starting at nextIndex + *
        + *
      • If successful: update nextIndex and matchIndex for + * follower (§5.3) + *
      • If AppendEntries fails because of log inconsistency: + * decrement nextIndex and retry (§5.3) + *
      + *
    • If there exists an N such that N > commitIndex, a majority + * of matchIndex[i] ≥ N, and log[N].term == currentTerm: + * set commitIndex = N (§5.3, §5.4). + */ +public class Leader extends AbstractRaftActorBehavior { + + + private final Map followerToLog = + new HashMap(); + + private final Map followerToActor = new HashMap<>(); + + private Cancellable heartbeatSchedule = null; + private Cancellable appendEntriesSchedule = null; + private Cancellable installSnapshotSchedule = null; + + private List trackerList = new ArrayList<>(); + + private final int minReplicationCount; + + public Leader(RaftActorContext context) { + super(context); + + if (lastIndex() >= 0) { + context.setCommitIndex(lastIndex()); + } + + for (String followerId : context.getPeerAddresses().keySet()) { + FollowerLogInformation followerLogInformation = + new FollowerLogInformationImpl(followerId, + new AtomicLong(lastIndex()), + new AtomicLong(-1)); + + followerToActor.put(followerId, + context.actorSelection(context.getPeerAddress(followerId))); + + followerToLog.put(followerId, followerLogInformation); + } + + context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet()); + + if (followerToActor.size() > 0) { + minReplicationCount = (followerToActor.size() + 1) / 2 + 1; + } else { + minReplicationCount = 0; + } + + + // Immediately schedule a heartbeat + // Upon election: send initial empty AppendEntries RPCs + // (heartbeat) to each server; repeat during idle periods to + // prevent election timeouts (§5.2) + scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + + scheduleInstallSnapshotCheck( + new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, + HEART_BEAT_INTERVAL.unit()) + ); + + } + + @Override protected RaftState handleAppendEntries(ActorRef sender, + AppendEntries appendEntries) { + + return state(); + } + + @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + AppendEntriesReply appendEntriesReply) { + + // Update the FollowerLogInformation + String followerId = appendEntriesReply.getFollowerId(); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + if (appendEntriesReply.isSuccess()) { + followerLogInformation + .setMatchIndex(appendEntriesReply.getLogLastIndex()); + followerLogInformation + .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); + } else { + + // TODO: When we find that the follower is out of sync with the + // Leader we simply decrement that followers next index by 1. + // Would it be possible to do better than this? The RAFT spec + // does not explicitly deal with it but may be something for us to + // think about + + followerLogInformation.decrNextIndex(); + } + + // Now figure out if this reply warrants a change in the commitIndex + // If there exists an N such that N > commitIndex, a majority + // of matchIndex[i] ≥ N, and log[N].term == currentTerm: + // set commitIndex = N (§5.3, §5.4). + for (long N = context.getCommitIndex() + 1; ; N++) { + int replicatedCount = 1; + + for (FollowerLogInformation info : followerToLog.values()) { + if (info.getMatchIndex().get() >= N) { + replicatedCount++; + } + } + + if (replicatedCount >= minReplicationCount) { + ReplicatedLogEntry replicatedLogEntry = + context.getReplicatedLog().get(N); + if (replicatedLogEntry != null + && replicatedLogEntry.getTerm() + == currentTerm()) { + context.setCommitIndex(N); + } + } else { + break; + } + } + + // Apply the change to the state machine + if (context.getCommitIndex() > context.getLastApplied()) { + applyLogToStateMachine(context.getCommitIndex()); + } + + return state(); + } + + protected ClientRequestTracker findClientRequestTracker(long logIndex) { + for (ClientRequestTracker tracker : trackerList) { + if (tracker.getIndex() == logIndex) { + return tracker; + } + } + + return null; + } + + @Override protected RaftState handleRequestVoteReply(ActorRef sender, + RequestVoteReply requestVoteReply) { + return state(); + } + + @Override public RaftState state() { + return RaftState.Leader; + } + + @Override public RaftState handleMessage(ActorRef sender, Object message) { + Preconditions.checkNotNull(sender, "sender should not be null"); + + if (message instanceof RaftRPC) { + RaftRPC rpc = (RaftRPC) message; + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); + return RaftState.Follower; + } + } + + try { + if (message instanceof SendHeartBeat) { + return sendHeartBeat(); + } else if(message instanceof SendInstallSnapshot) { + installSnapshotIfNeeded(); + } else if (message instanceof Replicate) { + replicate((Replicate) message); + } else if (message instanceof InstallSnapshotReply){ + handleInstallSnapshotReply( + (InstallSnapshotReply) message); + } + } finally { + scheduleHeartBeat(HEART_BEAT_INTERVAL); + } + + return super.handleMessage(sender, message); + } + + private void handleInstallSnapshotReply(InstallSnapshotReply message) { + InstallSnapshotReply reply = message; + String followerId = reply.getFollowerId(); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + followerLogInformation + .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); + followerLogInformation + .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); + } + + private void replicate(Replicate replicate) { + long logIndex = replicate.getReplicatedLogEntry().getIndex(); + + context.getLogger().debug("Replicate message " + logIndex); + + if (followerToActor.size() == 0) { + context.setCommitIndex( + replicate.getReplicatedLogEntry().getIndex()); + + context.getActor() + .tell(new ApplyState(replicate.getClientActor(), + replicate.getIdentifier(), + replicate.getReplicatedLogEntry()), + context.getActor() + ); + } else { + + // Create a tracker entry we will use this later to notify the + // client actor + trackerList.add( + new ClientRequestTrackerImpl(replicate.getClientActor(), + replicate.getIdentifier(), + logIndex) + ); + + sendAppendEntries(); + } + } + + private void sendAppendEntries() { + // Send an AppendEntries to all followers + for (String followerId : followerToActor.keySet()) { + ActorSelection followerActor = + followerToActor.get(followerId); + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + List entries = Collections.emptyList(); + + if(context.getReplicatedLog().isPresent(nextIndex)){ + // TODO: Instead of sending all entries from nextIndex + // only send a fixed number of entries to each follower + // This is to avoid the situation where there are a lot of + // entries to install for a fresh follower or to a follower + // that has fallen too far behind with the log but yet is not + // eligible to receive a snapshot + entries = + context.getReplicatedLog().getFrom(nextIndex); + } + + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(nextIndex), prevLogTerm(nextIndex), + entries, context.getCommitIndex() + ), + actor() + ); + } + } + + /** + * An installSnapshot is scheduled at a interval that is a multiple of + * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing + * snapshots at every heartbeat. + */ + private void installSnapshotIfNeeded(){ + for (String followerId : followerToActor.keySet()) { + ActorSelection followerActor = + followerToActor.get(followerId); + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){ + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().getSnapshot() + ), + actor() + ); + } + } + } + + private RaftState sendHeartBeat() { + if (followerToActor.size() > 0) { + sendAppendEntries(); + } + return state(); + } + + private void stopHeartBeat() { + if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) { + heartbeatSchedule.cancel(); + } + } + + private void stopInstallSnapshotSchedule() { + if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) { + installSnapshotSchedule.cancel(); + } + } + + private void scheduleHeartBeat(FiniteDuration interval) { + if(followerToActor.keySet().size() == 0){ + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; + } + + stopHeartBeat(); + + // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat + // message is sent to itself. + // Scheduling the heartbeat only once here because heartbeats do not + // need to be sent if there are other messages being sent to the remote + // actor. + heartbeatSchedule = + context.getActorSystem().scheduler().scheduleOnce( + interval, + context.getActor(), new SendHeartBeat(), + context.getActorSystem().dispatcher(), context.getActor()); + } + + + private void scheduleInstallSnapshotCheck(FiniteDuration interval) { + if(followerToActor.keySet().size() == 0){ + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; + } + + stopInstallSnapshotSchedule(); + + // Schedule a message to send append entries to followers that can + // accept an append entries with some data in it + installSnapshotSchedule = + context.getActorSystem().scheduler().scheduleOnce( + interval, + context.getActor(), new SendInstallSnapshot(), + context.getActorSystem().dispatcher(), context.getActor()); + } + + + + @Override public void close() throws Exception { + stopHeartBeat(); + } + + @Override public String getLeaderId() { + return context.getId(); + } + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java new file mode 100644 index 0000000000..ca2d916ecf --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.raft.RaftState; + +/** + * A RaftActorBehavior represents the specific behavior of a RaftActor + *

      + * A RaftActor can behave as one of the following, + *

        + *
      • Follower
      • + *
      • Candidate
      • + *
      • Leader
      • + *
      + *

      + * In each of these behaviors the Raft Actor handles the same Raft messages + * differently. + */ +public interface RaftActorBehavior extends AutoCloseable{ + /** + * Handle a message. If the processing of the message warrants a state + * change then a new state should be returned otherwise this method should + * return the state for the current behavior. + * + * @param sender The sender of the message + * @param message A message that needs to be processed + * + * @return The new state or self (this) + */ + RaftState handleMessage(ActorRef sender, Object message); + + /** + * The state associated with a given behavior + * + * @return + */ + RaftState state(); + + /** + * + * @return + */ + String getLeaderId(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/AddRaftPeer.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/AddRaftPeer.java new file mode 100644 index 0000000000..d1f4c43c86 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/AddRaftPeer.java @@ -0,0 +1,23 @@ +package org.opendaylight.controller.cluster.raft.client.messages; + +/** + * Created by kramesha on 7/17/14. + */ +public class AddRaftPeer { + + private String name; + private String address; + + public AddRaftPeer(String name, String address) { + this.name = name; + this.address = address; + } + + public String getName() { + return name; + } + + public String getAddress() { + return address; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java new file mode 100644 index 0000000000..a60aea46e8 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.client.messages; + +public class FindLeader { + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java new file mode 100644 index 0000000000..b36ef112b3 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.client.messages; + +public class FindLeaderReply { + private final String leaderActor; + + public FindLeaderReply(String leaderActor) { + this.leaderActor = leaderActor; + } + + public String getLeaderActor() { + return leaderActor; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/RemoveRaftPeer.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/RemoveRaftPeer.java new file mode 100644 index 0000000000..4b766e04b7 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/RemoveRaftPeer.java @@ -0,0 +1,16 @@ +package org.opendaylight.controller.cluster.raft.client.messages; + +/** + * Created by kramesha on 7/17/14. + */ +public class RemoveRaftPeer { + private String name; + + public RemoveRaftPeer(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java new file mode 100644 index 0000000000..a7172e22c8 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +public class ApplySnapshot { + private final Object snapshot; + + public ApplySnapshot(Object snapshot) { + this.snapshot = snapshot; + } + + public Object getSnapshot() { + return snapshot; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java new file mode 100644 index 0000000000..c9ba26eaad --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; + +public class ApplyState { + private final ActorRef clientActor; + private final String identifier; + private final ReplicatedLogEntry replicatedLogEntry; + + public ApplyState(ActorRef clientActor, String identifier, + ReplicatedLogEntry replicatedLogEntry) { + this.clientActor = clientActor; + this.identifier = identifier; + this.replicatedLogEntry = replicatedLogEntry; + } + + public ActorRef getClientActor() { + return clientActor; + } + + public String getIdentifier() { + return identifier; + } + + public ReplicatedLogEntry getReplicatedLogEntry() { + return replicatedLogEntry; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/CommitEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/CommitEntry.java new file mode 100644 index 0000000000..5afd4925af --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/CommitEntry.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +/** + * Message sent to commit an entry to the log + */ +public class CommitEntry { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ElectionTimeout.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ElectionTimeout.java new file mode 100644 index 0000000000..0a4b8fa669 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ElectionTimeout.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +public class ElectionTimeout { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/PersistEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/PersistEntry.java new file mode 100644 index 0000000000..7afe0b579b --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/PersistEntry.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +/** + * Message sent to Persist an entry into the transaction journal + */ +public class PersistEntry { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java new file mode 100644 index 0000000000..6ff7cfce5c --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; + +public class Replicate { + private final ActorRef clientActor; + private final String identifier; + private final ReplicatedLogEntry replicatedLogEntry; + + public Replicate(ActorRef clientActor, String identifier, + ReplicatedLogEntry replicatedLogEntry) { + + this.clientActor = clientActor; + this.identifier = identifier; + this.replicatedLogEntry = replicatedLogEntry; + } + + public ActorRef getClientActor() { + return clientActor; + } + + public String getIdentifier() { + return identifier; + } + + public ReplicatedLogEntry getReplicatedLogEntry() { + return replicatedLogEntry; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SaveSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SaveSnapshot.java new file mode 100644 index 0000000000..20e5927e6f --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SaveSnapshot.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +/** + * This message is sent by a RaftActor to itself so that a subclass can process + * it and use it to save it's state + */ +public class SaveSnapshot { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendHeartBeat.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendHeartBeat.java new file mode 100644 index 0000000000..5048cbb0b9 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendHeartBeat.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +/** + * This messages is sent to the Leader to prompt it to send a heartbeat + * to it's followers. + * + * Typically the Leader to itself on a schedule + */ +public class SendHeartBeat { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java new file mode 100644 index 0000000000..0c370aa981 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +public class SendInstallSnapshot { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java new file mode 100644 index 0000000000..3cafda9a15 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +public class AbstractRaftRPC implements RaftRPC { + // term + protected long term; + + protected AbstractRaftRPC(long term){ + this.term = term; + } + + public long getTerm() { + return term; + } + + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java new file mode 100644 index 0000000000..9bb5029548 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; + +import java.util.List; + +/** + * Invoked by leader to replicate log entries (§5.3); also used as + * heartbeat (§5.2). + */ +public class AppendEntries extends AbstractRaftRPC { + // So that follower can redirect clients + private final String leaderId; + + // Index of log entry immediately preceding new ones + private final long prevLogIndex; + + // term of prevLogIndex entry + private final long prevLogTerm; + + // log entries to store (empty for heartbeat; + // may send more than one for efficiency) + private final List entries; + + // leader's commitIndex + private final long leaderCommit; + + public AppendEntries(long term, String leaderId, long prevLogIndex, + long prevLogTerm, List entries, long leaderCommit) { + super(term); + this.leaderId = leaderId; + this.prevLogIndex = prevLogIndex; + this.prevLogTerm = prevLogTerm; + this.entries = entries; + this.leaderCommit = leaderCommit; + } + + public String getLeaderId() { + return leaderId; + } + + public long getPrevLogIndex() { + return prevLogIndex; + } + + public long getPrevLogTerm() { + return prevLogTerm; + } + + public List getEntries() { + return entries; + } + + public long getLeaderCommit() { + return leaderCommit; + } + + @Override public String toString() { + return "AppendEntries{" + + "leaderId='" + leaderId + '\'' + + ", prevLogIndex=" + prevLogIndex + + ", prevLogTerm=" + prevLogTerm + + ", entries=" + entries + + ", leaderCommit=" + leaderCommit + + '}'; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java new file mode 100644 index 0000000000..7524d8f232 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +/** + * Reply for the AppendEntriesRpc message + */ +public class AppendEntriesReply extends AbstractRaftRPC{ + + // true if follower contained entry matching + // prevLogIndex and prevLogTerm + private final boolean success; + + // The index of the last entry in the followers log + // This will be used to set the matchIndex for the follower on the + // Leader + private final long logLastIndex; + + private final long logLastTerm; + + // The followerId - this will be used to figure out which follower is + // responding + private final String followerId; + + public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm) { + super(term); + + this.followerId = followerId; + this.success = success; + this.logLastIndex = logLastIndex; + this.logLastTerm = logLastTerm; + } + + public long getTerm() { + return term; + } + + public boolean isSuccess() { + return success; + } + + public long getLogLastIndex() { + return logLastIndex; + } + + public long getLogLastTerm() { + return logLastTerm; + } + + public String getFollowerId() { + return followerId; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java new file mode 100644 index 0000000000..888854fa71 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +public class InstallSnapshot extends AbstractRaftRPC { + + private final String leaderId; + private final long lastIncludedIndex; + private final long lastIncludedTerm; + private final Object data; + + public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) { + super(term); + this.leaderId = leaderId; + this.lastIncludedIndex = lastIncludedIndex; + this.lastIncludedTerm = lastIncludedTerm; + this.data = data; + } + + public String getLeaderId() { + return leaderId; + } + + public long getLastIncludedIndex() { + return lastIncludedIndex; + } + + public long getLastIncludedTerm() { + return lastIncludedTerm; + } + + public Object getData() { + return data; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java new file mode 100644 index 0000000000..85b89b70ae --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +public class InstallSnapshotReply extends AbstractRaftRPC { + + // The followerId - this will be used to figure out which follower is + // responding + private final String followerId; + + protected InstallSnapshotReply(long term, String followerId) { + super(term); + this.followerId = followerId; + } + + public String getFollowerId() { + return followerId; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RaftRPC.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RaftRPC.java new file mode 100644 index 0000000000..a770e54f58 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RaftRPC.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +public interface RaftRPC { + public long getTerm(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java new file mode 100644 index 0000000000..981da17ce1 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +/** + * Invoked by candidates to gather votes (§5.2). + */ +public class RequestVote extends AbstractRaftRPC{ + + // candidate requesting vote + private final String candidateId; + + // index of candidate’s last log entry (§5.4) + private final long lastLogIndex; + + // term of candidate’s last log entry (§5.4) + private final long lastLogTerm; + + public RequestVote(long term, String candidateId, long lastLogIndex, + long lastLogTerm) { + super(term); + this.candidateId = candidateId; + this.lastLogIndex = lastLogIndex; + this.lastLogTerm = lastLogTerm; + } + + public long getTerm() { + return term; + } + + public String getCandidateId() { + return candidateId; + } + + public long getLastLogIndex() { + return lastLogIndex; + } + + public long getLastLogTerm() { + return lastLogTerm; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java new file mode 100644 index 0000000000..816120cd93 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.messages; + +public class RequestVoteReply extends AbstractRaftRPC{ + + // true means candidate received vot + private final boolean voteGranted; + + public RequestVoteReply(long term, boolean voteGranted) { + super(term); + this.voteGranted = voteGranted; + } + + public long getTerm() { + return term; + } + + public boolean isVoteGranted() { + return voteGranted; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf new file mode 100644 index 0000000000..494a99e5d6 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf @@ -0,0 +1,12 @@ +akka { + loglevel = "DEBUG" + actor { + serializers { + java = "akka.serialization.JavaSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.raft.RaftActor$ReplicatedLogImplEntry" = java + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractActorTest.java new file mode 100644 index 0000000000..1971432fb9 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractActorTest.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public abstract class AbstractActorTest { + private static ActorSystem system; + + @BeforeClass + public static void setUpClass() { + System.setProperty("shard.persistent", "false"); + system = ActorSystem.create("test"); + } + + @AfterClass + public static void tearDownClass() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + protected ActorSystem getSystem() { + return system; + } + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java new file mode 100644 index 0000000000..77d0071917 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MockRaftActorContext implements RaftActorContext { + + private String id; + private ActorSystem system; + private ActorRef actor; + private long index = 0; + private long lastApplied = 0; + private final ElectionTerm electionTerm; + private ReplicatedLog replicatedLog; + private Map peerAddresses = new HashMap(); + + public MockRaftActorContext(){ + electionTerm = null; + + initReplicatedLog(); + } + + public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){ + this.id = id; + this.system = system; + this.actor = actor; + + final String id1 = id; + electionTerm = new ElectionTerm() { + /** + * Identifier of the actor whose election term information this is + */ + private final String id = id1; + private long currentTerm = 0; + private String votedFor = ""; + + public long getCurrentTerm() { + return currentTerm; + } + + public String getVotedFor() { + return votedFor; + } + + public void update(long currentTerm, String votedFor){ + this.currentTerm = currentTerm; + this.votedFor = votedFor; + + // TODO : Write to some persistent state + } + + @Override public void updateAndPersist(long currentTerm, + String votedFor) { + update(currentTerm, votedFor); + } + }; + + initReplicatedLog(); + } + + + public void initReplicatedLog(){ + this.replicatedLog = new SimpleReplicatedLog(); + this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, "")); + } + + @Override public ActorRef actorOf(Props props) { + return system.actorOf(props); + } + + @Override public ActorSelection actorSelection(String path) { + return system.actorSelection(path); + } + + @Override public String getId() { + return id; + } + + @Override public ActorRef getActor() { + return actor; + } + + @Override public ElectionTerm getTermInformation() { + return electionTerm; + } + + public void setIndex(long index){ + this.index = index; + } + + @Override public long getCommitIndex() { + return index; + } + + @Override public void setCommitIndex(long commitIndex) { + this.index = commitIndex; + } + + @Override public void setLastApplied(long lastApplied){ + this.lastApplied = lastApplied; + } + + @Override public long getLastApplied() { + return lastApplied; + } + + public void setReplicatedLog(ReplicatedLog replicatedLog) { + this.replicatedLog = replicatedLog; + } + + @Override public ReplicatedLog getReplicatedLog() { + return replicatedLog; + } + + @Override public ActorSystem getActorSystem() { + return this.system; + } + + @Override public LoggingAdapter getLogger() { + return Logging.getLogger(system, this); + } + + @Override public Map getPeerAddresses() { + return peerAddresses; + } + + @Override public String getPeerAddress(String peerId) { + return peerAddresses.get(peerId); + } + + @Override public void addToPeers(String name, String address) { + peerAddresses.put(name, address); + } + + @Override public void removePeer(String name) { + peerAddresses.remove(name); + } + + public void setPeerAddresses(Map peerAddresses) { + this.peerAddresses = peerAddresses; + } + + + + public static class SimpleReplicatedLog implements ReplicatedLog { + private final List log = new ArrayList<>(); + + @Override public ReplicatedLogEntry get(long index) { + if(index >= log.size() || index < 0){ + return null; + } + return log.get((int) index); + } + + @Override public ReplicatedLogEntry last() { + if(log.size() == 0){ + return null; + } + return log.get(log.size()-1); + } + + @Override public long lastIndex() { + if(log.size() == 0){ + return -1; + } + + return last().getIndex(); + } + + @Override public long lastTerm() { + if(log.size() == 0){ + return -1; + } + + return last().getTerm(); + } + + @Override public void removeFrom(long index) { + if(index >= log.size() || index < 0){ + return; + } + + log.subList((int) index, log.size()).clear(); + //log.remove((int) index); + } + + @Override public void removeFromAndPersist(long index) { + removeFrom(index); + } + + @Override public void append(ReplicatedLogEntry replicatedLogEntry) { + log.add(replicatedLogEntry); + } + + @Override public void appendAndPersist( + ReplicatedLogEntry replicatedLogEntry) { + append(replicatedLogEntry); + } + + @Override public List getFrom(long index) { + if(index >= log.size() || index < 0){ + return Collections.EMPTY_LIST; + } + List entries = new ArrayList<>(); + for(int i=(int) index ; i < log.size() ; i++) { + entries.add(get(i)); + } + return entries; + } + + @Override public long size() { + return log.size(); + } + + @Override public boolean isPresent(long index) { + if(index >= log.size() || index < 0){ + return false; + } + + return true; + } + + @Override public boolean isInSnapshot(long index) { + return false; + } + + @Override public Object getSnapshot() { + return null; + } + + @Override public long getSnapshotIndex() { + return -1; + } + + @Override public long getSnapshotTerm() { + return -1; + } + } + + public static class MockReplicatedLogEntry implements ReplicatedLogEntry { + + private final long term; + private final long index; + private final Object data; + + public MockReplicatedLogEntry(long term, long index, Object data){ + + this.term = term; + this.index = index; + this.data = data; + } + + @Override public Object getData() { + return data; + } + + @Override public long getTerm() { + return term; + } + + @Override public long getIndex() { + return index; + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java new file mode 100644 index 0000000000..1a37b921e3 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -0,0 +1,364 @@ +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.AbstractActorTest; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { + + private final ActorRef behaviorActor = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + /** + * This test checks that when a new Raft RPC message is received with a newer + * term the RaftActor gets into the Follower state. + * + * @throws Exception + */ + @Test + public void testHandleRaftRPCWithNewerTerm() throws Exception { + new JavaTestKit(getSystem()) {{ + + assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(), + createAppendEntriesWithNewerTerm()); + + assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(), + createAppendEntriesReplyWithNewerTerm()); + + assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(), + createRequestVoteWithNewerTerm()); + + assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(), + createRequestVoteReplyWithNewerTerm()); + + + }}; + } + + + /** + * This test verifies that when an AppendEntries is received with a term that + * is less that the currentTerm of the RaftActor then the RaftActor does not + * change it's state and it responds back with a failure + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() + throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to a high number (1000) + context.getTermInformation().update(1000, "test"); + + AppendEntries appendEntries = + new AppendEntries(100, "leader-1", 0, 0, null, 101); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + + + }}; + } + + + @Test + public void testHandleAppendEntriesAddSameEntryToLog(){ + new JavaTestKit(getSystem()) { + { + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(2, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = + new MockRaftActorContext.SimpleReplicatedLog(); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); + + context.setReplicatedLog(log); + + List entries = new ArrayList<>(); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); + + AppendEntries appendEntries = + new AppendEntries(2, "leader-1", -1, 1, entries, 0); + + RaftActorBehavior behavior = createBehavior(context); + + if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) { + // Resetting the Candidates term to make sure it will match + // the term sent by AppendEntries. If this was not done then + // the test will fail because the Candidate will assume that + // the message was sent to it from a lower term peer and will + // thus respond with a failure + context.getTermInformation().update(2, "test"); + } + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + + assertEquals(1, log.size()); + + + }}; + } + + /** + * This test verifies that when a RequestVote is received by the RaftActor + * with a term which is greater than the RaftActors' currentTerm and the + * senders' log is more upto date than the receiver that the receiver grants + * the vote to the sender + */ + @Test + public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermAndSenderLogMoreUpToDate() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorBehavior behavior = createBehavior( + createActorContext(behaviorActor)); + + RaftState raftState = behavior.handleMessage(getTestActor(), + new RequestVote(1000, "test", 10000, 999)); + + if(behavior.state() != RaftState.Follower){ + assertEquals(RaftState.Follower, raftState); + } else { + + final Boolean out = + new ExpectMsg(duration("1 seconds"), + "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = + (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + } + }; + }}; + } + + /** + * This test verifies that when a RaftActor receives a RequestVote message + * with a term that is greater than it's currentTerm but a less up-to-date + * log then the receiving RaftActor will not grant the vote to the sender + */ + @Test + public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermButSenderLogLessUptoDate() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext actorContext = + createActorContext(behaviorActor); + + MockRaftActorContext.SimpleReplicatedLog + log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(20000, + 1000000, "")); + + ((MockRaftActorContext) actorContext).setReplicatedLog(log); + + RaftActorBehavior behavior = createBehavior(actorContext); + + RaftState raftState = behavior.handleMessage(getTestActor(), + new RequestVote(1000, "test", 10000, 999)); + + if(behavior.state() != RaftState.Follower){ + assertEquals(RaftState.Follower, raftState); + } else { + final Boolean out = + new ExpectMsg(duration("1 seconds"), + "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = + (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + } + }; + }}; + } + + + + /** + * This test verifies that the receiving RaftActor will not grant a vote + * to a sender if the sender's term is lesser than the currentTerm of the + * recipient RaftActor + */ + @Test + public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = + createActorContext(behaviorActor); + + context.getTermInformation().update(1000, null); + + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), + new RequestVote(999, "test", 10000, 999)); + + final Boolean out = + new ExpectMsg(duration("1 seconds"), + "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = + (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm( + ActorRef actorRef, RaftRPC rpc) { + + RaftActorContext actorContext = createActorContext(); + setLastLogEntry( + (MockRaftActorContext) actorContext, 0, 0, ""); + + RaftState raftState = createBehavior(actorContext) + .handleMessage(actorRef, rpc); + + assertEquals(RaftState.Follower, raftState); + } + + protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry( + MockRaftActorContext actorContext, long term, long index, Object data) { + return setLastLogEntry(actorContext, + new MockRaftActorContext.MockReplicatedLogEntry(term, index, data)); + } + + protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry( + MockRaftActorContext actorContext, ReplicatedLogEntry logEntry) { + MockRaftActorContext.SimpleReplicatedLog + log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(logEntry); + actorContext.setReplicatedLog(log); + + return log; + } + + protected abstract RaftActorBehavior createBehavior( + RaftActorContext actorContext); + + protected RaftActorBehavior createBehavior() { + return createBehavior(createActorContext()); + } + + protected RaftActorContext createActorContext() { + return new MockRaftActorContext(); + } + + protected RaftActorContext createActorContext(ActorRef actor) { + return new MockRaftActorContext("test", getSystem(), actor); + } + + protected AppendEntries createAppendEntriesWithNewerTerm() { + return new AppendEntries(100, "leader-1", 0, 0, null, 1); + } + + protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() { + return new AppendEntriesReply("follower-1", 100, false, 100, 100); + } + + protected RequestVote createRequestVoteWithNewerTerm() { + return new RequestVote(100, "candidate-1", 10, 100); + } + + protected RequestVoteReply createRequestVoteReplyWithNewerTerm() { + return new RequestVoteReply(100, false); + } + + + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java new file mode 100644 index 0000000000..8bcee58afe --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -0,0 +1,299 @@ +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class CandidateTest extends AbstractRaftActorBehaviorTest { + + private final ActorRef candidateActor = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + private final ActorRef peerActor1 = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + private final ActorRef peerActor2 = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + private final ActorRef peerActor3 = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + private final ActorRef peerActor4 = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + private final Map onePeer = new HashMap<>(); + private final Map twoPeers = new HashMap<>(); + private final Map fourPeers = new HashMap<>(); + + @Before + public void setUp(){ + onePeer.put(peerActor1.path().toString(), + peerActor1.path().toString()); + + twoPeers.put(peerActor1.path().toString(), + peerActor1.path().toString()); + twoPeers.put(peerActor2.path().toString(), + peerActor2.path().toString()); + + fourPeers.put(peerActor1.path().toString(), + peerActor1.path().toString()); + fourPeers.put(peerActor2.path().toString(), + peerActor2.path().toString()); + fourPeers.put(peerActor3.path().toString(), + peerActor3.path().toString()); + fourPeers.put(peerActor4.path().toString(), + peerActor3.path().toString()); + + + } + + @Test + public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){ + RaftActorContext raftActorContext = createActorContext(); + long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm(); + + new Candidate(raftActorContext); + + assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm()); + assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor()); + } + + @Test + public void testThatAnElectionTimeoutIsTriggered(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + Candidate candidate = new Candidate(createActorContext(getTestActor())); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "ElectionTimeout") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof ElectionTimeout) { + return true; + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + }; + }}; + } + + @Test + public void testHandleElectionTimeoutWhenThereAreZeroPeers(){ + RaftActorContext raftActorContext = createActorContext(); + Candidate candidate = + new Candidate(raftActorContext); + + RaftState raftState = + candidate.handleMessage(candidateActor, new ElectionTimeout()); + + Assert.assertEquals(RaftState.Leader, raftState); + } + + @Test + public void testHandleElectionTimeoutWhenThereAreTwoNodesInCluster(){ + MockRaftActorContext raftActorContext = + (MockRaftActorContext) createActorContext(); + raftActorContext.setPeerAddresses(onePeer); + Candidate candidate = + new Candidate(raftActorContext); + + RaftState raftState = + candidate.handleMessage(candidateActor, new ElectionTimeout()); + + Assert.assertEquals(RaftState.Candidate, raftState); + } + + @Test + public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodesInCluster(){ + MockRaftActorContext raftActorContext = + (MockRaftActorContext) createActorContext(); + raftActorContext.setPeerAddresses(twoPeers); + Candidate candidate = + new Candidate(raftActorContext); + + RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); + + Assert.assertEquals(RaftState.Leader, stateOnFirstVote); + + } + + @Test + public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodesInCluster(){ + MockRaftActorContext raftActorContext = + (MockRaftActorContext) createActorContext(); + raftActorContext.setPeerAddresses(fourPeers); + Candidate candidate = + new Candidate(raftActorContext); + + RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); + + RaftState stateOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true)); + + Assert.assertEquals(RaftState.Candidate, stateOnFirstVote); + Assert.assertEquals(RaftState.Leader, stateOnSecondVote); + + } + + @Test + public void testResponseToAppendEntriesWithLowerTerm(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + Candidate candidate = new Candidate(createActorContext(getTestActor())); + + candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.EMPTY_LIST, 0)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesResponse") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + + @Test + public void testResponseToRequestVoteWithLowerTerm(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + Candidate candidate = new Candidate(createActorContext(getTestActor())); + + candidate.handleMessage(getTestActor(), new RequestVote(0, "test", 0, 0)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesResponse") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = createActorContext(getTestActor()); + + context.getTermInformation().update(1000, null); + + // Once a candidate is created it will immediately increment the current term so after + // construction the currentTerm should be 1001 + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), new RequestVote(1001, "test", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + }; + }}; + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = createActorContext(getTestActor()); + + context.getTermInformation().update(1000, "test"); + + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), new RequestVote(1001, "candidate", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + + + + @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { + return new Candidate(actorContext); + } + + @Override protected RaftActorContext createActorContext() { + return new MockRaftActorContext("test", getSystem(), candidateActor); + } + + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java new file mode 100644 index 0000000000..b7c371dd39 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -0,0 +1,410 @@ +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import junit.framework.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class FollowerTest extends AbstractRaftActorBehaviorTest { + + private final ActorRef followerActor = getSystem().actorOf(Props.create( + DoNothingActor.class)); + + + @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { + return new Follower(actorContext); + } + + @Override protected RaftActorContext createActorContext() { + return new MockRaftActorContext("test", getSystem(), followerActor); + } + + @Test + public void testThatAnElectionTimeoutIsTriggered(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + Follower follower = new Follower(createActorContext(getTestActor())); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "ElectionTimeout") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof ElectionTimeout) { + return true; + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + }; + }}; + } + + @Test + public void testHandleElectionTimeout(){ + RaftActorContext raftActorContext = createActorContext(); + Follower follower = + new Follower(raftActorContext); + + RaftState raftState = + follower.handleMessage(followerActor, new ElectionTimeout()); + + Assert.assertEquals(RaftState.Candidate, raftState); + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = createActorContext(getTestActor()); + + context.getTermInformation().update(1000, null); + + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + } + }; + }}; + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + RaftActorContext context = createActorContext(getTestActor()); + + context.getTermInformation().update(1000, "test"); + + RaftActorBehavior follower = createBehavior(context); + + follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999)); + + final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof RequestVoteReply) { + RequestVoteReply reply = (RequestVoteReply) in; + return reply.isVoteGranted(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + } + }; + }}; + } + + /** + * This test verifies that when an AppendEntries RPC is received by a RaftActor + * with a commitIndex that is greater than what has been applied to the + * state machine of the RaftActor, the RaftActor applies the state and + * sets it current applied state to the commitIndex of the sender. + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { + new JavaTestKit(getSystem()) {{ + + RaftActorContext context = + createActorContext(); + + context.setLastApplied(100); + setLastLogEntry((MockRaftActorContext) context, 0, 0, ""); + + List entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101, + "foo") + ); + + // The new commitIndex is 101 + AppendEntries appendEntries = + new AppendEntries(100, "leader-1", 0, 0, entries, 101); + + RaftState raftState = + createBehavior(context).handleMessage(getRef(), appendEntries); + + assertEquals(101L, context.getLastApplied()); + + }}; + } + + /** + * This test verifies that when an AppendEntries is received a specific prevLogTerm + * which does not match the term that is in RaftActors log entry at prevLogIndex + * then the RaftActor does not change it's state and it returns a failure. + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() + throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(95, "test"); + + // Set the last log entry term for the receiver to be greater than + // what we will be sending as the prevLogTerm in AppendEntries + MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog = + setLastLogEntry(context, 20, 0, ""); + + // AppendEntries is now sent with a bigger term + // this will set the receivers term to be the same as the sender's term + AppendEntries appendEntries = + new AppendEntries(100, "leader-1", 0, 0, null, 101); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + + + }}; + } + + + + /** + * This test verifies that when a new AppendEntries message is received with + * new entries and the logs of the sender and receiver match that the new + * entries get added to the log and the log is incremented by the number of + * entries received in appendEntries + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesAddNewEntries() throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = + new MockRaftActorContext.SimpleReplicatedLog(); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one")); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three")); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = + new AppendEntries(1, "leader-1", 2, 1, entries, 4); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + assertEquals(5, log.last().getIndex() + 1); + assertNotNull(log.get(3)); + assertNotNull(log.get(4)); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + + + }}; + } + + + + /** + * This test verifies that when a new AppendEntries message is received with + * new entries and the logs of the sender and receiver are out-of-sync that + * the log is first corrected by removing the out of sync entries from the + * log and then adding in the new entries sent with the AppendEntries message + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesCorrectReceiverLogEntries() + throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(2, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = + new MockRaftActorContext.SimpleReplicatedLog(); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one")); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1")); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = + new AppendEntries(2, "leader-1", 1, 1, entries, 3); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + + // The entry at index 2 will be found out-of-sync with the leader + // and will be removed + // Then the two new entries will be added to the log + // Thus making the log to have 4 entries + assertEquals(4, log.last().getIndex() + 1); + assertNotNull(log.get(2)); + + + assertEquals("one", log.get(1).getData()); + + // Check that the entry at index 2 has the new data + assertEquals("two-1", log.get(2).getData()); + + assertEquals("three", log.get(3).getData()); + assertNotNull(log.get(3)); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + + + }}; + } + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java new file mode 100644 index 0000000000..35bf6f15b4 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -0,0 +1,205 @@ +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import junit.framework.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; +import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class LeaderTest extends AbstractRaftActorBehaviorTest { + + private ActorRef leaderActor = + getSystem().actorOf(Props.create(DoNothingActor.class)); + private ActorRef senderActor = + getSystem().actorOf(Props.create(DoNothingActor.class)); + + @Test + public void testHandleMessageForUnknownMessage() throws Exception { + new JavaTestKit(getSystem()) {{ + Leader leader = + new Leader(createActorContext()); + + // handle message should return the Leader state when it receives an + // unknown message + RaftState state = leader.handleMessage(senderActor, "foo"); + Assert.assertEquals(RaftState.Leader, state); + }}; + } + + + @Test + public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + ActorRef followerActor = getTestActor(); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + + Map peerAddresses = new HashMap(); + + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + actorContext.setPeerAddresses(peerAddresses); + + Leader leader = new Leader(actorContext); + leader.handleMessage(senderActor, new SendHeartBeat()); + + final String out = + new ExpectMsg(duration("1 seconds"), + "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof AppendEntries) { + if (((AppendEntries) in).getTerm() + == 0) { + return "match"; + } + return null; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + + } + + + }; + }}; + } + + @Test + public void testHandleReplicateMessageSendAppendEntriesToFollower() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + ActorRef followerActor = getTestActor(); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + + Map peerAddresses = new HashMap(); + + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + actorContext.setPeerAddresses(peerAddresses); + + Leader leader = new Leader(actorContext); + RaftState raftState = leader + .handleMessage(senderActor, new Replicate(null, null, + new MockRaftActorContext.MockReplicatedLogEntry(1, + 100, + "foo") + )); + + // State should not change + assertEquals(RaftState.Leader, raftState); + + final String out = + new ExpectMsg(duration("1 seconds"), + "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof AppendEntries) { + if (((AppendEntries) in).getTerm() + == 0) { + return "match"; + } + return null; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + + } + + + }; + }}; + } + + @Test + public void testHandleReplicateMessageWhenThereAreNoFollowers() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + ActorRef raftActor = getTestActor(); + + MockRaftActorContext actorContext = + new MockRaftActorContext("test", getSystem(), raftActor); + + Leader leader = new Leader(actorContext); + RaftState raftState = leader + .handleMessage(senderActor, new Replicate(null, "state-id", + new MockRaftActorContext.MockReplicatedLogEntry(1, + 100, + "foo") + )); + + // State should not change + assertEquals(RaftState.Leader, raftState); + + assertEquals(100, actorContext.getCommitIndex()); + + final String out = + new ExpectMsg(duration("1 seconds"), + "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof ApplyState) { + if (((ApplyState) in).getIdentifier().equals("state-id")) { + return "match"; + } + return null; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + + } + + + }; + }}; + } + + @Override protected RaftActorBehavior createBehavior( + RaftActorContext actorContext) { + return new Leader(actorContext); + } + + @Override protected RaftActorContext createActorContext() { + return new MockRaftActorContext("test", getSystem(), leaderActor); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/DoNothingActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/DoNothingActor.java new file mode 100644 index 0000000000..741c473681 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/DoNothingActor.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.utils; + +import akka.actor.UntypedActor; + +public class DoNothingActor extends UntypedActor{ + @Override public void onReceive(Object message) throws Exception { + + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf new file mode 100644 index 0000000000..2647850667 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf @@ -0,0 +1,11 @@ +akka { + actor { + serializers { + java = "akka.serialization.JavaSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java + } + } +} \ No newline at end of file