<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>flow-management-compatibility</artifactId>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-binding-api</artifactId>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-parent</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>sal-akka-raft</artifactId>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-cluster_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-remote_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+ <Export-package></Export-package>
+ <Private-Package></Private-Package>
+ <Import-Package></Import-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>org.opendaylight.controller.*</include>
+ </includes>
+ <check>false</check>
+ </configuration>
+ <executions>
+ <execution>
+ <id>pre-test</id>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>post-test</id>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering</url>
+ </scm>
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
+
+public class ClientActor extends UntypedActor {
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
+
+ private final ActorRef target;
+
+ public ClientActor(ActorRef target){
+ this.target = target;
+ }
+
+ public static Props props(final ActorRef target){
+ return Props.create(new Creator<ClientActor>(){
+
+ @Override public ClientActor create() throws Exception {
+ return new ClientActor(target);
+ }
+ });
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(message instanceof KeyValue) {
+ target.tell(message, getSelf());
+ } else if(message instanceof KeyValueSaved){
+ LOG.info("KeyValue saved");
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.RaftActor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A sample actor showing how the RaftActor is to be extended
+ */
+public class ExampleActor extends RaftActor {
+
+ private final Map<String, String> state = new HashMap();
+
+ private long persistIdentifier = 1;
+
+
+ public ExampleActor(String id, Map<String, String> peerAddresses) {
+ super(id, peerAddresses);
+ }
+
+ public static Props props(final String id, final Map<String, String> peerAddresses){
+ return Props.create(new Creator<ExampleActor>(){
+
+ @Override public ExampleActor create() throws Exception {
+ return new ExampleActor(id, peerAddresses);
+ }
+ });
+ }
+
+ @Override public void onReceiveCommand(Object message){
+ if(message instanceof KeyValue){
+
+ if(isLeader()) {
+ String persistId = Long.toString(persistIdentifier++);
+ persistData(getSender(), persistId, message);
+ } else {
+ if(getLeader() != null) {
+ getLeader().forward(message, getContext());
+ }
+ }
+
+ } else if (message instanceof PrintState) {
+ LOG.debug("State of the node:"+getId() + " is="+state.size());
+
+ } else if (message instanceof PrintRole) {
+ LOG.debug(getId() + " = " + getRaftState());
+ }
+ super.onReceiveCommand(message);
+ }
+
+ @Override protected void applyState(ActorRef clientActor, String identifier,
+ Object data) {
+ if(data instanceof KeyValue){
+ KeyValue kv = (KeyValue) data;
+ state.put(kv.getKey(), kv.getValue());
+ if(clientActor != null) {
+ clientActor.tell(new KeyValueSaved(), getSelf());
+ }
+ }
+ }
+
+ @Override protected Object createSnapshot() {
+ return state;
+ }
+
+ @Override protected void applySnapshot(Object snapshot) {
+ state.clear();
+ state.putAll((HashMap) snapshot);
+ }
+
+ @Override public void onReceiveRecover(Object message) {
+ super.onReceiveRecover(message);
+ }
+
+ @Override public String persistenceId() {
+ return getId();
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Created by kramesha on 7/16/14.
+ */
+public class LogGenerator {
+ private Map<ActorRef, LoggingThread> clientToLoggingThread = new HashMap<ActorRef, LoggingThread>();
+
+ public void startLoggingForClient(ActorRef client) {
+ LoggingThread lt = new LoggingThread(client);
+ clientToLoggingThread.put(client, lt);
+ Thread t = new Thread(lt);
+ t.start();
+ }
+
+ public void stopLoggingForClient(ActorRef client) {
+ clientToLoggingThread.get(client).stopLogging();
+ clientToLoggingThread.remove(client);
+ }
+
+ public class LoggingThread implements Runnable {
+
+ private ActorRef clientActor;
+ private volatile boolean stopLogging = false;
+
+ public LoggingThread(ActorRef clientActor) {
+ this.clientActor = clientActor;
+ }
+
+ public void run() {
+ Random r = new Random();
+ while (true) {
+ if (stopLogging) {
+ System.out.println("Logging stopped for client:" + clientActor.path());
+ break;
+ }
+ String key = clientActor.path().name();
+ int random = r.nextInt(100);
+ clientActor.tell(new KeyValue(key+"-key-" + random, "value-" + random), null);
+ try {
+ Thread.sleep((random%10) * 1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void stopLogging() {
+ stopLogging = true;
+ }
+
+ public void startLogging() {
+ stopLogging = false;
+ }
+
+
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Main {
+ private static final ActorSystem actorSystem = ActorSystem.create();
+ // Create three example actors
+ private static Map<String, String> allPeers = new HashMap<>();
+
+ static {
+ allPeers.put("example-1", "akka://default/user/example-1");
+ allPeers.put("example-2", "akka://default/user/example-2");
+ allPeers.put("example-3", "akka://default/user/example-3");
+ }
+
+ public static void main(String[] args) throws Exception{
+ ActorRef example1Actor =
+ actorSystem.actorOf(ExampleActor.props("example-1",
+ withoutPeer("example-1")), "example-1");
+
+ ActorRef example2Actor =
+ actorSystem.actorOf(ExampleActor.props("example-2",
+ withoutPeer("example-2")), "example-2");
+
+ ActorRef example3Actor =
+ actorSystem.actorOf(ExampleActor.props("example-3",
+ withoutPeer("example-3")), "example-3");
+
+
+ List<ActorRef> examples = Arrays.asList(example1Actor, example2Actor, example3Actor);
+
+ ActorRef clientActor = actorSystem.actorOf(ClientActor.props(example1Actor));
+ BufferedReader br =
+ new BufferedReader(new InputStreamReader(System.in));
+
+ System.out.println("Usage :");
+ System.out.println("s <1-3> to start a peer");
+ System.out.println("k <1-3> to kill a peer");
+
+ while(true) {
+ System.out.print("Enter command (0 to exit):");
+ try {
+ String s = br.readLine();
+ String[] split = s.split(" ");
+ if(split.length > 1) {
+ String command = split[0];
+ String actor = split[1];
+
+ if ("k".equals(command)) {
+ int i = Integer.parseInt(actor);
+ examples.get(i - 1)
+ .tell(PoisonPill.getInstance(), null);
+ continue;
+ } else if ("s".equals(command)) {
+ int i = Integer.parseInt(actor);
+ String actorName = "example-" + i;
+ examples.add(i - 1,
+ actorSystem.actorOf(ExampleActor.props(actorName,
+ withoutPeer(actorName)), actorName));
+ System.out.println("Created actor : " + actorName);
+ continue;
+ }
+ }
+
+ int i = Integer.parseInt(s);
+ if(i == 0){
+ System.exit(0);
+ }
+ clientActor.tell(new KeyValue("key " + i, "value " + i), null);
+ } catch (NumberFormatException nfe) {
+ System.err.println("Invalid Format!");
+ }
+ }
+ }
+
+ private static Map<String, String> withoutPeer(String peerId) {
+ Map<String, String> without = new HashMap<>(allPeers);
+ without.remove(peerId);
+ return without;
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
+import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class TestDriver {
+
+ private static final ActorSystem actorSystem = ActorSystem.create();
+ private static Map<String, String> allPeers = new HashMap<>();
+ private static Map<String, ActorRef> clientActorRefs = new HashMap<String, ActorRef>();
+ private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
+ private static LogGenerator logGenerator = new LogGenerator();;
+
+ /**
+ * Create nodes, add clients and start logging.
+ * Commands
+ * bye
+ * createNodes:{num}
+ * addNodes:{num}
+ * stopNode:{nodeName}
+ * addClients:{num}
+ * addClientsToNode:{nodeName, num}
+ * startLogging
+ * stopLogging
+ * startLoggingForClient:{nodeName}
+ * stopLoggingForClient:{nodeName}
+ * printNodes
+ * printState
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ TestDriver td = new TestDriver();
+
+ System.out.println("Enter command (type bye to exit):");
+
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+ while(true) {
+ String command = br.readLine();
+ if (command.startsWith("bye")) {
+ System.exit(0);
+
+ } else if (command.startsWith("createNodes")) {
+ String[] arr = command.split(":");
+ int n = Integer.parseInt(arr[1]);
+ td.createNodes(n);
+
+ } else if (command.startsWith("addNodes")) {
+ String[] arr = command.split(":");
+ int n = Integer.parseInt(arr[1]);
+ td.addNodes(n);
+
+ } else if (command.startsWith("addClients")) {
+ String[] arr = command.split(":");
+ int n = Integer.parseInt(arr[1]);
+ td.addClients(n);
+
+ } else if (command.startsWith("addClientsToNode")) {
+ String[] arr = command.split(":");
+ String nodeName = arr[1];
+ int n = Integer.parseInt(arr[1]);
+ td.addClientsToNode(nodeName, n);
+
+ } else if (command.startsWith("stopNode")) {
+ String[] arr = command.split(":");
+ td.stopNode(arr[1]);
+
+ } else if (command.startsWith("startLogging")) {
+ td.startAllLogging();
+
+ } else if (command.startsWith("startLoggingForClient")) {
+ String[] arr = command.split(":");
+ td.startLoggingForClient(clientActorRefs.get(arr[1]));
+
+ } else if (command.startsWith("stopLogging")) {
+ td.stopAllLogging();
+
+ } else if (command.startsWith("stopLoggingForClient")) {
+ String[] arr = command.split(":");
+ td.stopLoggingForClient(clientActorRefs.get(arr[1]));
+
+ } else if (command.startsWith("printState")) {
+ td.printState();
+ } else if (command.startsWith("printNodes")) {
+ td.printNodes();
+ }
+
+ }
+ }
+
+ public void createNodes(int num) {
+ for (int i=0; i < num; i++) {
+ int rand = getUnusedRandom(num);
+ allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+ }
+
+ for (String s : allPeers.keySet()) {
+ ActorRef exampleActor = actorSystem.actorOf(
+ ExampleActor.props(s, withoutPeer(s)), s);
+ actorRefs.put(s, exampleActor);
+ System.out.println("Created node:"+s);
+
+ }
+ }
+
+ // add new nodes , pass in the count
+ public void addNodes(int num) {
+ Map<String, String> newPeers = new HashMap<>();
+ for (int i=0; i < num; i++) {
+ int rand = getUnusedRandom(num);
+ newPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+ allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+
+ }
+ Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
+ for (Map.Entry<String, String> entry : newPeers.entrySet()) {
+ ActorRef exampleActor = actorSystem.actorOf(
+ ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey());
+ newActorRefs.put(entry.getKey(), exampleActor);
+
+ //now also add these new nodes as peers from the previous nodes
+ for (ActorRef actor : actorRefs.values()) {
+ actor.tell(new AddRaftPeer(entry.getKey(), entry.getValue()), null);
+ }
+
+ System.out.println("Added node:" + entry);
+ }
+
+ actorRefs.putAll(newActorRefs);
+ }
+
+
+ // add num clients to all nodes in the system
+ public void addClients(int num) {
+ for(Map.Entry<String,ActorRef> actorRefEntry : actorRefs.entrySet()) {
+ for (int i=0; i < num; i++) {
+ String clientName = "client-" + i + "-" + actorRefEntry.getKey();
+ ActorRef clientActor = actorSystem.actorOf(
+ ClientActor.props(actorRefEntry.getValue()), clientName);
+ clientActorRefs.put(clientName, clientActor);
+ System.out.println("Created client-node:" + clientName);
+ }
+ }
+ }
+
+ // add num clients to a node
+ public void addClientsToNode(String actorName, int num) {
+ ActorRef actorRef = actorRefs.get(actorName);
+ for (int i=0; i < num; i++) {
+ String clientName = "client-" + i + "-" + actorRef;
+ clientActorRefs.put(clientName,
+ actorSystem.actorOf(ClientActor.props(actorRef), clientName));
+ System.out.println("Added client-node:" + clientName);
+ }
+ }
+
+ public void stopNode(String actorName) {
+ ActorRef actorRef = actorRefs.get(actorName);
+ String clientName = "client-"+actorName;
+ if(clientActorRefs.containsKey(clientName)) {
+ actorSystem.stop(clientActorRefs.get(clientName));
+ clientActorRefs.remove(clientName);
+ }
+ actorSystem.stop(actorRef);
+ actorRefs.remove(actorName);
+
+ for (ActorRef actor : actorRefs.values()) {
+ actor.tell(new RemoveRaftPeer(actorName), null);
+ }
+
+ allPeers.remove(actorName);
+
+ }
+
+ public void startAllLogging() {
+ if(!clientActorRefs.isEmpty()) {
+ for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
+ logGenerator.startLoggingForClient(client.getValue());
+ System.out.println("Started logging for client:"+client.getKey());
+ }
+ } else {
+ System.out.println("There are no clients for any nodes. First create clients using commands- addClients:<num> or addClientsToNode:<nodename>:<num>");
+ }
+
+ }
+
+ public void startLoggingForClient(ActorRef client) {
+ logGenerator.startLoggingForClient(client);
+ }
+
+ public void stopAllLogging() {
+ for(Map.Entry<String,ActorRef> client : clientActorRefs.entrySet()) {
+ logGenerator.stopLoggingForClient(client.getValue());
+ }
+ }
+
+ public void stopLoggingForClient(ActorRef client) {
+ logGenerator.stopLoggingForClient(client);
+ }
+
+ public void printState() {
+ for (ActorRef ref : actorRefs.values()) {
+ ref.tell(new PrintState(), null);
+ }
+ }
+
+ public void printNodes() {
+ for (ActorRef ref : actorRefs.values()) {
+ ref.tell(new PrintRole(), null);
+ }
+ }
+
+ public ActorRef getLeader() {
+ return null;
+ }
+
+ private int getUnusedRandom(int num) {
+ int rand = -1;
+ do {
+ rand = (new Random()).nextInt(num * num);
+ } while (allPeers.keySet().contains("example-"+rand));
+
+ return rand;
+ }
+
+ private static Map<String, String> withoutPeer(String peerId) {
+ Map<String, String> without = new ConcurrentHashMap<>(allPeers);
+ without.remove(peerId);
+
+ return without;
+ }
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example.messages;
+
+import java.io.Serializable;
+
+public class KeyValue implements Serializable{
+ private final String key;
+ private final String value;
+
+ public KeyValue(String key, String value){
+ this.key = key;
+ this.value = value;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override public String toString() {
+ return "KeyValue{" +
+ "key='" + key + '\'' +
+ ", value='" + value + '\'' +
+ '}';
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example.messages;
+
+public class KeyValueSaved {
+}
--- /dev/null
+package org.opendaylight.controller.cluster.example.messages;
+
+/**
+ * Created by kramesha on 7/17/14.
+ */
+public class PrintRole {
+}
--- /dev/null
+package org.opendaylight.controller.cluster.example.messages;
+
+/**
+ * Created by kramesha on 7/17/14.
+ */
+public class PrintState {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+public interface ClientRequestTracker {
+ /**
+ * The client actor who is waiting for a response
+ *
+ * @return
+ */
+ ActorRef getClientActor();
+
+ /**
+ *
+ * @return
+ */
+ String getIdentifier();
+
+ /**
+ * The index of the log entry which needs to be replicated
+ *
+ * @return
+ */
+ long getIndex();
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+public class ClientRequestTrackerImpl implements ClientRequestTracker {
+
+ private final ActorRef clientActor;
+ private final String identifier;
+ private final long logIndex;
+
+ public ClientRequestTrackerImpl(ActorRef clientActor, String identifier,
+ long logIndex) {
+
+ this.clientActor = clientActor;
+
+ this.identifier = identifier;
+
+ this.logIndex = logIndex;
+ }
+
+ @Override public ActorRef getClientActor() {
+ return clientActor;
+ }
+
+ @Override public long getIndex() {
+ return logIndex;
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * ElectionTerm contains information about a RaftActors election term.
+ * <p>
+ * This information includes the last known current term of the RaftActor
+ * and which peer was voted for by the RaftActor in that term
+ * <p>
+ * This class ensures that election term information is persisted
+ */
+public interface ElectionTerm {
+ /**
+ * latest term server has seen (initialized to 0
+ * on first boot, increases monotonically)
+ */
+ long getCurrentTerm();
+
+ /**
+ * candidateId that received vote in current
+ * term (or null if none)
+ */
+ String getVotedFor();
+
+ /**
+ * To be called mainly when we are recovering in-memory election state from
+ * persistent storage
+ *
+ * @param currentTerm
+ * @param votedFor
+ */
+ void update(long currentTerm, String votedFor);
+
+ /**
+ * To be called when we need to update the current term either because we
+ * received a message from someone with a more up-to-date term or because we
+ * just voted for someone
+ * <p>
+ * This information needs to be persisted so that on recovery the replica
+ * can start itself in the right term and know if it has already voted in
+ * that term or not
+ *
+ * @param currentTerm
+ * @param votedFor
+ */
+ void updateAndPersist(long currentTerm, String votedFor);
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The state of the followers log as known by the Leader
+ */
+public interface FollowerLogInformation {
+
+ /**
+ * Increment the value of the nextIndex
+ * @return
+ */
+ public long incrNextIndex();
+
+ /**
+ * Decrement the value of the nextIndex
+ * @return
+ */
+ public long decrNextIndex();
+
+ /**
+ *
+ * @param nextIndex
+ */
+ void setNextIndex(long nextIndex);
+
+ /**
+ * Increment the value of the matchIndex
+ * @return
+ */
+ public long incrMatchIndex();
+
+ public void setMatchIndex(long matchIndex);
+
+ /**
+ * The identifier of the follower
+ * This could simply be the url of the remote actor
+ */
+ public String getId();
+
+ /**
+ * for each server, index of the next log entry
+ * to send to that server (initialized to leader
+ * last log index + 1)
+ */
+ public AtomicLong getNextIndex();
+
+ /**
+ * for each server, index of highest log entry
+ * known to be replicated on server
+ * (initialized to 0, increases monotonically)
+ */
+ public AtomicLong getMatchIndex();
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FollowerLogInformationImpl implements FollowerLogInformation{
+
+ private final String id;
+
+ private final AtomicLong nextIndex;
+
+ private final AtomicLong matchIndex;
+
+ public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
+ AtomicLong matchIndex) {
+ this.id = id;
+ this.nextIndex = nextIndex;
+ this.matchIndex = matchIndex;
+ }
+
+ public long incrNextIndex(){
+ return nextIndex.incrementAndGet();
+ }
+
+ @Override public long decrNextIndex() {
+ return nextIndex.decrementAndGet();
+ }
+
+ @Override public void setNextIndex(long nextIndex) {
+ this.nextIndex.set(nextIndex);
+ }
+
+ public long incrMatchIndex(){
+ return matchIndex.incrementAndGet();
+ }
+
+ @Override public void setMatchIndex(long matchIndex) {
+ this.matchIndex.set(matchIndex);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public AtomicLong getNextIndex() {
+ return nextIndex;
+ }
+
+ public AtomicLong getMatchIndex() {
+ return matchIndex;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.UntypedPersistentActor;
+import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
+import org.opendaylight.controller.cluster.raft.behaviors.Follower;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RaftActor encapsulates a state machine that needs to be kept synchronized
+ * in a cluster. It implements the RAFT algorithm as described in the paper
+ * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
+ * In Search of an Understandable Consensus Algorithm</a>
+ * <p/>
+ * RaftActor has 3 states and each state has a certain behavior associated
+ * with it. A Raft actor can behave as,
+ * <ul>
+ * <li> A Leader </li>
+ * <li> A Follower (or) </li>
+ * <li> A Candidate </li>
+ * </ul>
+ * <p/>
+ * <p/>
+ * A RaftActor MUST be a Leader in order to accept requests from clients to
+ * change the state of it's encapsulated state machine. Once a RaftActor becomes
+ * a Leader it is also responsible for ensuring that all followers ultimately
+ * have the same log and therefore the same state machine as itself.
+ * <p/>
+ * <p/>
+ * The current behavior of a RaftActor determines how election for leadership
+ * is initiated and how peer RaftActors react to request for votes.
+ * <p/>
+ * <p/>
+ * Each RaftActor also needs to know the current election term. It uses this
+ * information for a couple of things. One is to simply figure out who it
+ * voted for in the last election. Another is to figure out if the message
+ * it received to update it's state is stale.
+ * <p/>
+ * <p/>
+ * The RaftActor uses akka-persistence to store it's replicated log.
+ * Furthermore through it's behaviors a Raft Actor determines
+ * <p/>
+ * <ul>
+ * <li> when a log entry should be persisted </li>
+ * <li> when a log entry should be applied to the state machine (and) </li>
+ * <li> when a snapshot should be saved </li>
+ * </ul>
+ */
+public abstract class RaftActor extends UntypedPersistentActor {
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
+
+ /**
+ * The current state determines the current behavior of a RaftActor
+ * A Raft Actor always starts off in the Follower State
+ */
+ private RaftActorBehavior currentBehavior;
+
+ /**
+ * This context should NOT be passed directly to any other actor it is
+ * only to be consumed by the RaftActorBehaviors
+ */
+ private RaftActorContext context;
+
+ /**
+ * The in-memory journal
+ */
+ private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
+
+
+ public RaftActor(String id, Map<String, String> peerAddresses) {
+ context = new RaftActorContextImpl(this.getSelf(),
+ this.getContext(),
+ id, new ElectionTermImpl(),
+ -1, -1, replicatedLog, peerAddresses, LOG);
+ }
+
+ @Override public void onReceiveRecover(Object message) {
+ if (message instanceof SnapshotOffer) {
+ SnapshotOffer offer = (SnapshotOffer) message;
+ Snapshot snapshot = (Snapshot) offer.snapshot();
+
+ // Create a replicated log with the snapshot information
+ // The replicated log can be used later on to retrieve this snapshot
+ // when we need to install it on a peer
+ replicatedLog = new ReplicatedLogImpl(snapshot);
+
+ // Apply the snapshot to the actors state
+ applySnapshot(snapshot.getState());
+
+ } else if (message instanceof ReplicatedLogEntry) {
+ replicatedLog.append((ReplicatedLogEntry) message);
+ } else if (message instanceof DeleteEntries) {
+ replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+ } else if (message instanceof UpdateElectionTerm) {
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+ } else if (message instanceof RecoveryCompleted) {
+ LOG.debug(
+ "Last index in log : " + replicatedLog.lastIndex());
+ currentBehavior = switchBehavior(RaftState.Follower);
+ }
+ }
+
+ @Override public void onReceiveCommand(Object message) {
+ if (message instanceof ApplyState){
+ ApplyState applyState = (ApplyState) message;
+
+ LOG.debug("Applying state for log index {} data {}",
+ applyState.getReplicatedLogEntry().getIndex(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState(applyState.getClientActor(), applyState.getIdentifier(),
+ applyState.getReplicatedLogEntry().getData());
+ } else if(message instanceof ApplySnapshot ) {
+ applySnapshot(((ApplySnapshot) message).getSnapshot());
+ } else if (message instanceof FindLeader) {
+ getSender().tell(
+ new FindLeaderReply(
+ context.getPeerAddress(currentBehavior.getLeaderId())),
+ getSelf()
+ );
+ } else if (message instanceof SaveSnapshotSuccess) {
+ SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
+
+ // TODO: Not sure if we want to be this aggressive with trimming stuff
+ trimPersistentData(success.metadata().sequenceNr());
+
+ } else if (message instanceof SaveSnapshotFailure) {
+
+ // TODO: Handle failure in saving the snapshot
+ // Maybe do retries on failure
+
+ } else if (message instanceof AddRaftPeer){
+
+ // FIXME : Do not add raft peers like this.
+ // When adding a new Peer we have to ensure that the a majority of
+ // the peers know about the new Peer. Doing it this way may cause
+ // a situation where multiple Leaders may emerge
+ AddRaftPeer arp = (AddRaftPeer)message;
+ context.addToPeers(arp.getName(), arp.getAddress());
+
+ } else if (message instanceof RemoveRaftPeer){
+
+ RemoveRaftPeer rrp = (RemoveRaftPeer)message;
+ context.removePeer(rrp.getName());
+
+ } else {
+
+ RaftState state =
+ currentBehavior.handleMessage(getSender(), message);
+ currentBehavior = switchBehavior(state);
+ }
+ }
+
+
+
+ /**
+ * When a derived RaftActor needs to persist something it must call
+ * persistData.
+ *
+ * @param clientActor
+ * @param identifier
+ * @param data
+ */
+ protected void persistData(ActorRef clientActor, String identifier,
+ Object data) {
+
+ ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+ context.getReplicatedLog().lastIndex() + 1,
+ context.getTermInformation().getCurrentTerm(), data);
+
+ LOG.debug("Persist data {}", replicatedLogEntry);
+
+ replicatedLog
+ .appendAndPersist(clientActor, identifier, replicatedLogEntry);
+ }
+
+ protected String getId() {
+ return context.getId();
+ }
+
+ /**
+ * Derived actors can call the isLeader method to check if the current
+ * RaftActor is the Leader or not
+ *
+ * @return true it this RaftActor is a Leader false otherwise
+ */
+ protected boolean isLeader() {
+ return context.getId().equals(currentBehavior.getLeaderId());
+ }
+
+ /**
+ * Derived actor can call getLeader if they need a reference to the Leader.
+ * This would be useful for example in forwarding a request to an actor
+ * which is the leader
+ *
+ * @return A reference to the leader if known, null otherwise
+ */
+ protected ActorSelection getLeader(){
+ String leaderId = currentBehavior.getLeaderId();
+ if (leaderId == null) {
+ return null;
+ }
+ String peerAddress = context.getPeerAddress(leaderId);
+ LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
+ + peerAddress);
+ return context.actorSelection(peerAddress);
+ }
+
+ protected RaftState getRaftState() {
+ return currentBehavior.state();
+ }
+
+
+
+ /**
+ * The applyState method will be called by the RaftActor when some data
+ * needs to be applied to the actor's state
+ *
+ * @param clientActor A reference to the client who sent this message. This
+ * is the same reference that was passed to persistData
+ * by the derived actor. clientActor may be null when
+ * the RaftActor is behaving as a follower or during
+ * recovery.
+ * @param identifier The identifier of the persisted data. This is also
+ * the same identifier that was passed to persistData by
+ * the derived actor. identifier may be null when
+ * the RaftActor is behaving as a follower or during
+ * recovery
+ * @param data A piece of data that was persisted by the persistData call.
+ * This should NEVER be null.
+ */
+ protected abstract void applyState(ActorRef clientActor, String identifier,
+ Object data);
+
+ /**
+ * This method will be called by the RaftActor when a snapshot needs to be
+ * created. The derived actor should respond with its current state.
+ * <p/>
+ * During recovery the state that is returned by the derived actor will
+ * be passed back to it by calling the applySnapshot method
+ *
+ * @return The current state of the actor
+ */
+ protected abstract Object createSnapshot();
+
+ /**
+ * This method will be called by the RaftActor during recovery to
+ * reconstruct the state of the actor.
+ * <p/>
+ * This method may also be called at any other point during normal
+ * operations when the derived actor is out of sync with it's peers
+ * and the only way to bring it in sync is by applying a snapshot
+ *
+ * @param snapshot A snapshot of the state of the actor
+ */
+ protected abstract void applySnapshot(Object snapshot);
+
+ private RaftActorBehavior switchBehavior(RaftState state) {
+ if (currentBehavior != null) {
+ if (currentBehavior.state() == state) {
+ return currentBehavior;
+ }
+ LOG.info("Switching from state " + currentBehavior.state() + " to "
+ + state);
+
+ try {
+ currentBehavior.close();
+ } catch (Exception e) {
+ LOG.error(e,
+ "Failed to close behavior : " + currentBehavior.state());
+ }
+
+ } else {
+ LOG.info("Switching behavior to " + state);
+ }
+ RaftActorBehavior behavior = null;
+ if (state == RaftState.Candidate) {
+ behavior = new Candidate(context);
+ } else if (state == RaftState.Follower) {
+ behavior = new Follower(context);
+ } else {
+ behavior = new Leader(context);
+ }
+ return behavior;
+ }
+
+ private void trimPersistentData(long sequenceNumber) {
+ // Trim snapshots
+ // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
+ // For now guessing that it is ANDed.
+ deleteSnapshots(new SnapshotSelectionCriteria(
+ sequenceNumber - 100000, 43200000));
+
+ // Trim journal
+ deleteMessages(sequenceNumber);
+ }
+
+
+ private class ReplicatedLogImpl implements ReplicatedLog {
+ private final List<ReplicatedLogEntry> journal;
+ private final Object snapshot;
+ private long snapshotIndex = -1;
+ private long snapshotTerm = -1;
+
+ public ReplicatedLogImpl(Snapshot snapshot) {
+ this.snapshot = snapshot.getState();
+ this.snapshotIndex = snapshot.getLastAppliedIndex();
+ this.snapshotTerm = snapshot.getLastAppliedTerm();
+
+ this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
+ }
+
+ public ReplicatedLogImpl() {
+ this.snapshot = null;
+ this.journal = new ArrayList<>();
+ }
+
+ @Override public ReplicatedLogEntry get(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return null;
+ }
+
+ return journal.get(adjustedIndex);
+ }
+
+ @Override public ReplicatedLogEntry last() {
+ if (journal.size() == 0) {
+ return null;
+ }
+ return get(journal.size() - 1);
+ }
+
+ @Override public long lastIndex() {
+ if (journal.size() == 0) {
+ return -1;
+ }
+
+ return last().getIndex();
+ }
+
+ @Override public long lastTerm() {
+ if (journal.size() == 0) {
+ return -1;
+ }
+
+ return last().getTerm();
+ }
+
+
+ @Override public void removeFrom(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return;
+ }
+
+ journal.subList(adjustedIndex , journal.size()).clear();
+ }
+
+
+ @Override public void removeFromAndPersist(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return;
+ }
+
+ // FIXME: Maybe this should be done after the command is saved
+ journal.subList(adjustedIndex , journal.size()).clear();
+
+ persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+ @Override public void apply(DeleteEntries param)
+ throws Exception {
+ //FIXME : Doing nothing for now
+ }
+ });
+
+
+ }
+
+ @Override public void append(
+ final ReplicatedLogEntry replicatedLogEntry) {
+ journal.add(replicatedLogEntry);
+ }
+
+ @Override public List<ReplicatedLogEntry> getFrom(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ List<ReplicatedLogEntry> entries = new ArrayList<>(100);
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return entries;
+ }
+
+
+ for (int i = adjustedIndex;
+ i < journal.size(); i++) {
+ entries.add(journal.get(i));
+ }
+ return entries;
+ }
+
+ @Override public void appendAndPersist(
+ final ReplicatedLogEntry replicatedLogEntry) {
+ appendAndPersist(null, null, replicatedLogEntry);
+ }
+
+ public void appendAndPersist(final ActorRef clientActor,
+ final String identifier,
+ final ReplicatedLogEntry replicatedLogEntry) {
+ context.getLogger().debug(
+ "Append log entry and persist " + replicatedLogEntry);
+ // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+ journal.add(replicatedLogEntry);
+
+ // When persisting events with persist it is guaranteed that the
+ // persistent actor will not receive further commands between the
+ // persist call and the execution(s) of the associated event
+ // handler. This also holds for multiple persist calls in context
+ // of a single command.
+ persist(replicatedLogEntry,
+ new Procedure<ReplicatedLogEntry>() {
+ public void apply(ReplicatedLogEntry evt) throws Exception {
+ // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
+ if (size() > 100000) {
+ ReplicatedLogEntry lastAppliedEntry =
+ get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ }
+
+ saveSnapshot(Snapshot.create(createSnapshot(),
+ getFrom(context.getLastApplied() + 1),
+ lastIndex(), lastTerm(), lastAppliedIndex,
+ lastAppliedTerm));
+ }
+ // Send message for replication
+ if (clientActor != null) {
+ currentBehavior.handleMessage(getSelf(),
+ new Replicate(clientActor, identifier,
+ replicatedLogEntry)
+ );
+ }
+ }
+ }
+ );
+ }
+
+ @Override public long size() {
+ return journal.size() + snapshotIndex + 1;
+ }
+
+ @Override public boolean isPresent(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override public boolean isInSnapshot(long index) {
+ return index <= snapshotIndex;
+ }
+
+ @Override public Object getSnapshot() {
+ return snapshot;
+ }
+
+ @Override public long getSnapshotIndex() {
+ return snapshotIndex;
+ }
+
+ @Override public long getSnapshotTerm() {
+ return snapshotTerm;
+ }
+
+ private int adjustedIndex(long index) {
+ if(snapshotIndex < 0){
+ return (int) index;
+ }
+ return (int) (index - snapshotIndex);
+ }
+ }
+
+
+ private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
+ Serializable {
+
+ private final long index;
+ private final long term;
+ private final Object payload;
+
+ public ReplicatedLogImplEntry(long index, long term, Object payload) {
+
+ this.index = index;
+ this.term = term;
+ this.payload = payload;
+ }
+
+ @Override public Object getData() {
+ return payload;
+ }
+
+ @Override public long getTerm() {
+ return term;
+ }
+
+ @Override public long getIndex() {
+ return index;
+ }
+
+ @Override public String toString() {
+ return "Entry{" +
+ "index=" + index +
+ ", term=" + term +
+ '}';
+ }
+ }
+
+ private static class DeleteEntries implements Serializable {
+ private final int fromIndex;
+
+
+ public DeleteEntries(int fromIndex) {
+ this.fromIndex = fromIndex;
+ }
+
+ public int getFromIndex() {
+ return fromIndex;
+ }
+ }
+
+
+ private static class Snapshot implements Serializable {
+ private final Object state;
+ private final List<ReplicatedLogEntry> unAppliedEntries;
+ private final long lastIndex;
+ private final long lastTerm;
+ private final long lastAppliedIndex;
+ private final long lastAppliedTerm;
+
+ private Snapshot(Object state,
+ List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+ long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+ this.state = state;
+ this.unAppliedEntries = unAppliedEntries;
+ this.lastIndex = lastIndex;
+ this.lastTerm = lastTerm;
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ }
+
+
+ public static Snapshot create(Object state,
+ List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm) {
+ return new Snapshot(state, entries, lastIndex, lastTerm,
+ lastAppliedIndex, lastAppliedTerm);
+ }
+
+ public Object getState() {
+ return state;
+ }
+
+ public List<ReplicatedLogEntry> getUnAppliedEntries() {
+ return unAppliedEntries;
+ }
+
+ public long getLastTerm() {
+ return lastTerm;
+ }
+
+ public long getLastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ public long getLastAppliedTerm() {
+ return lastAppliedTerm;
+ }
+ }
+
+ private class ElectionTermImpl implements ElectionTerm {
+ /**
+ * Identifier of the actor whose election term information this is
+ */
+ private long currentTerm = 0;
+ private String votedFor = null;
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ @Override public void update(long currentTerm, String votedFor) {
+ LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+ }
+
+ @Override
+ public void updateAndPersist(long currentTerm, String votedFor){
+ update(currentTerm, votedFor);
+ // FIXME : Maybe first persist then update the state
+ persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
+
+ @Override public void apply(UpdateElectionTerm param)
+ throws Exception {
+
+ }
+ });
+ }
+ }
+
+ private static class UpdateElectionTerm implements Serializable {
+ private final long currentTerm;
+ private final String votedFor;
+
+ public UpdateElectionTerm(long currentTerm, String votedFor) {
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+ }
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.event.LoggingAdapter;
+
+import java.util.Map;
+
+/**
+ * The RaftActorContext contains that portion of the RaftActors state that
+ * needs to be shared with it's behaviors. A RaftActorContext should NEVER be
+ * used in any actor context outside the RaftActor that constructed it.
+ */
+public interface RaftActorContext {
+ /**
+ * Create a new local actor
+ * @param props
+ * @return
+ */
+ ActorRef actorOf(Props props);
+
+ /**
+ * Create a actor selection
+ * @param path
+ * @return
+ */
+ ActorSelection actorSelection(String path);
+
+ /**
+ * Get the identifier for the RaftActor. This identifier represents the
+ * name of the actor whose common state is being shared. For example the
+ * id could be 'inventory'
+ * @return the identifier
+ */
+ String getId();
+
+ /**
+ * A reference to the RaftActor itself. This could be used to send messages
+ * to the RaftActor
+ * @return
+ */
+ ActorRef getActor();
+
+ /**
+ * Get the ElectionTerm information
+ * @return
+ */
+ ElectionTerm getTermInformation();
+
+ /**
+ * index of highest log entry known to be
+ * committed (initialized to 0, increases
+ * monotonically)
+ * @return
+ */
+ long getCommitIndex();
+
+
+ /**
+ *
+ */
+ void setCommitIndex(long commitIndex);
+
+ /**
+ * index of highest log entry applied to state
+ * machine (initialized to 0, increases
+ * monotonically)
+ * @return
+ */
+ long getLastApplied();
+
+
+ /**
+ *
+ */
+ void setLastApplied(long lastApplied);
+
+ /**
+ * @return A representation of the log
+ */
+ ReplicatedLog getReplicatedLog();
+
+ /**
+ * @return The ActorSystem associated with this context
+ */
+ ActorSystem getActorSystem();
+
+ /**
+ *
+ * @return
+ */
+ LoggingAdapter getLogger();
+
+ /**
+ * Get a mapping of peer id's their addresses
+ * @return
+ */
+ Map<String, String> getPeerAddresses();
+
+ /**
+ *
+ * @param peerId
+ * @return
+ */
+ String getPeerAddress(String peerId);
+
+ /**
+ * Add to actor peers
+ * @param name
+ * @param address
+ */
+ void addToPeers(String name, String address);
+
+ /**
+ *
+ * @param name
+ */
+ public void removePeer(String name);
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActorContext;
+import akka.event.LoggingAdapter;
+
+import java.util.Map;
+
+public class RaftActorContextImpl implements RaftActorContext{
+
+ private final ActorRef actor;
+
+ private final UntypedActorContext context;
+
+ private final String id;
+
+ private final ElectionTerm termInformation;
+
+ private long commitIndex;
+
+ private long lastApplied;
+
+ private final ReplicatedLog replicatedLog;
+
+ private final Map<String, String> peerAddresses;
+
+ private final LoggingAdapter LOG;
+
+
+ public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
+ String id,
+ ElectionTerm termInformation, long commitIndex,
+ long lastApplied, ReplicatedLog replicatedLog, Map<String, String> peerAddresses, LoggingAdapter logger) {
+ this.actor = actor;
+ this.context = context;
+ this.id = id;
+ this.termInformation = termInformation;
+ this.commitIndex = commitIndex;
+ this.lastApplied = lastApplied;
+ this.replicatedLog = replicatedLog;
+ this.peerAddresses = peerAddresses;
+ this.LOG = logger;
+ }
+
+ public ActorRef actorOf(Props props){
+ return context.actorOf(props);
+ }
+
+ public ActorSelection actorSelection(String path){
+ return context.actorSelection(path);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public ActorRef getActor() {
+ return actor;
+ }
+
+ public ElectionTerm getTermInformation() {
+ return termInformation;
+ }
+
+ public long getCommitIndex() {
+ return commitIndex;
+ }
+
+ @Override public void setCommitIndex(long commitIndex) {
+ this.commitIndex = commitIndex;
+ }
+
+ public long getLastApplied() {
+ return lastApplied;
+ }
+
+ @Override public void setLastApplied(long lastApplied) {
+ this.lastApplied = lastApplied;
+ }
+
+ @Override public ReplicatedLog getReplicatedLog() {
+ return replicatedLog;
+ }
+
+ @Override public ActorSystem getActorSystem() {
+ return context.system();
+ }
+
+ @Override public LoggingAdapter getLogger() {
+ return this.LOG;
+ }
+
+ @Override public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ @Override public String getPeerAddress(String peerId) {
+ return peerAddresses.get(peerId);
+ }
+
+ @Override public void addToPeers(String name, String address) {
+ LOG.debug("Kamal--> addToPeer for:"+name);
+ peerAddresses.put(name, address);
+ }
+
+ @Override public void removePeer(String name) {
+ LOG.debug("Kamal--> removePeer for:"+name);
+ peerAddresses.remove(name);
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft;
+
+public enum RaftState {
+ Candidate,
+ Follower,
+ Leader
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.List;
+
+/**
+ * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor
+ */
+public interface ReplicatedLog {
+ /**
+ * Get a replicated log entry at the specified index
+ *
+ * @param index the index of the log entry
+ * @return the ReplicatedLogEntry at index. null if index less than 0 or
+ * greater than the size of the in-memory journal.
+ */
+ ReplicatedLogEntry get(long index);
+
+
+ /**
+ * Get the last replicated log entry
+ *
+ * @return
+ */
+ ReplicatedLogEntry last();
+
+ /**
+ *
+ * @return
+ */
+ long lastIndex();
+
+ /**
+ *
+ * @return
+ */
+ long lastTerm();
+
+ /**
+ * To be called when we need to remove entries from the in-memory log.
+ * This method will remove all entries >= index. This method should be used
+ * during recovery to appropriately trim the log based on persisted
+ * information
+ *
+ * @param index the index of the log entry
+ */
+ void removeFrom(long index);
+
+
+ /**
+ * To be called when we need to remove entries from the in-memory log and we
+ * need that information persisted to disk. This method will remove all
+ * entries >= index.
+ * <p>
+ * The persisted information would then be used during recovery to properly
+ * reconstruct the state of the in-memory replicated log
+ *
+ * @param index the index of the log entry
+ */
+ void removeFromAndPersist(long index);
+
+ /**
+ * Append an entry to the log
+ * @param replicatedLogEntry
+ */
+ void append(ReplicatedLogEntry replicatedLogEntry);
+
+ /**
+ *
+ * @param replicatedLogEntry
+ */
+ void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
+
+ /**
+ *
+ * @param index the index of the log entry
+ */
+ List<ReplicatedLogEntry> getFrom(long index);
+
+
+ /**
+ *
+ * @return
+ */
+ long size();
+
+ /**
+ * Checks if the entry at the specified index is present or not
+ *
+ * @param index the index of the log entry
+ * @return true if the entry is present in the in-memory journal
+ */
+ boolean isPresent(long index);
+
+ /**
+ * Checks if the entry is present in a snapshot
+ *
+ * @param index the index of the log entry
+ * @return true if the entry is in the snapshot. false if the entry is not
+ * in the snapshot even if the entry may be present in the replicated log
+ */
+ boolean isInSnapshot(long index);
+
+ /**
+ * Get the snapshot
+ *
+ * @return an object representing the snapshot if it exists. null otherwise
+ */
+ Object getSnapshot();
+
+ /**
+ * Get the index of the snapshot
+ *
+ * @return the index from which the snapshot was created. -1 otherwise.
+ */
+ long getSnapshotIndex();
+
+ /**
+ * Get the term of the snapshot
+ *
+ * @return the term of the index from which the snapshot was created. -1
+ * otherwise
+ */
+ long getSnapshotTerm();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * Represents one entry in the replicated log
+ */
+public interface ReplicatedLogEntry {
+ /**
+ * The data stored in that entry
+ *
+ * @return
+ */
+ Object getData();
+
+ /**
+ * The term stored in that entry
+ *
+ * @return
+ */
+ long getTerm();
+
+ /**
+ * The index of the entry
+ *
+ * @return
+ */
+ long getIndex();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Abstract class that represents the behavior of a RaftActor
+ * <p/>
+ * All Servers:
+ * <ul>
+ * <li> If commitIndex > lastApplied: increment lastApplied, apply
+ * log[lastApplied] to state machine (§5.3)
+ * <li> If RPC request or response contains term T > currentTerm:
+ * set currentTerm = T, convert to follower (§5.1)
+ */
+public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
+
+ /**
+ * Information about the RaftActor whose behavior this class represents
+ */
+ protected final RaftActorContext context;
+
+ /**
+ * The maximum election time variance
+ */
+ private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+
+ /**
+ * The interval at which a heart beat message will be sent to the remote
+ * RaftActor
+ * <p/>
+ * Since this is set to 100 milliseconds the Election timeout should be
+ * at least 200 milliseconds
+ */
+ protected static final FiniteDuration HEART_BEAT_INTERVAL =
+ new FiniteDuration(100, TimeUnit.MILLISECONDS);
+
+ /**
+ * The interval in which a new election would get triggered if no leader is found
+ */
+ private static final long ELECTION_TIME_INTERVAL =
+ HEART_BEAT_INTERVAL.toMillis() * 2;
+
+ /**
+ *
+ */
+ private Cancellable electionCancel = null;
+
+ /**
+ *
+ */
+ protected String leaderId = null;
+
+
+ protected AbstractRaftActorBehavior(RaftActorContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Derived classes should not directly handle AppendEntries messages it
+ * should let the base class handle it first. Once the base class handles
+ * the AppendEntries message and does the common actions that are applicable
+ * in all RaftState's it will delegate the handling of the AppendEntries
+ * message to the derived class to do more state specific handling by calling
+ * this method
+ *
+ * @param sender The actor that sent this message
+ * @param appendEntries The AppendEntries message
+ * @return
+ */
+ protected abstract RaftState handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries);
+
+
+ /**
+ * appendEntries first processes the AppendEntries message and then
+ * delegates handling to a specific behavior
+ *
+ * @param sender
+ * @param appendEntries
+ * @return
+ */
+ protected RaftState appendEntries(ActorRef sender,
+ AppendEntries appendEntries) {
+
+ // 1. Reply false if term < currentTerm (§5.1)
+ if (appendEntries.getTerm() < currentTerm()) {
+ context.getLogger().debug(
+ "Cannot append entries because sender term " + appendEntries
+ .getTerm() + " is less than " + currentTerm());
+ sender.tell(
+ new AppendEntriesReply(context.getId(), currentTerm(), false,
+ lastIndex(), lastTerm()), actor()
+ );
+ return state();
+ }
+
+
+ return handleAppendEntries(sender, appendEntries);
+ }
+
+ /**
+ * Derived classes should not directly handle AppendEntriesReply messages it
+ * should let the base class handle it first. Once the base class handles
+ * the AppendEntriesReply message and does the common actions that are
+ * applicable in all RaftState's it will delegate the handling of the
+ * AppendEntriesReply message to the derived class to do more state specific
+ * handling by calling this method
+ *
+ * @param sender The actor that sent this message
+ * @param appendEntriesReply The AppendEntriesReply message
+ * @return
+ */
+ protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply);
+
+ /**
+ * requestVote handles the RequestVote message. This logic is common
+ * for all behaviors
+ *
+ * @param sender
+ * @param requestVote
+ * @return
+ */
+ protected RaftState requestVote(ActorRef sender,
+ RequestVote requestVote) {
+
+ boolean grantVote = false;
+
+ // Reply false if term < currentTerm (§5.1)
+ if (requestVote.getTerm() < currentTerm()) {
+ grantVote = false;
+
+ // If votedFor is null or candidateId, and candidate’s log is at
+ // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
+ } else if (votedFor() == null || votedFor()
+ .equals(requestVote.getCandidateId())) {
+
+ boolean candidateLatest = false;
+
+ // From §5.4.1
+ // Raft determines which of two logs is more up-to-date
+ // by comparing the index and term of the last entries in the
+ // logs. If the logs have last entries with different terms, then
+ // the log with the later term is more up-to-date. If the logs
+ // end with the same term, then whichever log is longer is
+ // more up-to-date.
+ if (requestVote.getLastLogTerm() > lastTerm()) {
+ candidateLatest = true;
+ } else if ((requestVote.getLastLogTerm() == lastTerm())
+ && requestVote.getLastLogIndex() >= lastIndex()) {
+ candidateLatest = true;
+ }
+
+ if (candidateLatest) {
+ grantVote = true;
+ context.getTermInformation().updateAndPersist(requestVote.getTerm(),
+ requestVote.getCandidateId());
+ }
+ }
+
+ sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
+
+ return state();
+ }
+
+ /**
+ * Derived classes should not directly handle RequestVoteReply messages it
+ * should let the base class handle it first. Once the base class handles
+ * the RequestVoteReply message and does the common actions that are
+ * applicable in all RaftState's it will delegate the handling of the
+ * RequestVoteReply message to the derived class to do more state specific
+ * handling by calling this method
+ *
+ * @param sender The actor that sent this message
+ * @param requestVoteReply The RequestVoteReply message
+ * @return
+ */
+ protected abstract RaftState handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply);
+
+ /**
+ * Creates a random election duration
+ *
+ * @return
+ */
+ protected FiniteDuration electionDuration() {
+ long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
+ return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * stop the scheduled election
+ */
+ protected void stopElection() {
+ if (electionCancel != null && !electionCancel.isCancelled()) {
+ electionCancel.cancel();
+ }
+ }
+
+ /**
+ * schedule a new election
+ *
+ * @param interval
+ */
+ protected void scheduleElection(FiniteDuration interval) {
+ stopElection();
+
+ // Schedule an election. When the scheduler triggers an ElectionTimeout
+ // message is sent to itself
+ electionCancel =
+ context.getActorSystem().scheduler().scheduleOnce(interval,
+ context.getActor(), new ElectionTimeout(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+ /**
+ * Get the current term
+ * @return
+ */
+ protected long currentTerm() {
+ return context.getTermInformation().getCurrentTerm();
+ }
+
+ /**
+ * Get the candidate for whom we voted in the current term
+ * @return
+ */
+ protected String votedFor() {
+ return context.getTermInformation().getVotedFor();
+ }
+
+ /**
+ * Get the actor associated with this behavior
+ * @return
+ */
+ protected ActorRef actor() {
+ return context.getActor();
+ }
+
+ /**
+ * Get the term from the last entry in the log
+ *
+ * @return
+ */
+ protected long lastTerm() {
+ return context.getReplicatedLog().lastTerm();
+ }
+
+ /**
+ * Get the index from the last entry in the log
+ *
+ * @return
+ */
+ protected long lastIndex() {
+ return context.getReplicatedLog().lastIndex();
+ }
+
+ /**
+ * Find the client request tracker for a specific logIndex
+ *
+ * @param logIndex
+ * @return
+ */
+ protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+ return null;
+ }
+
+ /**
+ * Find the log index from the previous to last entry in the log
+ *
+ * @return
+ */
+ protected long prevLogIndex(long index){
+ ReplicatedLogEntry prevEntry =
+ context.getReplicatedLog().get(index - 1);
+ if (prevEntry != null) {
+ return prevEntry.getIndex();
+ }
+ return -1;
+ }
+
+ /**
+ * Find the log term from the previous to last entry in the log
+ * @return
+ */
+ protected long prevLogTerm(long index){
+ ReplicatedLogEntry prevEntry =
+ context.getReplicatedLog().get(index - 1);
+ if (prevEntry != null) {
+ return prevEntry.getTerm();
+ }
+ return -1;
+ }
+
+ /**
+ * Apply the provided index to the state machine
+ *
+ * @param index a log index that is known to be committed
+ */
+ protected void applyLogToStateMachine(long index) {
+ // Now maybe we apply to the state machine
+ for (long i = context.getLastApplied() + 1;
+ i < index + 1; i++) {
+ ActorRef clientActor = null;
+ String identifier = null;
+ ClientRequestTracker tracker = findClientRequestTracker(i);
+
+ if (tracker != null) {
+ clientActor = tracker.getClientActor();
+ identifier = tracker.getIdentifier();
+ }
+ ReplicatedLogEntry replicatedLogEntry =
+ context.getReplicatedLog().get(i);
+
+ if (replicatedLogEntry != null) {
+ actor().tell(new ApplyState(clientActor, identifier,
+ replicatedLogEntry), actor());
+ } else {
+ context.getLogger().error(
+ "Missing index " + i + " from log. Cannot apply state.");
+ }
+ }
+ // Send a local message to the local RaftActor (it's derived class to be
+ // specific to apply the log to it's index)
+ context.setLastApplied(index);
+ }
+
+ @Override
+ public RaftState handleMessage(ActorRef sender, Object message) {
+ if (message instanceof AppendEntries) {
+ return appendEntries(sender, (AppendEntries) message);
+ } else if (message instanceof AppendEntriesReply) {
+ return handleAppendEntriesReply(sender, (AppendEntriesReply) message);
+ } else if (message instanceof RequestVote) {
+ return requestVote(sender, (RequestVote) message);
+ } else if (message instanceof RequestVoteReply) {
+ return handleRequestVoteReply(sender, (RequestVoteReply) message);
+ }
+ return state();
+ }
+
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The behavior of a RaftActor when it is in the CandidateState
+ * <p/>
+ * Candidates (§5.2):
+ * <ul>
+ * <li> On conversion to candidate, start election:
+ * <ul>
+ * <li> Increment currentTerm
+ * <li> Vote for self
+ * <li> Reset election timer
+ * <li> Send RequestVote RPCs to all other servers
+ * </ul>
+ * <li> If votes received from majority of servers: become leader
+ * <li> If AppendEntries RPC received from new leader: convert to
+ * follower
+ * <li> If election timeout elapses: start new election
+ * </ul>
+ */
+public class Candidate extends AbstractRaftActorBehavior {
+
+ private final Map<String, ActorSelection> peerToActor = new HashMap<>();
+
+ private int voteCount;
+
+ private final int votesRequired;
+
+ public Candidate(RaftActorContext context) {
+ super(context);
+
+ Collection<String> peerPaths = context.getPeerAddresses().values();
+
+ for (String peerPath : peerPaths) {
+ peerToActor.put(peerPath,
+ context.actorSelection(peerPath));
+ }
+
+ context.getLogger().debug("Election:Candidate has following peers:"+peerToActor.keySet());
+ if(peerPaths.size() > 0) {
+ // Votes are required from a majority of the peers including self.
+ // The votesRequired field therefore stores a calculated value
+ // of the number of votes required for this candidate to win an
+ // election based on it's known peers.
+ // If a peer was added during normal operation and raft replicas
+ // came to know about them then the new peer would also need to be
+ // taken into consideration when calculating this value.
+ // Here are some examples for what the votesRequired would be for n
+ // peers
+ // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
+ // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
+ // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
+ int noOfPeers = peerPaths.size();
+ int self = 1;
+ votesRequired = (noOfPeers + self) / 2 + 1;
+ } else {
+ votesRequired = 0;
+ }
+
+ startNewTerm();
+ scheduleElection(electionDuration());
+ }
+
+ @Override protected RaftState handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries) {
+
+ return state();
+ }
+
+ @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply) {
+
+ return state();
+ }
+
+ @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply) {
+
+ if (requestVoteReply.isVoteGranted()) {
+ voteCount++;
+ }
+
+ if (voteCount >= votesRequired) {
+ return RaftState.Leader;
+ }
+
+ return state();
+ }
+
+ @Override public RaftState state() {
+ return RaftState.Candidate;
+ }
+
+ @Override
+ public RaftState handleMessage(ActorRef sender, Object message) {
+
+ if (message instanceof RaftRPC) {
+ RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ return RaftState.Follower;
+ }
+ }
+
+ if (message instanceof ElectionTimeout) {
+ if (votesRequired == 0) {
+ // If there are no peers then we should be a Leader
+ // We wait for the election timeout to occur before declare
+ // ourselves the leader. This gives enough time for a leader
+ // who we do not know about (as a peer)
+ // to send a message to the candidate
+ return RaftState.Leader;
+ }
+ startNewTerm();
+ scheduleElection(electionDuration());
+ return state();
+ }
+ return super.handleMessage(sender, message);
+ }
+
+
+ private void startNewTerm() {
+
+
+ // set voteCount back to 1 (that is voting for self)
+ voteCount = 1;
+
+ // Increment the election term and vote for self
+ long currentTerm = context.getTermInformation().getCurrentTerm();
+ context.getTermInformation().updateAndPersist(currentTerm + 1, context.getId());
+
+ context.getLogger().debug("Starting new term " + (currentTerm+1));
+
+ // Request for a vote
+ // TODO: Retry request for vote if replies do not arrive in a reasonable
+ // amount of time TBD
+ for (ActorSelection peerActor : peerToActor.values()) {
+ peerActor.tell(new RequestVote(
+ context.getTermInformation().getCurrentTerm(),
+ context.getId(),
+ context.getReplicatedLog().lastIndex(),
+ context.getReplicatedLog().lastTerm()),
+ context.getActor()
+ );
+ }
+
+
+ }
+
+ @Override public void close() throws Exception {
+ stopElection();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+
+/**
+ * The behavior of a RaftActor in the Follower state
+ * <p/>
+ * <ul>
+ * <li> Respond to RPCs from candidates and leaders
+ * <li> If election timeout elapses without receiving AppendEntries
+ * RPC from current leader or granting vote to candidate:
+ * convert to candidate
+ * </ul>
+ */
+public class Follower extends AbstractRaftActorBehavior {
+ public Follower(RaftActorContext context) {
+ super(context);
+
+ scheduleElection(electionDuration());
+ }
+
+ @Override protected RaftState handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries) {
+
+ // TODO : Refactor this method into a bunch of smaller methods
+ // to make it easier to read. Before refactoring ensure tests
+ // cover the code properly
+
+ // 1. Reply false if term < currentTerm (§5.1)
+ // This is handled in the appendEntries method of the base class
+
+ // If we got here then we do appear to be talking to the leader
+ leaderId = appendEntries.getLeaderId();
+
+ // 2. Reply false if log doesn’t contain an entry at prevLogIndex
+ // whose term matches prevLogTerm (§5.3)
+
+ ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+ .get(appendEntries.getPrevLogIndex());
+
+
+ boolean outOfSync = true;
+
+ // First check if the logs are in sync or not
+ if (lastIndex() == -1
+ && appendEntries.getPrevLogIndex() != -1) {
+
+ // The follower's log is out of sync because the leader does have
+ // an entry at prevLogIndex and this follower has no entries in
+ // it's log.
+
+ context.getLogger().debug(
+ "The followers log is empty and the senders prevLogIndex is {}",
+ appendEntries.getPrevLogIndex());
+
+ } else if (lastIndex() > -1
+ && appendEntries.getPrevLogIndex() != -1
+ && previousEntry == null) {
+
+ // The follower's log is out of sync because the Leader's
+ // prevLogIndex entry was not found in it's log
+
+ context.getLogger().debug(
+ "The log is not empty but the prevLogIndex {} was not found in it",
+ appendEntries.getPrevLogIndex());
+
+ } else if (lastIndex() > -1
+ && previousEntry != null
+ && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+
+ // The follower's log is out of sync because the Leader's
+ // prevLogIndex entry does exist in the follower's log but it has
+ // a different term in it
+
+ context.getLogger().debug(
+ "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , previousEntry.getTerm()
+ , appendEntries.getPrevLogTerm());
+ } else {
+ outOfSync = false;
+ }
+
+ if (outOfSync) {
+ // We found that the log was out of sync so just send a negative
+ // reply and return
+ sender.tell(
+ new AppendEntriesReply(context.getId(), currentTerm(), false,
+ lastIndex(), lastTerm()), actor()
+ );
+ return state();
+ }
+
+ if (appendEntries.getEntries() != null
+ && appendEntries.getEntries().size() > 0) {
+ context.getLogger().debug(
+ "Number of entries to be appended = " + appendEntries
+ .getEntries().size()
+ );
+
+ // 3. If an existing entry conflicts with a new one (same index
+ // but different terms), delete the existing entry and all that
+ // follow it (§5.3)
+ int addEntriesFrom = 0;
+ if (context.getReplicatedLog().size() > 0) {
+
+ // Find the entry up until which the one that is not in the
+ // follower's log
+ for (int i = 0;
+ i < appendEntries.getEntries()
+ .size(); i++, addEntriesFrom++) {
+ ReplicatedLogEntry matchEntry =
+ appendEntries.getEntries().get(i);
+ ReplicatedLogEntry newEntry = context.getReplicatedLog()
+ .get(matchEntry.getIndex());
+
+ if (newEntry == null) {
+ //newEntry not found in the log
+ break;
+ }
+
+ if (newEntry.getTerm() == matchEntry
+ .getTerm()) {
+ continue;
+ }
+
+ context.getLogger().debug(
+ "Removing entries from log starting at "
+ + matchEntry.getIndex()
+ );
+
+ // Entries do not match so remove all subsequent entries
+ context.getReplicatedLog()
+ .removeFromAndPersist(matchEntry.getIndex());
+ break;
+ }
+ }
+
+ context.getLogger().debug(
+ "After cleanup entries to be added from = " + (addEntriesFrom
+ + lastIndex())
+ );
+
+ // 4. Append any new entries not already in the log
+ for (int i = addEntriesFrom;
+ i < appendEntries.getEntries().size(); i++) {
+
+ context.getLogger().debug(
+ "Append entry to log " + appendEntries.getEntries().get(i).getData()
+ .toString()
+ );
+ context.getReplicatedLog()
+ .appendAndPersist(appendEntries.getEntries().get(i));
+ }
+
+ context.getLogger().debug(
+ "Log size is now " + context.getReplicatedLog().size());
+ }
+
+
+ // 5. If leaderCommit > commitIndex, set commitIndex =
+ // min(leaderCommit, index of last new entry)
+
+ long prevCommitIndex = context.getCommitIndex();
+
+ context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
+ context.getReplicatedLog().lastIndex()));
+
+ if (prevCommitIndex != context.getCommitIndex()) {
+ context.getLogger()
+ .debug("Commit index set to " + context.getCommitIndex());
+ }
+
+ // If commitIndex > lastApplied: increment lastApplied, apply
+ // log[lastApplied] to state machine (§5.3)
+ if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+ applyLogToStateMachine(appendEntries.getLeaderCommit());
+ }
+
+ sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex(), lastTerm()), actor());
+
+ return state();
+ }
+
+ @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply) {
+ return state();
+ }
+
+ @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply) {
+ return state();
+ }
+
+ @Override public RaftState state() {
+ return RaftState.Follower;
+ }
+
+ @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ if (message instanceof RaftRPC) {
+ RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ }
+ }
+
+ if (message instanceof ElectionTimeout) {
+ return RaftState.Candidate;
+ } else if (message instanceof InstallSnapshot) {
+ InstallSnapshot snapshot = (InstallSnapshot) message;
+ actor().tell(new ApplySnapshot(snapshot), actor());
+ }
+
+ scheduleElection(electionDuration());
+
+ return super.handleMessage(sender, message);
+ }
+
+ @Override public void close() throws Exception {
+ stopElection();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
+import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The behavior of a RaftActor when it is in the Leader state
+ * <p/>
+ * Leaders:
+ * <ul>
+ * <li> Upon election: send initial empty AppendEntries RPCs
+ * (heartbeat) to each server; repeat during idle periods to
+ * prevent election timeouts (§5.2)
+ * <li> If command received from client: append entry to local log,
+ * respond after entry applied to state machine (§5.3)
+ * <li> If last log index ≥ nextIndex for a follower: send
+ * AppendEntries RPC with log entries starting at nextIndex
+ * <ul>
+ * <li> If successful: update nextIndex and matchIndex for
+ * follower (§5.3)
+ * <li> If AppendEntries fails because of log inconsistency:
+ * decrement nextIndex and retry (§5.3)
+ * </ul>
+ * <li> If there exists an N such that N > commitIndex, a majority
+ * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+ * set commitIndex = N (§5.3, §5.4).
+ */
+public class Leader extends AbstractRaftActorBehavior {
+
+
+ private final Map<String, FollowerLogInformation> followerToLog =
+ new HashMap();
+
+ private final Map<String, ActorSelection> followerToActor = new HashMap<>();
+
+ private Cancellable heartbeatSchedule = null;
+ private Cancellable appendEntriesSchedule = null;
+ private Cancellable installSnapshotSchedule = null;
+
+ private List<ClientRequestTracker> trackerList = new ArrayList<>();
+
+ private final int minReplicationCount;
+
+ public Leader(RaftActorContext context) {
+ super(context);
+
+ if (lastIndex() >= 0) {
+ context.setCommitIndex(lastIndex());
+ }
+
+ for (String followerId : context.getPeerAddresses().keySet()) {
+ FollowerLogInformation followerLogInformation =
+ new FollowerLogInformationImpl(followerId,
+ new AtomicLong(lastIndex()),
+ new AtomicLong(-1));
+
+ followerToActor.put(followerId,
+ context.actorSelection(context.getPeerAddress(followerId)));
+
+ followerToLog.put(followerId, followerLogInformation);
+ }
+
+ context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
+
+ if (followerToActor.size() > 0) {
+ minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
+ } else {
+ minReplicationCount = 0;
+ }
+
+
+ // Immediately schedule a heartbeat
+ // Upon election: send initial empty AppendEntries RPCs
+ // (heartbeat) to each server; repeat during idle periods to
+ // prevent election timeouts (§5.2)
+ scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+
+ scheduleInstallSnapshotCheck(
+ new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
+ HEART_BEAT_INTERVAL.unit())
+ );
+
+ }
+
+ @Override protected RaftState handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries) {
+
+ return state();
+ }
+
+ @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply) {
+
+ // Update the FollowerLogInformation
+ String followerId = appendEntriesReply.getFollowerId();
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+ if (appendEntriesReply.isSuccess()) {
+ followerLogInformation
+ .setMatchIndex(appendEntriesReply.getLogLastIndex());
+ followerLogInformation
+ .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+ } else {
+
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that followers next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it but may be something for us to
+ // think about
+
+ followerLogInformation.decrNextIndex();
+ }
+
+ // Now figure out if this reply warrants a change in the commitIndex
+ // If there exists an N such that N > commitIndex, a majority
+ // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+ // set commitIndex = N (§5.3, §5.4).
+ for (long N = context.getCommitIndex() + 1; ; N++) {
+ int replicatedCount = 1;
+
+ for (FollowerLogInformation info : followerToLog.values()) {
+ if (info.getMatchIndex().get() >= N) {
+ replicatedCount++;
+ }
+ }
+
+ if (replicatedCount >= minReplicationCount) {
+ ReplicatedLogEntry replicatedLogEntry =
+ context.getReplicatedLog().get(N);
+ if (replicatedLogEntry != null
+ && replicatedLogEntry.getTerm()
+ == currentTerm()) {
+ context.setCommitIndex(N);
+ }
+ } else {
+ break;
+ }
+ }
+
+ // Apply the change to the state machine
+ if (context.getCommitIndex() > context.getLastApplied()) {
+ applyLogToStateMachine(context.getCommitIndex());
+ }
+
+ return state();
+ }
+
+ protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+ for (ClientRequestTracker tracker : trackerList) {
+ if (tracker.getIndex() == logIndex) {
+ return tracker;
+ }
+ }
+
+ return null;
+ }
+
+ @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply) {
+ return state();
+ }
+
+ @Override public RaftState state() {
+ return RaftState.Leader;
+ }
+
+ @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ Preconditions.checkNotNull(sender, "sender should not be null");
+
+ if (message instanceof RaftRPC) {
+ RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ return RaftState.Follower;
+ }
+ }
+
+ try {
+ if (message instanceof SendHeartBeat) {
+ return sendHeartBeat();
+ } else if(message instanceof SendInstallSnapshot) {
+ installSnapshotIfNeeded();
+ } else if (message instanceof Replicate) {
+ replicate((Replicate) message);
+ } else if (message instanceof InstallSnapshotReply){
+ handleInstallSnapshotReply(
+ (InstallSnapshotReply) message);
+ }
+ } finally {
+ scheduleHeartBeat(HEART_BEAT_INTERVAL);
+ }
+
+ return super.handleMessage(sender, message);
+ }
+
+ private void handleInstallSnapshotReply(InstallSnapshotReply message) {
+ InstallSnapshotReply reply = message;
+ String followerId = reply.getFollowerId();
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ followerLogInformation
+ .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
+ followerLogInformation
+ .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+ }
+
+ private void replicate(Replicate replicate) {
+ long logIndex = replicate.getReplicatedLogEntry().getIndex();
+
+ context.getLogger().debug("Replicate message " + logIndex);
+
+ if (followerToActor.size() == 0) {
+ context.setCommitIndex(
+ replicate.getReplicatedLogEntry().getIndex());
+
+ context.getActor()
+ .tell(new ApplyState(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ replicate.getReplicatedLogEntry()),
+ context.getActor()
+ );
+ } else {
+
+ // Create a tracker entry we will use this later to notify the
+ // client actor
+ trackerList.add(
+ new ClientRequestTrackerImpl(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ logIndex)
+ );
+
+ sendAppendEntries();
+ }
+ }
+
+ private void sendAppendEntries() {
+ // Send an AppendEntries to all followers
+ for (String followerId : followerToActor.keySet()) {
+ ActorSelection followerActor =
+ followerToActor.get(followerId);
+
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+ if(context.getReplicatedLog().isPresent(nextIndex)){
+ // TODO: Instead of sending all entries from nextIndex
+ // only send a fixed number of entries to each follower
+ // This is to avoid the situation where there are a lot of
+ // entries to install for a fresh follower or to a follower
+ // that has fallen too far behind with the log but yet is not
+ // eligible to receive a snapshot
+ entries =
+ context.getReplicatedLog().getFrom(nextIndex);
+ }
+
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(nextIndex), prevLogTerm(nextIndex),
+ entries, context.getCommitIndex()
+ ),
+ actor()
+ );
+ }
+ }
+
+ /**
+ * An installSnapshot is scheduled at a interval that is a multiple of
+ * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+ * snapshots at every heartbeat.
+ */
+ private void installSnapshotIfNeeded(){
+ for (String followerId : followerToActor.keySet()) {
+ ActorSelection followerActor =
+ followerToActor.get(followerId);
+
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().getSnapshot()
+ ),
+ actor()
+ );
+ }
+ }
+ }
+
+ private RaftState sendHeartBeat() {
+ if (followerToActor.size() > 0) {
+ sendAppendEntries();
+ }
+ return state();
+ }
+
+ private void stopHeartBeat() {
+ if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+ heartbeatSchedule.cancel();
+ }
+ }
+
+ private void stopInstallSnapshotSchedule() {
+ if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+ installSnapshotSchedule.cancel();
+ }
+ }
+
+ private void scheduleHeartBeat(FiniteDuration interval) {
+ if(followerToActor.keySet().size() == 0){
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
+ }
+
+ stopHeartBeat();
+
+ // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
+ // message is sent to itself.
+ // Scheduling the heartbeat only once here because heartbeats do not
+ // need to be sent if there are other messages being sent to the remote
+ // actor.
+ heartbeatSchedule =
+ context.getActorSystem().scheduler().scheduleOnce(
+ interval,
+ context.getActor(), new SendHeartBeat(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+
+ private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+ if(followerToActor.keySet().size() == 0){
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
+ }
+
+ stopInstallSnapshotSchedule();
+
+ // Schedule a message to send append entries to followers that can
+ // accept an append entries with some data in it
+ installSnapshotSchedule =
+ context.getActorSystem().scheduler().scheduleOnce(
+ interval,
+ context.getActor(), new SendInstallSnapshot(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+
+
+ @Override public void close() throws Exception {
+ stopHeartBeat();
+ }
+
+ @Override public String getLeaderId() {
+ return context.getId();
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+/**
+ * A RaftActorBehavior represents the specific behavior of a RaftActor
+ * <p>
+ * A RaftActor can behave as one of the following,
+ * <ul>
+ * <li> Follower </li>
+ * <li> Candidate </li>
+ * <li> Leader </li>
+ * </ul>
+ * <p>
+ * In each of these behaviors the Raft Actor handles the same Raft messages
+ * differently.
+ */
+public interface RaftActorBehavior extends AutoCloseable{
+ /**
+ * Handle a message. If the processing of the message warrants a state
+ * change then a new state should be returned otherwise this method should
+ * return the state for the current behavior.
+ *
+ * @param sender The sender of the message
+ * @param message A message that needs to be processed
+ *
+ * @return The new state or self (this)
+ */
+ RaftState handleMessage(ActorRef sender, Object message);
+
+ /**
+ * The state associated with a given behavior
+ *
+ * @return
+ */
+ RaftState state();
+
+ /**
+ *
+ * @return
+ */
+ String getLeaderId();
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+/**
+ * Created by kramesha on 7/17/14.
+ */
+public class AddRaftPeer {
+
+ private String name;
+ private String address;
+
+ public AddRaftPeer(String name, String address) {
+ this.name = name;
+ this.address = address;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+public class FindLeader {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+public class FindLeaderReply {
+ private final String leaderActor;
+
+ public FindLeaderReply(String leaderActor) {
+ this.leaderActor = leaderActor;
+ }
+
+ public String getLeaderActor() {
+ return leaderActor;
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+/**
+ * Created by kramesha on 7/17/14.
+ */
+public class RemoveRaftPeer {
+ private String name;
+
+ public RemoveRaftPeer(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+public class ApplySnapshot {
+ private final Object snapshot;
+
+ public ApplySnapshot(Object snapshot) {
+ this.snapshot = snapshot;
+ }
+
+ public Object getSnapshot() {
+ return snapshot;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
+public class ApplyState {
+ private final ActorRef clientActor;
+ private final String identifier;
+ private final ReplicatedLogEntry replicatedLogEntry;
+
+ public ApplyState(ActorRef clientActor, String identifier,
+ ReplicatedLogEntry replicatedLogEntry) {
+ this.clientActor = clientActor;
+ this.identifier = identifier;
+ this.replicatedLogEntry = replicatedLogEntry;
+ }
+
+ public ActorRef getClientActor() {
+ return clientActor;
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public ReplicatedLogEntry getReplicatedLogEntry() {
+ return replicatedLogEntry;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * Message sent to commit an entry to the log
+ */
+public class CommitEntry {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+public class ElectionTimeout {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * Message sent to Persist an entry into the transaction journal
+ */
+public class PersistEntry {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
+public class Replicate {
+ private final ActorRef clientActor;
+ private final String identifier;
+ private final ReplicatedLogEntry replicatedLogEntry;
+
+ public Replicate(ActorRef clientActor, String identifier,
+ ReplicatedLogEntry replicatedLogEntry) {
+
+ this.clientActor = clientActor;
+ this.identifier = identifier;
+ this.replicatedLogEntry = replicatedLogEntry;
+ }
+
+ public ActorRef getClientActor() {
+ return clientActor;
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ public ReplicatedLogEntry getReplicatedLogEntry() {
+ return replicatedLogEntry;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * This message is sent by a RaftActor to itself so that a subclass can process
+ * it and use it to save it's state
+ */
+public class SaveSnapshot {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * This messages is sent to the Leader to prompt it to send a heartbeat
+ * to it's followers.
+ *
+ * Typically the Leader to itself on a schedule
+ */
+public class SendHeartBeat {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+public class SendInstallSnapshot {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public class AbstractRaftRPC implements RaftRPC {
+ // term
+ protected long term;
+
+ protected AbstractRaftRPC(long term){
+ this.term = term;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
+import java.util.List;
+
+/**
+ * Invoked by leader to replicate log entries (§5.3); also used as
+ * heartbeat (§5.2).
+ */
+public class AppendEntries extends AbstractRaftRPC {
+ // So that follower can redirect clients
+ private final String leaderId;
+
+ // Index of log entry immediately preceding new ones
+ private final long prevLogIndex;
+
+ // term of prevLogIndex entry
+ private final long prevLogTerm;
+
+ // log entries to store (empty for heartbeat;
+ // may send more than one for efficiency)
+ private final List<ReplicatedLogEntry> entries;
+
+ // leader's commitIndex
+ private final long leaderCommit;
+
+ public AppendEntries(long term, String leaderId, long prevLogIndex,
+ long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+ super(term);
+ this.leaderId = leaderId;
+ this.prevLogIndex = prevLogIndex;
+ this.prevLogTerm = prevLogTerm;
+ this.entries = entries;
+ this.leaderCommit = leaderCommit;
+ }
+
+ public String getLeaderId() {
+ return leaderId;
+ }
+
+ public long getPrevLogIndex() {
+ return prevLogIndex;
+ }
+
+ public long getPrevLogTerm() {
+ return prevLogTerm;
+ }
+
+ public List<ReplicatedLogEntry> getEntries() {
+ return entries;
+ }
+
+ public long getLeaderCommit() {
+ return leaderCommit;
+ }
+
+ @Override public String toString() {
+ return "AppendEntries{" +
+ "leaderId='" + leaderId + '\'' +
+ ", prevLogIndex=" + prevLogIndex +
+ ", prevLogTerm=" + prevLogTerm +
+ ", entries=" + entries +
+ ", leaderCommit=" + leaderCommit +
+ '}';
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+/**
+ * Reply for the AppendEntriesRpc message
+ */
+public class AppendEntriesReply extends AbstractRaftRPC{
+
+ // true if follower contained entry matching
+ // prevLogIndex and prevLogTerm
+ private final boolean success;
+
+ // The index of the last entry in the followers log
+ // This will be used to set the matchIndex for the follower on the
+ // Leader
+ private final long logLastIndex;
+
+ private final long logLastTerm;
+
+ // The followerId - this will be used to figure out which follower is
+ // responding
+ private final String followerId;
+
+ public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm) {
+ super(term);
+
+ this.followerId = followerId;
+ this.success = success;
+ this.logLastIndex = logLastIndex;
+ this.logLastTerm = logLastTerm;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public long getLogLastIndex() {
+ return logLastIndex;
+ }
+
+ public long getLogLastTerm() {
+ return logLastTerm;
+ }
+
+ public String getFollowerId() {
+ return followerId;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public class InstallSnapshot extends AbstractRaftRPC {
+
+ private final String leaderId;
+ private final long lastIncludedIndex;
+ private final long lastIncludedTerm;
+ private final Object data;
+
+ public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) {
+ super(term);
+ this.leaderId = leaderId;
+ this.lastIncludedIndex = lastIncludedIndex;
+ this.lastIncludedTerm = lastIncludedTerm;
+ this.data = data;
+ }
+
+ public String getLeaderId() {
+ return leaderId;
+ }
+
+ public long getLastIncludedIndex() {
+ return lastIncludedIndex;
+ }
+
+ public long getLastIncludedTerm() {
+ return lastIncludedTerm;
+ }
+
+ public Object getData() {
+ return data;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public class InstallSnapshotReply extends AbstractRaftRPC {
+
+ // The followerId - this will be used to figure out which follower is
+ // responding
+ private final String followerId;
+
+ protected InstallSnapshotReply(long term, String followerId) {
+ super(term);
+ this.followerId = followerId;
+ }
+
+ public String getFollowerId() {
+ return followerId;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public interface RaftRPC {
+ public long getTerm();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+/**
+ * Invoked by candidates to gather votes (§5.2).
+ */
+public class RequestVote extends AbstractRaftRPC{
+
+ // candidate requesting vote
+ private final String candidateId;
+
+ // index of candidate’s last log entry (§5.4)
+ private final long lastLogIndex;
+
+ // term of candidate’s last log entry (§5.4)
+ private final long lastLogTerm;
+
+ public RequestVote(long term, String candidateId, long lastLogIndex,
+ long lastLogTerm) {
+ super(term);
+ this.candidateId = candidateId;
+ this.lastLogIndex = lastLogIndex;
+ this.lastLogTerm = lastLogTerm;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ public String getCandidateId() {
+ return candidateId;
+ }
+
+ public long getLastLogIndex() {
+ return lastLogIndex;
+ }
+
+ public long getLastLogTerm() {
+ return lastLogTerm;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public class RequestVoteReply extends AbstractRaftRPC{
+
+ // true means candidate received vot
+ private final boolean voteGranted;
+
+ public RequestVoteReply(long term, boolean voteGranted) {
+ super(term);
+ this.voteGranted = voteGranted;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ public boolean isVoteGranted() {
+ return voteGranted;
+ }
+}
--- /dev/null
+akka {
+ loglevel = "DEBUG"
+ actor {
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.raft.RaftActor$ReplicatedLogImplEntry" = java
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public abstract class AbstractActorTest {
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setUpClass() {
+ System.setProperty("shard.persistent", "false");
+ system = ActorSystem.create("test");
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ protected ActorSystem getSystem() {
+ return system;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockRaftActorContext implements RaftActorContext {
+
+ private String id;
+ private ActorSystem system;
+ private ActorRef actor;
+ private long index = 0;
+ private long lastApplied = 0;
+ private final ElectionTerm electionTerm;
+ private ReplicatedLog replicatedLog;
+ private Map<String, String> peerAddresses = new HashMap();
+
+ public MockRaftActorContext(){
+ electionTerm = null;
+
+ initReplicatedLog();
+ }
+
+ public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
+ this.id = id;
+ this.system = system;
+ this.actor = actor;
+
+ final String id1 = id;
+ electionTerm = new ElectionTerm() {
+ /**
+ * Identifier of the actor whose election term information this is
+ */
+ private final String id = id1;
+ private long currentTerm = 0;
+ private String votedFor = "";
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ public void update(long currentTerm, String votedFor){
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+
+ // TODO : Write to some persistent state
+ }
+
+ @Override public void updateAndPersist(long currentTerm,
+ String votedFor) {
+ update(currentTerm, votedFor);
+ }
+ };
+
+ initReplicatedLog();
+ }
+
+
+ public void initReplicatedLog(){
+ this.replicatedLog = new SimpleReplicatedLog();
+ this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, ""));
+ }
+
+ @Override public ActorRef actorOf(Props props) {
+ return system.actorOf(props);
+ }
+
+ @Override public ActorSelection actorSelection(String path) {
+ return system.actorSelection(path);
+ }
+
+ @Override public String getId() {
+ return id;
+ }
+
+ @Override public ActorRef getActor() {
+ return actor;
+ }
+
+ @Override public ElectionTerm getTermInformation() {
+ return electionTerm;
+ }
+
+ public void setIndex(long index){
+ this.index = index;
+ }
+
+ @Override public long getCommitIndex() {
+ return index;
+ }
+
+ @Override public void setCommitIndex(long commitIndex) {
+ this.index = commitIndex;
+ }
+
+ @Override public void setLastApplied(long lastApplied){
+ this.lastApplied = lastApplied;
+ }
+
+ @Override public long getLastApplied() {
+ return lastApplied;
+ }
+
+ public void setReplicatedLog(ReplicatedLog replicatedLog) {
+ this.replicatedLog = replicatedLog;
+ }
+
+ @Override public ReplicatedLog getReplicatedLog() {
+ return replicatedLog;
+ }
+
+ @Override public ActorSystem getActorSystem() {
+ return this.system;
+ }
+
+ @Override public LoggingAdapter getLogger() {
+ return Logging.getLogger(system, this);
+ }
+
+ @Override public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ @Override public String getPeerAddress(String peerId) {
+ return peerAddresses.get(peerId);
+ }
+
+ @Override public void addToPeers(String name, String address) {
+ peerAddresses.put(name, address);
+ }
+
+ @Override public void removePeer(String name) {
+ peerAddresses.remove(name);
+ }
+
+ public void setPeerAddresses(Map<String, String> peerAddresses) {
+ this.peerAddresses = peerAddresses;
+ }
+
+
+
+ public static class SimpleReplicatedLog implements ReplicatedLog {
+ private final List<ReplicatedLogEntry> log = new ArrayList<>();
+
+ @Override public ReplicatedLogEntry get(long index) {
+ if(index >= log.size() || index < 0){
+ return null;
+ }
+ return log.get((int) index);
+ }
+
+ @Override public ReplicatedLogEntry last() {
+ if(log.size() == 0){
+ return null;
+ }
+ return log.get(log.size()-1);
+ }
+
+ @Override public long lastIndex() {
+ if(log.size() == 0){
+ return -1;
+ }
+
+ return last().getIndex();
+ }
+
+ @Override public long lastTerm() {
+ if(log.size() == 0){
+ return -1;
+ }
+
+ return last().getTerm();
+ }
+
+ @Override public void removeFrom(long index) {
+ if(index >= log.size() || index < 0){
+ return;
+ }
+
+ log.subList((int) index, log.size()).clear();
+ //log.remove((int) index);
+ }
+
+ @Override public void removeFromAndPersist(long index) {
+ removeFrom(index);
+ }
+
+ @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
+ log.add(replicatedLogEntry);
+ }
+
+ @Override public void appendAndPersist(
+ ReplicatedLogEntry replicatedLogEntry) {
+ append(replicatedLogEntry);
+ }
+
+ @Override public List<ReplicatedLogEntry> getFrom(long index) {
+ if(index >= log.size() || index < 0){
+ return Collections.EMPTY_LIST;
+ }
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ for(int i=(int) index ; i < log.size() ; i++) {
+ entries.add(get(i));
+ }
+ return entries;
+ }
+
+ @Override public long size() {
+ return log.size();
+ }
+
+ @Override public boolean isPresent(long index) {
+ if(index >= log.size() || index < 0){
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override public boolean isInSnapshot(long index) {
+ return false;
+ }
+
+ @Override public Object getSnapshot() {
+ return null;
+ }
+
+ @Override public long getSnapshotIndex() {
+ return -1;
+ }
+
+ @Override public long getSnapshotTerm() {
+ return -1;
+ }
+ }
+
+ public static class MockReplicatedLogEntry implements ReplicatedLogEntry {
+
+ private final long term;
+ private final long index;
+ private final Object data;
+
+ public MockReplicatedLogEntry(long term, long index, Object data){
+
+ this.term = term;
+ this.index = index;
+ this.data = data;
+ }
+
+ @Override public Object getData() {
+ return data;
+ }
+
+ @Override public long getTerm() {
+ return term;
+ }
+
+ @Override public long getIndex() {
+ return index;
+ }
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
+
+ private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ /**
+ * This test checks that when a new Raft RPC message is received with a newer
+ * term the RaftActor gets into the Follower state.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleRaftRPCWithNewerTerm() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ createAppendEntriesWithNewerTerm());
+
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ createAppendEntriesReplyWithNewerTerm());
+
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ createRequestVoteWithNewerTerm());
+
+ assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+ createRequestVoteReplyWithNewerTerm());
+
+
+ }};
+ }
+
+
+ /**
+ * This test verifies that when an AppendEntries is received with a term that
+ * is less that the currentTerm of the RaftActor then the RaftActor does not
+ * change it's state and it responds back with a failure
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesSenderTermLessThanReceiverTerm()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // First set the receivers term to a high number (1000)
+ context.getTermInformation().update(1000, "test");
+
+ AppendEntries appendEntries =
+ new AppendEntries(100, "leader-1", 0, 0, null, 101);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftState raftState =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftState);
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+
+
+ }};
+ }
+
+
+ @Test
+ public void testHandleAppendEntriesAddSameEntryToLog(){
+ new JavaTestKit(getSystem()) {
+ {
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(2, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+
+ context.setReplicatedLog(log);
+
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+
+ AppendEntries appendEntries =
+ new AppendEntries(2, "leader-1", -1, 1, entries, 0);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
+ // Resetting the Candidates term to make sure it will match
+ // the term sent by AppendEntries. If this was not done then
+ // the test will fail because the Candidate will assume that
+ // the message was sent to it from a lower term peer and will
+ // thus respond with a failure
+ context.getTermInformation().update(2, "test");
+ }
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftState raftState =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftState);
+
+ assertEquals(1, log.size());
+
+
+ }};
+ }
+
+ /**
+ * This test verifies that when a RequestVote is received by the RaftActor
+ * with a term which is greater than the RaftActors' currentTerm and the
+ * senders' log is more upto date than the receiver that the receiver grants
+ * the vote to the sender
+ */
+ @Test
+ public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermAndSenderLogMoreUpToDate() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorBehavior behavior = createBehavior(
+ createActorContext(behaviorActor));
+
+ RaftState raftState = behavior.handleMessage(getTestActor(),
+ new RequestVote(1000, "test", 10000, 999));
+
+ if(behavior.state() != RaftState.Follower){
+ assertEquals(RaftState.Follower, raftState);
+ } else {
+
+ final Boolean out =
+ new ExpectMsg<Boolean>(duration("1 seconds"),
+ "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply =
+ (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ }
+ };
+ }};
+ }
+
+ /**
+ * This test verifies that when a RaftActor receives a RequestVote message
+ * with a term that is greater than it's currentTerm but a less up-to-date
+ * log then the receiving RaftActor will not grant the vote to the sender
+ */
+ @Test
+ public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermButSenderLogLessUptoDate() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext actorContext =
+ createActorContext(behaviorActor);
+
+ MockRaftActorContext.SimpleReplicatedLog
+ log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(20000,
+ 1000000, ""));
+
+ ((MockRaftActorContext) actorContext).setReplicatedLog(log);
+
+ RaftActorBehavior behavior = createBehavior(actorContext);
+
+ RaftState raftState = behavior.handleMessage(getTestActor(),
+ new RequestVote(1000, "test", 10000, 999));
+
+ if(behavior.state() != RaftState.Follower){
+ assertEquals(RaftState.Follower, raftState);
+ } else {
+ final Boolean out =
+ new ExpectMsg<Boolean>(duration("1 seconds"),
+ "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply =
+ (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ }
+ };
+ }};
+ }
+
+
+
+ /**
+ * This test verifies that the receiving RaftActor will not grant a vote
+ * to a sender if the sender's term is lesser than the currentTerm of the
+ * recipient RaftActor
+ */
+ @Test
+ public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context =
+ createActorContext(behaviorActor);
+
+ context.getTermInformation().update(1000, null);
+
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(),
+ new RequestVote(999, "test", 10000, 999));
+
+ final Boolean out =
+ new ExpectMsg<Boolean>(duration("1 seconds"),
+ "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply =
+ (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
+ ActorRef actorRef, RaftRPC rpc) {
+
+ RaftActorContext actorContext = createActorContext();
+ setLastLogEntry(
+ (MockRaftActorContext) actorContext, 0, 0, "");
+
+ RaftState raftState = createBehavior(actorContext)
+ .handleMessage(actorRef, rpc);
+
+ assertEquals(RaftState.Follower, raftState);
+ }
+
+ protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
+ MockRaftActorContext actorContext, long term, long index, Object data) {
+ return setLastLogEntry(actorContext,
+ new MockRaftActorContext.MockReplicatedLogEntry(term, index, data));
+ }
+
+ protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
+ MockRaftActorContext actorContext, ReplicatedLogEntry logEntry) {
+ MockRaftActorContext.SimpleReplicatedLog
+ log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(logEntry);
+ actorContext.setReplicatedLog(log);
+
+ return log;
+ }
+
+ protected abstract RaftActorBehavior createBehavior(
+ RaftActorContext actorContext);
+
+ protected RaftActorBehavior createBehavior() {
+ return createBehavior(createActorContext());
+ }
+
+ protected RaftActorContext createActorContext() {
+ return new MockRaftActorContext();
+ }
+
+ protected RaftActorContext createActorContext(ActorRef actor) {
+ return new MockRaftActorContext("test", getSystem(), actor);
+ }
+
+ protected AppendEntries createAppendEntriesWithNewerTerm() {
+ return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+ }
+
+ protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
+ return new AppendEntriesReply("follower-1", 100, false, 100, 100);
+ }
+
+ protected RequestVote createRequestVoteWithNewerTerm() {
+ return new RequestVote(100, "candidate-1", 10, 100);
+ }
+
+ protected RequestVoteReply createRequestVoteReplyWithNewerTerm() {
+ return new RequestVoteReply(100, false);
+ }
+
+
+
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class CandidateTest extends AbstractRaftActorBehaviorTest {
+
+ private final ActorRef candidateActor = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ private final ActorRef peerActor1 = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ private final ActorRef peerActor2 = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ private final ActorRef peerActor3 = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ private final ActorRef peerActor4 = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+ private final Map<String, String> onePeer = new HashMap<>();
+ private final Map<String, String> twoPeers = new HashMap<>();
+ private final Map<String, String> fourPeers = new HashMap<>();
+
+ @Before
+ public void setUp(){
+ onePeer.put(peerActor1.path().toString(),
+ peerActor1.path().toString());
+
+ twoPeers.put(peerActor1.path().toString(),
+ peerActor1.path().toString());
+ twoPeers.put(peerActor2.path().toString(),
+ peerActor2.path().toString());
+
+ fourPeers.put(peerActor1.path().toString(),
+ peerActor1.path().toString());
+ fourPeers.put(peerActor2.path().toString(),
+ peerActor2.path().toString());
+ fourPeers.put(peerActor3.path().toString(),
+ peerActor3.path().toString());
+ fourPeers.put(peerActor4.path().toString(),
+ peerActor3.path().toString());
+
+
+ }
+
+ @Test
+ public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){
+ RaftActorContext raftActorContext = createActorContext();
+ long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm();
+
+ new Candidate(raftActorContext);
+
+ assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm());
+ assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
+ }
+
+ @Test
+ public void testThatAnElectionTimeoutIsTriggered(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ Candidate candidate = new Candidate(createActorContext(getTestActor()));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof ElectionTimeout) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleElectionTimeoutWhenThereAreZeroPeers(){
+ RaftActorContext raftActorContext = createActorContext();
+ Candidate candidate =
+ new Candidate(raftActorContext);
+
+ RaftState raftState =
+ candidate.handleMessage(candidateActor, new ElectionTimeout());
+
+ Assert.assertEquals(RaftState.Leader, raftState);
+ }
+
+ @Test
+ public void testHandleElectionTimeoutWhenThereAreTwoNodesInCluster(){
+ MockRaftActorContext raftActorContext =
+ (MockRaftActorContext) createActorContext();
+ raftActorContext.setPeerAddresses(onePeer);
+ Candidate candidate =
+ new Candidate(raftActorContext);
+
+ RaftState raftState =
+ candidate.handleMessage(candidateActor, new ElectionTimeout());
+
+ Assert.assertEquals(RaftState.Candidate, raftState);
+ }
+
+ @Test
+ public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodesInCluster(){
+ MockRaftActorContext raftActorContext =
+ (MockRaftActorContext) createActorContext();
+ raftActorContext.setPeerAddresses(twoPeers);
+ Candidate candidate =
+ new Candidate(raftActorContext);
+
+ RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+
+ Assert.assertEquals(RaftState.Leader, stateOnFirstVote);
+
+ }
+
+ @Test
+ public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodesInCluster(){
+ MockRaftActorContext raftActorContext =
+ (MockRaftActorContext) createActorContext();
+ raftActorContext.setPeerAddresses(fourPeers);
+ Candidate candidate =
+ new Candidate(raftActorContext);
+
+ RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+
+ RaftState stateOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
+
+ Assert.assertEquals(RaftState.Candidate, stateOnFirstVote);
+ Assert.assertEquals(RaftState.Leader, stateOnSecondVote);
+
+ }
+
+ @Test
+ public void testResponseToAppendEntriesWithLowerTerm(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ Candidate candidate = new Candidate(createActorContext(getTestActor()));
+
+ candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.EMPTY_LIST, 0));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testResponseToRequestVoteWithLowerTerm(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ Candidate candidate = new Candidate(createActorContext(getTestActor()));
+
+ candidate.handleMessage(getTestActor(), new RequestVote(0, "test", 0, 0));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context = createActorContext(getTestActor());
+
+ context.getTermInformation().update(1000, null);
+
+ // Once a candidate is created it will immediately increment the current term so after
+ // construction the currentTerm should be 1001
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(), new RequestVote(1001, "test", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context = createActorContext(getTestActor());
+
+ context.getTermInformation().update(1000, "test");
+
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(), new RequestVote(1001, "candidate", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
+
+
+ @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+ return new Candidate(actorContext);
+ }
+
+ @Override protected RaftActorContext createActorContext() {
+ return new MockRaftActorContext("test", getSystem(), candidateActor);
+ }
+
+
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class FollowerTest extends AbstractRaftActorBehaviorTest {
+
+ private final ActorRef followerActor = getSystem().actorOf(Props.create(
+ DoNothingActor.class));
+
+
+ @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+ return new Follower(actorContext);
+ }
+
+ @Override protected RaftActorContext createActorContext() {
+ return new MockRaftActorContext("test", getSystem(), followerActor);
+ }
+
+ @Test
+ public void testThatAnElectionTimeoutIsTriggered(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ Follower follower = new Follower(createActorContext(getTestActor()));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof ElectionTimeout) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleElectionTimeout(){
+ RaftActorContext raftActorContext = createActorContext();
+ Follower follower =
+ new Follower(raftActorContext);
+
+ RaftState raftState =
+ follower.handleMessage(followerActor, new ElectionTimeout());
+
+ Assert.assertEquals(RaftState.Candidate, raftState);
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context = createActorContext(getTestActor());
+
+ context.getTermInformation().update(1000, null);
+
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ RaftActorContext context = createActorContext(getTestActor());
+
+ context.getTermInformation().update(1000, "test");
+
+ RaftActorBehavior follower = createBehavior(context);
+
+ follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
+
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof RequestVoteReply) {
+ RequestVoteReply reply = (RequestVoteReply) in;
+ return reply.isVoteGranted();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+ }
+ };
+ }};
+ }
+
+ /**
+ * This test verifies that when an AppendEntries RPC is received by a RaftActor
+ * with a commitIndex that is greater than what has been applied to the
+ * state machine of the RaftActor, the RaftActor applies the state and
+ * sets it current applied state to the commitIndex of the sender.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ RaftActorContext context =
+ createActorContext();
+
+ context.setLastApplied(100);
+ setLastLogEntry((MockRaftActorContext) context, 0, 0, "");
+
+ List<ReplicatedLogEntry> entries =
+ Arrays.asList(
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101,
+ "foo")
+ );
+
+ // The new commitIndex is 101
+ AppendEntries appendEntries =
+ new AppendEntries(100, "leader-1", 0, 0, entries, 101);
+
+ RaftState raftState =
+ createBehavior(context).handleMessage(getRef(), appendEntries);
+
+ assertEquals(101L, context.getLastApplied());
+
+ }};
+ }
+
+ /**
+ * This test verifies that when an AppendEntries is received a specific prevLogTerm
+ * which does not match the term that is in RaftActors log entry at prevLogIndex
+ * then the RaftActor does not change it's state and it returns a failure.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(95, "test");
+
+ // Set the last log entry term for the receiver to be greater than
+ // what we will be sending as the prevLogTerm in AppendEntries
+ MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
+ setLastLogEntry(context, 20, 0, "");
+
+ // AppendEntries is now sent with a bigger term
+ // this will set the receivers term to be the same as the sender's term
+ AppendEntries appendEntries =
+ new AppendEntries(100, "leader-1", 0, 0, null, 101);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftState raftState =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftState);
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+
+
+ }};
+ }
+
+
+
+ /**
+ * This test verifies that when a new AppendEntries message is received with
+ * new entries and the logs of the sender and receiver match that the new
+ * entries get added to the log and the log is incremented by the number of
+ * entries received in appendEntries
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesAddNewEntries() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(1, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three"));
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four"));
+
+ // Send appendEntries with the same term as was set on the receiver
+ // before the new behavior was created (1 in this case)
+ // This will not work for a Candidate because as soon as a Candidate
+ // is created it increments the term
+ AppendEntries appendEntries =
+ new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftState raftState =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftState);
+ assertEquals(5, log.last().getIndex() + 1);
+ assertNotNull(log.get(3));
+ assertNotNull(log.get(4));
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+
+
+ }};
+ }
+
+
+
+ /**
+ * This test verifies that when a new AppendEntries message is received with
+ * new entries and the logs of the sender and receiver are out-of-sync that
+ * the log is first corrected by removing the out of sync entries from the
+ * log and then adding in the new entries sent with the AppendEntries message
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleAppendEntriesCorrectReceiverLogEntries()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(2, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1"));
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three"));
+
+ // Send appendEntries with the same term as was set on the receiver
+ // before the new behavior was created (1 in this case)
+ // This will not work for a Candidate because as soon as a Candidate
+ // is created it increments the term
+ AppendEntries appendEntries =
+ new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftState raftState =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftState);
+
+ // The entry at index 2 will be found out-of-sync with the leader
+ // and will be removed
+ // Then the two new entries will be added to the log
+ // Thus making the log to have 4 entries
+ assertEquals(4, log.last().getIndex() + 1);
+ assertNotNull(log.get(2));
+
+
+ assertEquals("one", log.get(1).getData());
+
+ // Check that the entry at index 2 has the new data
+ assertEquals("two-1", log.get(2).getData());
+
+ assertEquals("three", log.get(3).getData());
+ assertNotNull(log.get(3));
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+
+
+ }};
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class LeaderTest extends AbstractRaftActorBehaviorTest {
+
+ private ActorRef leaderActor =
+ getSystem().actorOf(Props.create(DoNothingActor.class));
+ private ActorRef senderActor =
+ getSystem().actorOf(Props.create(DoNothingActor.class));
+
+ @Test
+ public void testHandleMessageForUnknownMessage() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ Leader leader =
+ new Leader(createActorContext());
+
+ // handle message should return the Leader state when it receives an
+ // unknown message
+ RaftState state = leader.handleMessage(senderActor, "foo");
+ Assert.assertEquals(RaftState.Leader, state);
+ }};
+ }
+
+
+ @Test
+ public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef followerActor = getTestActor();
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap();
+
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ actorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(actorContext);
+ leader.handleMessage(senderActor, new SendHeartBeat());
+
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"),
+ "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof AppendEntries) {
+ if (((AppendEntries) in).getTerm()
+ == 0) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleReplicateMessageSendAppendEntriesToFollower() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef followerActor = getTestActor();
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap();
+
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ actorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(actorContext);
+ RaftState raftState = leader
+ .handleMessage(senderActor, new Replicate(null, null,
+ new MockRaftActorContext.MockReplicatedLogEntry(1,
+ 100,
+ "foo")
+ ));
+
+ // State should not change
+ assertEquals(RaftState.Leader, raftState);
+
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"),
+ "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof AppendEntries) {
+ if (((AppendEntries) in).getTerm()
+ == 0) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleReplicateMessageWhenThereAreNoFollowers() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef raftActor = getTestActor();
+
+ MockRaftActorContext actorContext =
+ new MockRaftActorContext("test", getSystem(), raftActor);
+
+ Leader leader = new Leader(actorContext);
+ RaftState raftState = leader
+ .handleMessage(senderActor, new Replicate(null, "state-id",
+ new MockRaftActorContext.MockReplicatedLogEntry(1,
+ 100,
+ "foo")
+ ));
+
+ // State should not change
+ assertEquals(RaftState.Leader, raftState);
+
+ assertEquals(100, actorContext.getCommitIndex());
+
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"),
+ "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof ApplyState) {
+ if (((ApplyState) in).getIdentifier().equals("state-id")) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ }
+
+
+ };
+ }};
+ }
+
+ @Override protected RaftActorBehavior createBehavior(
+ RaftActorContext actorContext) {
+ return new Leader(actorContext);
+ }
+
+ @Override protected RaftActorContext createActorContext() {
+ return new MockRaftActorContext("test", getSystem(), leaderActor);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.actor.UntypedActor;
+
+public class DoNothingActor extends UntypedActor{
+ @Override public void onReceive(Object message) throws Exception {
+
+ }
+}
--- /dev/null
+akka {
+ actor {
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java
+ }
+ }
+}
\ No newline at end of file