2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.UntypedPersistentActor;
18 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
19 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
20 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
21 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
22 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
23 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
24 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
25 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
27 import java.io.Serializable;
28 import java.util.ArrayList;
29 import java.util.List;
33 * RaftActor encapsulates a state machine that needs to be kept synchronized
34 * in a cluster. It implements the RAFT algorithm as described in the paper
35 * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
36 * In Search of an Understandable Consensus Algorithm</a>
38 * RaftActor has 3 states and each state has a certain behavior associated
39 * with it. A Raft actor can behave as,
42 * <li> A Follower (or) </li>
43 * <li> A Candidate </li>
47 * A RaftActor MUST be a Leader in order to accept requests from clients to
48 * change the state of its encapsulated state machine. Once a RaftActor becomes
49 * a Leader it is also responsible for ensuring that all followers ultimately
50 * have the same log and therefore the same state machine as itself.
53 * The current behavior of a RaftActor determines how election for leadership
54 * is initiated and how peer RaftActors react to requests for votes.
57 * Each RaftActor also needs to know the current election term. It uses this
58 * information for a couple of things. One is to simply figure out who it
59 * voted for in the last election. Another is to figure out if the message
60 * it received to update its state is stale.
63 * The RaftActor uses akka-persistence to store its replicated log.
64 * Furthermore, through its behaviors a RaftActor determines
67 * <li> when a log entry should be persisted </li>
68 * <li> when a log entry should be applied to the state machine (and) </li>
69 * <li> when a snapshot should be saved </li>
72 * <a href="http://doc.akka.io/api/akka/2.3.3/index.html#akka.persistence.UntypedEventsourcedProcessor">UntypedEventsourcedProcessor</a>
// Abstract persistent actor. Subclasses must implement applyState(...), which
// onReceiveCommand invokes when an ApplyState message arrives.
74 public abstract class RaftActor extends UntypedPersistentActor {
// Per-instance Akka logging adapter, obtained from this actor's actor system.
// It is also handed to the RaftActorContextImpl built in the constructor.
75 protected final LoggingAdapter LOG =
76 Logging.getLogger(getContext().system(), this);
79 * The current state determines the current behavior of a RaftActor
80 * A Raft Actor always starts off in the Follower State
// Assigned in the constructor and reassigned in onReceiveCommand, both times
// with the value returned by switchBehavior(...).
82 private RaftActorBehavior currentBehavior;
85 * This context should NOT be passed directly to any other actor it is
86 * only to be consumed by the RaftActorBehaviors
// Created once in the constructor; passed to every Candidate/Follower/Leader
// that switchBehavior(...) constructs.
88 private RaftActorContext context;
91 * The in-memory journal
// The same instance is given to the RaftActorContextImpl constructor, so the
// context and this actor refer to one shared log object.
93 private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
// Constructor: builds the RaftActorContext and then installs the initial behavior.
//   id            - identifier stored in the context (later returned by getId()).
//   peerAddresses - map handed to the context; presumably the source for the
//                   context.getPeerAddress(...) lookups used elsewhere in this class -- TODO confirm.
// NOTE(review): Map is used in the signature but no java.util.Map import is visible
// in this view of the file -- confirm it is imported.
97 public RaftActor(String id, Map<String, String> peerAddresses){
// NOTE(review): the argument list appears to be missing a line between the two
// below in this view; check against the RaftActorContextImpl constructor.
98 context = new RaftActorContextImpl(this.getSelf(),
// The election term object is created from this actor's path string. The two -1
// values are presumably the initial commit index and last-applied index -- TODO confirm.
100 id, new ElectionTermImpl(getSelf().path().toString()),
101 -1, -1, replicatedLog, peerAddresses, LOG);
// currentBehavior is still null at this point, so switchBehavior skips its
// "currentBehavior != null" block and the actor starts in the Follower state.
102 currentBehavior = switchBehavior(RaftState.Follower);
// Recovery handler: rebuilds the in-memory replicated log from replayed messages.
//   message - the replayed object; the visible branches examine only
//             ReplicatedLogEntry and RecoveryCompleted.
105 @Override public void onReceiveRecover(Object message) {
106 if(message instanceof ReplicatedLogEntry) {
// Replayed entries go through append(...), which only adds to the in-memory
// journal; appendAndPersist is not used here.
107 replicatedLog.append((ReplicatedLogEntry) message);
108 } else if(message instanceof RecoveryCompleted){
// Recovery is done: report the index of the last entry now held in the log.
109 LOG.debug("Log now has messages to index : " + replicatedLog.lastIndex());
// Command handler. ApplyState and FindLeader are handled directly; the remaining
// visible lines pass the message to the current behavior and then call
// switchBehavior(state).
//   message - the incoming command.
113 @Override public void onReceiveCommand(Object message) {
114 if(message instanceof ApplyState){
116 ApplyState applyState = (ApplyState) message;
118 LOG.debug("Applying state for log index {}", applyState.getReplicatedLogEntry().getIndex());
// Delegate to the subclass hook with the client actor, the identifier and the
// data carried by the log entry.
120 applyState(applyState.getClientActor(), applyState.getIdentifier(),
121 applyState.getReplicatedLogEntry().getData());
122 } else if(message instanceof FindLeader){
// Reply to the sender with the peer address the context maps to the leader id
// reported by the current behavior.
// NOTE(review): the remaining tell(...) arguments are not visible in this view.
123 getSender().tell(new FindLeaderReply(
124 context.getPeerAddress(currentBehavior.getLeaderId())),
128 currentBehavior.handleMessage(getSender(), message);
// NOTE(review): `state` is not declared on any line visible here -- presumably it
// holds the result of the handleMessage call above; confirm.
129 currentBehavior = switchBehavior(state);
// Produces the behavior for the requested state, reusing the current behavior
// when it is already in that state.
//   state - the target RaftState.
// NOTE(review): the final return statement is not visible in this view;
// presumably the newly created behavior is returned -- confirm.
133 private RaftActorBehavior switchBehavior(RaftState state){
134 if(currentBehavior != null) {
// Already in the requested state: keep the existing behavior instance.
135 if (currentBehavior.state() == state) {
136 return currentBehavior;
138 LOG.info("Switching from state " + currentBehavior.state() + " to "
// The outgoing behavior is closed before a replacement is built. An exception
// from close() is caught and logged by the visible catch block.
142 currentBehavior.close();
143 } catch (Exception e) {
144 LOG.error(e, "Failed to close behavior : " + currentBehavior.state());
148 LOG.info("Switching behavior to " + state);
// Build a new behavior for the target state; each one receives the shared context.
150 RaftActorBehavior behavior = null;
151 if(state == RaftState.Candidate){
152 behavior = new Candidate(context);
153 } else if(state == RaftState.Follower){
154 behavior = new Follower(context);
// Remaining case (presumably the final else branch): a Leader is created.
156 behavior = new Leader(context);
162 * When a derived RaftActor needs to persist something it must call
// Wraps `data` in a new log entry and hands it to the replicated log to be
// appended and persisted.
//   clientActor - passed through to appendAndPersist, which uses it when building
//                 the Replicate message.
//   identifier  - logged here and passed through to appendAndPersist.
//   data        - payload stored in the new entry.
169 protected void persistData(ActorRef clientActor, String identifier, Object data){
170 LOG.debug("Persist data " + identifier);
// New entry: index = last log index + 1, term = current term from the context.
171 ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
172 context.getReplicatedLog().lastIndex() + 1,
173 context.getTermInformation().getCurrentTerm(), data);
175 replicatedLog.appendAndPersist(clientActor, identifier, replicatedLogEntry);
// Hook implemented by subclasses. onReceiveCommand calls it for an ApplyState
// message with that message's client actor, its identifier and the data of its
// replicated log entry.
178 protected abstract void applyState(ActorRef clientActor, String identifier, Object data);
// Returns the id held by the RaftActorContext (the `id` given to the constructor).
180 protected String getId(){
181 return context.getId();
// True when this actor's own id equals the leader id reported by the current behavior.
184 protected boolean isLeader(){
185 return context.getId().equals(currentBehavior.getLeaderId());
// Resolves the current leader to an ActorSelection in three steps: leader id from
// the current behavior, peer address from the context, actor selection for that address.
// NOTE(review): how getPeerAddress/actorSelection behave when no leader id is
// known is not visible here -- confirm callers handle that case.
188 protected ActorSelection getLeader(){
189 String leaderId = currentBehavior.getLeaderId();
190 String peerAddress = context.getPeerAddress(leaderId);
191 LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + peerAddress);
192 return context.actorSelection(peerAddress);
// In-memory ReplicatedLog backed by an ArrayList. It is a non-static inner class:
// appendAndPersist uses the enclosing actor's context, persist(...),
// currentBehavior and getSelf().
195 private class ReplicatedLogImpl implements ReplicatedLog {
// NOTE(review): `new ArrayList()` is a raw type; the diamond form used in getFrom
// (`new ArrayList<>(...)`) would avoid the unchecked warning.
196 private final List<ReplicatedLogEntry> journal = new ArrayList();
// Initialised to 0 and not reassigned on any line visible in this view.
197 private long snapshotIndex = 0;
// Not read on any line visible in this view.
198 private Object snapShot = null;
// Looks up the entry for `index` at journal position (index - snapshotIndex).
// NOTE(review): the guard compares `index` against journal.size() while the lookup
// subtracts snapshotIndex; the two agree only while snapshotIndex is 0 -- confirm.
// What the guard does when it matches is not visible in this view.
201 @Override public ReplicatedLogEntry get(long index) {
202 if(index < 0 || index >= journal.size()){
206 return journal.get((int) (index - snapshotIndex));
// Last entry of the journal, fetched through get(journal.size() - 1).
// The result for an empty journal is not visible in this view.
209 @Override public ReplicatedLogEntry last() {
210 if(journal.size() == 0){
213 return get(journal.size() - 1);
// Index of the last entry. The result for an empty journal is not visible in this view.
216 @Override public long lastIndex() {
217 if(journal.size() == 0){
221 return last().getIndex();
// Term of the last entry. The result for an empty journal is not visible in this view.
224 @Override public long lastTerm() {
225 if(journal.size() == 0){
229 return last().getTerm();
// Walks journal positions from (index - snapshotIndex) to the end; by its name this
// presumably removes those entries.
// NOTE(review): the loop body is not visible here. If it removes by position `i`
// while `i` advances and journal.size() shrinks, entries would be skipped -- verify.
// journal.subList(from, journal.size()).clear() is a standard alternative.
233 @Override public void removeFrom(long index) {
234 if(index < 0 || index >= journal.size()){
237 for(int i= (int) (index - snapshotIndex) ; i < journal.size() ; i++){
// Adds the entry to the in-memory journal only; this method does not call persist.
243 @Override public void append(final ReplicatedLogEntry replicatedLogEntry) {
244 journal.add(replicatedLogEntry);
// Collects the journal entries from position (index - snapshotIndex) to the end
// into a new list created with an initial capacity of 100.
247 @Override public List<ReplicatedLogEntry> getFrom(long index) {
248 List<ReplicatedLogEntry> entries = new ArrayList<>(100);
249 if(index < 0 || index >= journal.size()){
252 for(int i= (int) (index - snapshotIndex); i < journal.size() ; i++){
253 entries.add(journal.get(i));
// Convenience overload: no client actor and no identifier, so the persist callback
// below sends no Replicate message.
258 @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry){
259 appendAndPersist(null, null, replicatedLogEntry);
// Adds the entry to the in-memory journal and then hands it to the enclosing
// actor's persist(...) together with a callback.
//   clientActor - when non-null, the callback sends a Replicate message to the current behavior.
//   identifier  - carried inside that Replicate message.
//   replicatedLogEntry - the entry to store.
262 public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry){
// NOTE(review): the message literal has no space before the appended index.
263 context.getLogger().debug("Append log entry and persist" + replicatedLogEntry.getIndex());
264 // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
265 journal.add(replicatedLogEntry);
266 persist(replicatedLogEntry,
267 new Procedure<ReplicatedLogEntry>() {
// The callback uses the captured `replicatedLogEntry`, not its `evt` parameter.
268 public void apply(ReplicatedLogEntry evt) throws Exception {
269 // Send message for replication
270 if(clientActor != null) {
// getSelf() is passed as the sender argument of handleMessage.
271 currentBehavior.handleMessage(getSelf(),
272 new Replicate(clientActor, identifier,
273 replicatedLogEntry));
// Number of entries in the journal plus snapshotIndex.
279 @Override public long size() {
280 return journal.size() + snapshotIndex;
// Log entry implementation holding an index, a term and a payload; all three
// fields are final. persistData creates instances of this class.
// NOTE(review): the rest of the implements clause is not visible in this view.
284 private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
287 private final long index;
288 private final long term;
289 private final Object payload;
// NOTE(review): only the payload assignment is visible here; the index and term
// assignments are presumably on lines not shown -- confirm.
291 public ReplicatedLogImplEntry(long index, long term, Object payload){
295 this.payload = payload;
// The bodies of the three getters below are not visible in this view.
298 @Override public Object getData() {
302 @Override public long getTerm() {
306 @Override public long getIndex() {