--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-parent</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>sal-akka-raft</artifactId>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-cluster_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-remote_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+ <Export-package></Export-package>
+ <Private-Package></Private-Package>
+ <Import-Package></Import-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>org.opendaylight.controller.*</include>
+ </includes>
+ <check>false</check>
+ </configuration>
+ <executions>
+ <execution>
+ <id>pre-test</id>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>post-test</id>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering</url>
+ </scm>
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.example;
+
+import org.opendaylight.controller.cluster.raft.RaftActor;
+
+/**
+ * A sample actor showing how the RaftActor is to be extended
+ */
+public class ExampleActor extends RaftActor {
+ public ExampleActor(String id) {
+ super(id);
+ }
+
+ @Override public void onReceiveCommand(Object message){
+ /*
+ Here the extended class does whatever it needs to do.
+ If it cannot handle a message then it passes it on to the super
+ class for handling
+ */
+ super.onReceiveCommand(message);
+ }
+
+ @Override public void onReceiveRecover(Object message) {
+ super.onReceiveRecover(message);
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * ElectionTerm contains information about a RaftActors election term.
+ * <p>
+ * This information includes the last known current term of the RaftActor
+ * and which peer was voted for by the RaftActor in that term
+ * <p>
+ * This class ensures that election term information is persisted
+ */
+public interface ElectionTerm {
+ /**
+ * latest term server has seen (initialized to 0
+ * on first boot, increases monotonically)
+ */
+ AtomicLong getCurrentTerm();
+
+ /**
+ * candidateId that received vote in current
+ * term (or null if none)
+ */
+ String getVotedFor();
+
+ /**
+ * Called when we need to update the current term either because we received
+ * a message from someone with a more uptodate term or because we just voted
+ * for someone
+ *
+ * @param currentTerm
+ * @param votedFor
+ */
+ void update(AtomicLong currentTerm, String votedFor);
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ElectionTermImpl implements ElectionTerm{
+ /**
+ * Identifier of the actor whose election term information this is
+ */
+ private final String id;
+
+ private AtomicLong currentTerm;
+
+ private String votedFor;
+
+ public ElectionTermImpl(String id) {
+ this.id = id;
+
+ // TODO: Read currentTerm from some persistent state
+ currentTerm = new AtomicLong(0);
+
+ // TODO: Read votedFor from some file
+ votedFor = "";
+ }
+
+ public AtomicLong getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ public void update(AtomicLong currentTerm, String votedFor){
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+
+ // TODO : Write to some persistent state
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The state of the followers log as known by the Leader
+ */
+public interface FollowerLogInformation {
+
+ /**
+ * Increment the value of the nextIndex
+ * @return
+ */
+ public long incrNextIndex();
+
+ /**
+ * Increment the value of the matchIndex
+ * @return
+ */
+ public long incrMatchIndex();
+
+ /**
+ * The identifier of the follower
+ * This could simply be the url of the remote actor
+ */
+ public String getId();
+
+ /**
+ * for each server, index of the next log entry
+ * to send to that server (initialized to leader
+ * last log index + 1)
+ */
+ public AtomicLong getNextIndex();
+
+ /**
+ * for each server, index of highest log entry
+ * known to be replicated on server
+ * (initialized to 0, increases monotonically)
+ */
+ public AtomicLong getMatchIndex();
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FollowerLogInformationImpl implements FollowerLogInformation{
+
+ private final String id;
+
+ private final AtomicLong nextIndex;
+
+ private final AtomicLong matchIndex;
+
+ public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
+ AtomicLong matchIndex) {
+ this.id = id;
+ this.nextIndex = nextIndex;
+ this.matchIndex = matchIndex;
+ }
+
+ public long incrNextIndex(){
+ return nextIndex.incrementAndGet();
+ }
+
+ public long incrMatchIndex(){
+ return matchIndex.incrementAndGet();
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public AtomicLong getNextIndex() {
+ return nextIndex;
+ }
+
+ public AtomicLong getMatchIndex() {
+ return matchIndex;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.persistence.UntypedEventsourcedProcessor;
+import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
+import org.opendaylight.controller.cluster.raft.behaviors.Follower;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * RaftActor encapsulates a state machine that needs to be kept synchronized
+ * in a cluster. It implements the RAFT algorithm as described in the paper
+ * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
+ * In Search of an Understandable Consensus Algorithm</a>
+ * <p>
+ * RaftActor has 3 states and each state has a certain behavior associated
+ * with it. A Raft actor can behave as,
+ * <ul>
+ * <li> A Leader </li>
+ * <li> A Follower (or) </li>
+ * <li> A Candidate </li>
+ * </ul>
+ *
+ * <p>
+ * A RaftActor MUST be a Leader in order to accept requests from clients to
+ * change the state of it's encapsulated state machine. Once a RaftActor becomes
+ * a Leader it is also responsible for ensuring that all followers ultimately
+ * have the same log and therefore the same state machine as itself.
+ *
+ * <p>
+ * The current behavior of a RaftActor determines how election for leadership
+ * is initiated and how peer RaftActors react to request for votes.
+ *
+ * <p>
+ * Each RaftActor also needs to know the current election term. It uses this
+ * information for a couple of things. One is to simply figure out who it
+ * voted for in the last election. Another is to figure out if the message
+ * it received to update it's state is stale.
+ *
+ * <p>
+ * The RaftActor uses akka-persistence to store it's replicated log.
+ * Furthermore through it's behaviors a Raft Actor determines
+ *
+ * <ul>
+ * <li> when a log entry should be persisted </li>
+ * <li> when a log entry should be applied to the state machine (and) </li>
+ * <li> when a snapshot should be saved </li>
+ * </ul>
+ *
+ * <a href="http://doc.akka.io/api/akka/2.3.3/index.html#akka.persistence.UntypedEventsourcedProcessor">UntypeEventSourceProcessor</a>
+ */
+public abstract class RaftActor extends UntypedEventsourcedProcessor {
+
+ /**
+ * The current state determines the current behavior of a RaftActor
+ * A Raft Actor always starts off in the Follower State
+ */
+ private RaftActorBehavior currentBehavior;
+
+ /**
+ * This context should NOT be passed directly to any other actor it is
+ * only to be consumed by the RaftActorBehaviors
+ */
+ private RaftActorContext context;
+
+ public RaftActor(String id){
+ context = new RaftActorContextImpl(this.getSelf(),
+ this.getContext(),
+ id, new ElectionTermImpl(id),
+ new AtomicLong(0), new AtomicLong(0), new ReplicatedLogImpl());
+ currentBehavior = switchBehavior(RaftState.Follower);
+ }
+
+ @Override public void onReceiveRecover(Object message) {
+ throw new UnsupportedOperationException("onReceiveRecover");
+ }
+
+ @Override public void onReceiveCommand(Object message) {
+ RaftState state = currentBehavior.handleMessage(getSender(), message);
+ currentBehavior = switchBehavior(state);
+ }
+
+ private RaftActorBehavior switchBehavior(RaftState state){
+ RaftActorBehavior behavior = null;
+ if(state == RaftState.Candidate){
+ behavior = new Candidate(context, Collections.EMPTY_LIST);
+ } else if(state == RaftState.Follower){
+ behavior = new Follower(context);
+ } else {
+ behavior = new Leader(context, Collections.EMPTY_LIST);
+ }
+ return behavior;
+ }
+
+ private class ReplicatedLogImpl implements ReplicatedLog {
+
+ @Override public ReplicatedLogEntry getReplicatedLogEntry(long index) {
+ throw new UnsupportedOperationException("getReplicatedLogEntry");
+ }
+
+ @Override public ReplicatedLogEntry last() {
+ throw new UnsupportedOperationException("last");
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The RaftActorContext contains that portion of the RaftActors state that
+ * needs to be shared with it's behaviors. A RaftActorContext should NEVER be
+ * used in any actor context outside the RaftActor that constructed it.
+ */
+public interface RaftActorContext {
+ /**
+ * Create a new local actor
+ * @param props
+ * @return
+ */
+ ActorRef actorOf(Props props);
+
+ /**
+ * Create a actor selection
+ * @param path
+ * @return
+ */
+ ActorSelection actorSelection(String path);
+
+ /**
+ * Get the identifier for the RaftActor. This identifier represents the
+ * name of the actor whose common state is being shared. For example the
+ * id could be 'inventory'
+ * @return the identifier
+ */
+ String getId();
+
+ /**
+ * A reference to the RaftActor itself. This could be used to send messages
+ * to the RaftActor
+ * @return
+ */
+ ActorRef getActor();
+
+ /**
+ * Get the ElectionTerm information
+ * @return
+ */
+ ElectionTerm getTermInformation();
+
+ /**
+ * index of highest log entry known to be
+ * committed (initialized to 0, increases
+ * monotonically)
+ * @return
+ */
+ AtomicLong getCommitIndex();
+
+ /**
+ * index of highest log entry applied to state
+ * machine (initialized to 0, increases
+ * monotonically)
+ * @return
+ */
+ AtomicLong getLastApplied();
+
+ /**
+ *
+ */
+ ReplicatedLog getReplicatedLog();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.actor.UntypedActorContext;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class RaftActorContextImpl implements RaftActorContext{
+
+ private final ActorRef actor;
+
+ private final UntypedActorContext context;
+
+ private final String id;
+
+ private final ElectionTerm termInformation;
+
+ private final AtomicLong commitIndex;
+
+ private final AtomicLong lastApplied;
+
+ private final ReplicatedLog replicatedLog;
+
+ public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
+ String id,
+ ElectionTerm termInformation, AtomicLong commitIndex,
+ AtomicLong lastApplied, ReplicatedLog replicatedLog) {
+ this.actor = actor;
+ this.context = context;
+ this.id = id;
+ this.termInformation = termInformation;
+ this.commitIndex = commitIndex;
+ this.lastApplied = lastApplied;
+ this.replicatedLog = replicatedLog;
+ }
+
+ public ActorRef actorOf(Props props){
+ return context.actorOf(props);
+ }
+
+ public ActorSelection actorSelection(String path){
+ return context.actorSelection(path);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public ActorRef getActor() {
+ return actor;
+ }
+
+ public ElectionTerm getTermInformation() {
+ return termInformation;
+ }
+
+ public AtomicLong getCommitIndex() {
+ return commitIndex;
+ }
+
+ public AtomicLong getLastApplied() {
+ return lastApplied;
+ }
+
+ @Override public ReplicatedLog getReplicatedLog() {
+ return replicatedLog;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A RaftReplicator is responsible for replicating messages to any one follower.
+ * Once it gets a message for replication it should keep trying to replicate it
+ * to the remote follower indefinitely.
+ * <p>
+ * Any new messages that are sent to this actor while it is replicating a
+ * message may need to be stashed till the current message has been successfully
+ * replicated. When a message is successfully replicated the RaftReplicator
+ * needs to inform the RaftActor of it.
+ */
+public class RaftReplicator extends UntypedActor {
+
+ /**
+ * The interval at which a heart beat message will be sent to the remote
+ * RaftActor
+ *
+ * Since this is set to 100 milliseconds the Election timeout should be
+ * at least 200 milliseconds
+ *
+ */
+ private static final FiniteDuration HEART_BEAT_INTERVAL =
+ new FiniteDuration(100, TimeUnit.MILLISECONDS);
+
+ /**
+ * The state of the follower as known to this replicator
+ */
+ private final FollowerLogInformation followerLogInformation;
+
+ /**
+ * The local RaftActor that created this replicator so that it could
+ * replicate messages to the follower
+ */
+ private final ActorRef leader;
+
+
+ /**
+ * The remote RaftActor to whom the messages need to be replicated
+ */
+ private ActorSelection follower;
+
+ private Cancellable heartbeatCancel = null;
+
+ public RaftReplicator(FollowerLogInformation followerLogInformation,
+ ActorRef leader) {
+
+ this.followerLogInformation = followerLogInformation;
+ this.leader = leader;
+ this.follower = getContext().actorSelection(followerLogInformation.getId());
+
+ // Immediately schedule a heartbeat
+ // Upon election: send initial empty AppendEntries RPCs
+ // (heartbeat) to each server; repeat during idle periods to
+ // prevent election timeouts (§5.2)
+ scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ }
+
+ private void scheduleHeartBeat(FiniteDuration interval) {
+ if(heartbeatCancel != null && ! heartbeatCancel.isCancelled()){
+ heartbeatCancel.cancel();
+ }
+
+ // Schedule a heartbeat. When the scheduler triggers the replicator
+ // will let the RaftActor (leader) know that a new heartbeat needs to be sent
+ // Scheduling the heartbeat only once here because heartbeats do not
+ // need to be sent if there are other messages being sent to the remote
+ // actor.
+ heartbeatCancel =
+ getContext().system().scheduler().scheduleOnce(interval,
+ leader, new SendHeartBeat(), getContext().dispatcher(), getSelf());
+ }
+
+
+
+ @Override public void onReceive(Object message) throws Exception {
+ scheduleHeartBeat(HEART_BEAT_INTERVAL);
+ follower.forward(message, getContext());
+ }
+
+ public static Props props(final FollowerLogInformation followerLogInformation,
+ final ActorRef leader) {
+ return Props.create(new Creator<RaftReplicator>() {
+
+ @Override public RaftReplicator create() throws Exception {
+ return new RaftReplicator(followerLogInformation, leader);
+ }
+ });
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft;
+
+public enum RaftState {
+ Candidate,
+ Follower,
+ Leader
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor
+ */
+public interface ReplicatedLog {
+ /**
+ * Get a replicated log entry at the specified index
+ *
+ * @param index
+ * @return
+ */
+ ReplicatedLogEntry getReplicatedLogEntry(long index);
+
+
+ /**
+ * Get the last replicated log entry
+ *
+ * @return
+ */
+ ReplicatedLogEntry last();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * Represents one entry in the replicated log
+ */
+public interface ReplicatedLogEntry {
+ /**
+ * The data stored in that entry
+ *
+ * @return
+ */
+ Object getData();
+
+ /**
+ * The term stored in that entry
+ *
+ * @return
+ */
+ long getTerm();
+
+ /**
+ * The index of the entry
+ *
+ * @return
+ */
+ long getIndex();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+
+/**
+ * Abstract class that represents the behavior of a RaftActor
+ * <p>
+ * All Servers:
+ * <ul>
+ * <li> If commitIndex > lastApplied: increment lastApplied, apply
+ * log[lastApplied] to state machine (§5.3)
+ * <li> If RPC request or response contains term T > currentTerm:
+ * set currentTerm = T, convert to follower (§5.1)
+ */
+public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
+
+ /**
+ * Information about the RaftActor whose behavior this class represents
+ */
+ protected final RaftActorContext context;
+
+
+ protected AbstractRaftActorBehavior(RaftActorContext context) {
+ this.context = context;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+import java.util.List;
+
+/**
+ * The behavior of a RaftActor when it is in the CandidateState
+ * <p>
+ * Candidates (§5.2):
+ * <ul>
+ * <li> On conversion to candidate, start election:
+ * <ul>
+ * <li> Increment currentTerm
+ * <li> Vote for self
+ * <li> Reset election timer
+ * <li> Send RequestVote RPCs to all other servers
+ * </ul>
+ * <li> If votes received from majority of servers: become leader
+ * <li> If AppendEntries RPC received from new leader: convert to
+ * follower
+ * <li> If election timeout elapses: start new election
+ * </ul>
+ */
+public class Candidate extends AbstractRaftActorBehavior {
+ private final List<String> peers;
+
+ public Candidate(RaftActorContext context, List<String> peers) {
+ super(context);
+ this.peers = peers;
+ }
+
+ @Override
+ public RaftState handleMessage(ActorRef sender, Object message) {
+ return RaftState.Candidate;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+/**
+ * The behavior of a RaftActor in the Follower state
+ */
+public class Follower extends AbstractRaftActorBehavior {
+ public Follower(RaftActorContext context) {
+ super(context);
+ }
+
+ @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ return RaftState.Follower;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftReplicator;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The behavior of a RaftActor when it is in the Leader state
+ * <p>
+ * Leaders:
+ * <ul>
+ * <li> Upon election: send initial empty AppendEntries RPCs
+ * (heartbeat) to each server; repeat during idle periods to
+ * prevent election timeouts (§5.2)
+ * <li> If command received from client: append entry to local log,
+ * respond after entry applied to state machine (§5.3)
+ * <li> If last log index ≥ nextIndex for a follower: send
+ * AppendEntries RPC with log entries starting at nextIndex
+ * <ul>
+ * <li> If successful: update nextIndex and matchIndex for
+ * follower (§5.3)
+ * <li> If AppendEntries fails because of log inconsistency:
+ * decrement nextIndex and retry (§5.3)
+ * </ul>
+ * <li> If there exists an N such that N > commitIndex, a majority
+ * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+ * set commitIndex = N (§5.3, §5.4).
+ */
+public class Leader extends AbstractRaftActorBehavior {
+
+
+ private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
+
+ public Leader(RaftActorContext context, List<String> followers){
+ super(context);
+
+ for(String follower : followers) {
+
+ ActorRef replicator = context.actorOf(
+ RaftReplicator.props(
+ new FollowerLogInformationImpl(follower,
+ new AtomicLong(0),
+ new AtomicLong(0)),
+ context.getActor()
+ )
+ );
+
+ // Create a replicator for each follower
+ followerToReplicator.put(follower, replicator);
+
+ }
+
+ }
+
+ @Override public RaftState handleMessage(ActorRef sender, Object message) {
+ Preconditions.checkNotNull(sender, "sender should not be null");
+
+ if(message instanceof SendHeartBeat) {
+ sender.tell(new AppendEntries(
+ context.getTermInformation().getCurrentTerm().get() , context.getId(),
+ context.getReplicatedLog().last().getIndex(),
+ context.getReplicatedLog().last().getTerm(),
+ Collections.EMPTY_LIST), context.getActor());
+ }
+ return RaftState.Leader;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+/**
+ * A RaftActorBehavior represents the specific behavior of a RaftActor
+ * <p>
+ * A RaftActor can behave as one of the following,
+ * <ul>
+ * <li> Follower </li>
+ * <li> Candidate </li>
+ * <li> Leader </li>
+ * </ul>
+ * <p>
+ * In each of these behaviors the Raft Actor handles the same Raft messages
+ * differently.
+ */
+public interface RaftActorBehavior {
+ /**
+ * Handle a message. If the processing of the message warrants a state
+ * change then a new state should be returned otherwise this method should
+ * return the state for the current behavior.
+ *
+ * @param sender The sender of the message
+ * @param message A message that needs to be processed
+ *
+ * @return The new state or self (this)
+ */
+ RaftState handleMessage(ActorRef sender, Object message);
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * Message sent to commit an entry to the log
+ */
+public class CommitEntry {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * Sent by a RaftReplicator to the RaftActor when it has successfully
+ * replicated an entry to a remote RaftActor
+ */
+public class EntryReplicated {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * Message sent to Persist an entry into the transaction journal
+ */
+public class PersistEntry {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * Sent to a replicator when log entries need to be replicated to other
+ * members in the cluster
+ */
+public class Replicate {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * Sent to a replicator when log entries need to be replicated to other
+ * members in the cluster
+ */
+public class ReplicateEntry {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * This message is sent by a RaftActor to itself so that a subclass can process
+ * it and use it to save it's state
+ */
+public class SaveSnapshot {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.internal.messages;
+
+/**
+ * This messages is sent to the Leader to prompt it to send a heartbeat
+ * to it's followers.
+ *
+ * Typically the RaftReplicator for a specific follower sends this message
+ * to the Leader
+ */
+public class SendHeartBeat {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+import java.util.List;
+
+/**
+ * Invoked by leader to replicate log entries (§5.3); also used as
+ * heartbeat (§5.2).
+ */
+public class AppendEntries {
+ // Leaders term
+ private final long term;
+
+ // So that follower can redirect clients
+ private final String leaderId;
+
+ // Index of log entry immediately preceding new ones
+ private final long prevLogIndex;
+
+ // term of prevLogIndex entry
+ private final long prevLogTerm;
+
+ // log entries to store (empty for heartbeat;
+ // may send more than one for efficiency)
+ private final List<Object> entries;
+
+ public AppendEntries(long term, String leaderId, long prevLogIndex,
+ long prevLogTerm, List<Object> entries) {
+ this.term = term;
+ this.leaderId = leaderId;
+ this.prevLogIndex = prevLogIndex;
+ this.prevLogTerm = prevLogTerm;
+ this.entries = entries;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ public String getLeaderId() {
+ return leaderId;
+ }
+
+ public long getPrevLogIndex() {
+ return prevLogIndex;
+ }
+
+ public long getPrevLogTerm() {
+ return prevLogTerm;
+ }
+
+ public List<Object> getEntries() {
+ return entries;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+/**
+ * Reply for the AppendEntriesRpc message
+ */
+public class AppendEntriesReply {
+ // currentTerm, for leader to update itself
+ private final long term;
+
+ // true if follower contained entry matching
+ // prevLogIndex and prevLogTerm
+ private final boolean success;
+
+ public AppendEntriesReply(long term, boolean success) {
+ this.term = term;
+ this.success = success;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+/**
+ * Invoked by candidates to gather votes (§5.2).
+ */
+public class RequestVote {
+
+ // candidate’s term
+ private final long term;
+
+ // candidate requesting vote
+ private final String candidateId;
+
+ // index of candidate’s last log entry (§5.4)
+ private final long lastLogIndex;
+
+ // term of candidate’s last log entry (§5.4)
+ private final long lastLogTerm;
+
+ public RequestVote(long term, String candidateId, long lastLogIndex,
+ long lastLogTerm) {
+ this.term = term;
+ this.candidateId = candidateId;
+ this.lastLogIndex = lastLogIndex;
+ this.lastLogTerm = lastLogTerm;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ public String getCandidateId() {
+ return candidateId;
+ }
+
+ public long getLastLogIndex() {
+ return lastLogIndex;
+ }
+
+ public long getLastLogTerm() {
+ return lastLogTerm;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+public class RequestVoteReply {
+
+ // currentTerm, for candidate to update itself
+ private final long term;
+
+ // true means candidate received vot
+ private final boolean voteGranted;
+
+ public RequestVoteReply(long term, boolean voteGranted) {
+ this.term = term;
+ this.voteGranted = voteGranted;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ public boolean isVoteGranted() {
+ return voteGranted;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public abstract class AbstractActorTest {
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setUpClass() {
+ System.setProperty("shard.persistent", "false");
+ system = ActorSystem.create("test");
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ protected ActorSystem getSystem() {
+ return system;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MockRaftActorContext implements RaftActorContext {
+
+ private String id;
+ private ActorSystem system;
+ private ActorRef actor;
+
+ public MockRaftActorContext(){
+
+ }
+
+ public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
+ this.id = id;
+ this.system = system;
+ this.actor = actor;
+ }
+
+ @Override public ActorRef actorOf(Props props) {
+ return system.actorOf(props);
+ }
+
+ @Override public ActorSelection actorSelection(String path) {
+ return system.actorSelection(path);
+ }
+
+ @Override public String getId() {
+ return id;
+ }
+
+ @Override public ActorRef getActor() {
+ return actor;
+ }
+
+ @Override public ElectionTerm getTermInformation() {
+ return new ElectionTermImpl(this.id);
+ }
+
+ @Override public AtomicLong getCommitIndex() {
+ throw new UnsupportedOperationException("getCommitIndex");
+ }
+
+ @Override public AtomicLong getLastApplied() {
+ throw new UnsupportedOperationException("getLastApplied");
+ }
+
+ @Override public ReplicatedLog getReplicatedLog() {
+ return new ReplicatedLog(){
+
+ @Override public ReplicatedLogEntry getReplicatedLogEntry(
+ long index) {
+ throw new UnsupportedOperationException(
+ "getReplicatedLogEntry");
+ }
+
+ @Override public ReplicatedLogEntry last() {
+ return new ReplicatedLogEntry() {
+ @Override public Object getData() {
+ return null;
+ }
+
+ @Override public long getTerm() {
+ return 1;
+ }
+
+ @Override public long getIndex() {
+ return 1;
+ }
+ };
+ }
+ };
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft;
+
+import akka.testkit.JavaTestKit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+public class RaftReplicatorTest extends AbstractActorTest {
+
+ @Test
+ public void testThatHeartBeatIsGenerated () throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ getSystem().actorOf(RaftReplicator.props(
+ new FollowerLogInformationImpl("test",
+ new AtomicLong(100), new AtomicLong(100)),
+ getRef()));
+
+ final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof SendHeartBeat) {
+ return "match";
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ }
+
+
+ };
+ }};
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class LeaderTest extends AbstractActorTest {
+
+ @Test
+ public void testHandleMessageForUnknownMessage() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ Leader leader =
+ new Leader(new MockRaftActorContext(), Collections.EMPTY_LIST);
+
+ // handle message should return the Leader state when it receives an
+ // unknown message
+ RaftState state = leader.handleMessage(getRef(), "foo");
+ Assert.assertEquals(RaftState.Leader, state);
+ }};
+ }
+
+
+ @Test
+ public void testThatLeaderSendsAHeartbeatMessageToAllFollowers(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ List<String> followers = new ArrayList();
+
+ followers.add(getTestActor().path().toString());
+
+ Leader leader = new Leader(new MockRaftActorContext("test", getSystem(), getTestActor()), followers);
+ leader.handleMessage(getRef(), new SendHeartBeat());
+
+ final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof AppendEntries) {
+ if (((AppendEntries) in).getTerm()
+ == 0) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ }
+
+
+ };
+ }};
+ }
+}
--- /dev/null
+akka {
+ actor {
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java
+ }
+ }
+}
\ No newline at end of file