826faf7414395bdb1eefcc59906c3cd1f6ea0e16
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.UntypedPersistentActor;
18 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
19 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
20 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
21 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
22 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
23 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
24 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
25 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
26
27 import java.io.Serializable;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Map;
31
32 /**
33  * RaftActor encapsulates a state machine that needs to be kept synchronized
34  * in a cluster. It implements the RAFT algorithm as described in the paper
35  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
36  *     In Search of an Understandable Consensus Algorithm</a>
37  * <p>
38  * RaftActor has 3 states and each state has a certain behavior associated
39  * with it. A Raft actor can behave as,
40  * <ul>
41  *     <li> A Leader </li>
42  *     <li> A Follower (or) </li>
43  *     <li> A Candidate </li>
44  * </ul>
45  *
46  * <p>
47  * A RaftActor MUST be a Leader in order to accept requests from clients to
48  * change the state of it's encapsulated state machine. Once a RaftActor becomes
49  * a Leader it is also responsible for ensuring that all followers ultimately
50  * have the same log and therefore the same state machine as itself.
51  *
52  * <p>
53  * The current behavior of a RaftActor determines how election for leadership
54  * is initiated and how peer RaftActors react to request for votes.
55  *
56  * <p>
57  * Each RaftActor also needs to know the current election term. It uses this
58  * information for a couple of things. One is to simply figure out who it
59  * voted for in the last election. Another is to figure out if the message
60  * it received to update it's state is stale.
61  *
62  * <p>
63  * The RaftActor uses akka-persistence to store it's replicated log.
64  * Furthermore through it's behaviors a Raft Actor determines
65  *
66  * <ul>
67  * <li> when a log entry should be persisted </li>
68  * <li> when a log entry should be applied to the state machine (and) </li>
69  * <li> when a snapshot should be saved </li>
70  * </ul>
71  *
72  * <a href="http://doc.akka.io/api/akka/2.3.3/index.html#akka.persistence.UntypedEventsourcedProcessor">UntypeEventSourceProcessor</a>
73  */
74 public abstract class RaftActor extends UntypedPersistentActor {
75     protected final LoggingAdapter LOG =
76         Logging.getLogger(getContext().system(), this);
77
78     /**
79      *  The current state determines the current behavior of a RaftActor
80      * A Raft Actor always starts off in the Follower State
81      */
82     private RaftActorBehavior currentBehavior;
83
84     /**
85      * This context should NOT be passed directly to any other actor it is
86      * only to be consumed by the RaftActorBehaviors
87      */
88     private RaftActorContext context;
89
90     /**
91      * The in-memory journal
92      */
93     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
94
95
96
97     public RaftActor(String id, Map<String, String> peerAddresses){
98         context = new RaftActorContextImpl(this.getSelf(),
99             this.getContext(),
100             id, new ElectionTermImpl(getSelf().path().toString()),
101             -1, -1, replicatedLog, peerAddresses, LOG);
102         currentBehavior = switchBehavior(RaftState.Follower);
103     }
104
105     @Override public void onReceiveRecover(Object message) {
106         if(message instanceof ReplicatedLogEntry) {
107             replicatedLog.append((ReplicatedLogEntry) message);
108         } else if(message instanceof RecoveryCompleted){
109             LOG.debug("Log now has messages to index : " + replicatedLog.lastIndex());
110         }
111     }
112
113     @Override public void onReceiveCommand(Object message) {
114         if(message instanceof ApplyState){
115
116             ApplyState applyState = (ApplyState)  message;
117
118             LOG.debug("Applying state for log index {}", applyState.getReplicatedLogEntry().getIndex());
119
120             applyState(applyState.getClientActor(), applyState.getIdentifier(),
121                 applyState.getReplicatedLogEntry().getData());
122         } else if(message instanceof FindLeader){
123             getSender().tell(new FindLeaderReply(
124                 context.getPeerAddress(currentBehavior.getLeaderId())),
125                 getSelf());
126         } else {
127             RaftState state =
128                 currentBehavior.handleMessage(getSender(), message);
129             currentBehavior = switchBehavior(state);
130         }
131     }
132
133     private RaftActorBehavior switchBehavior(RaftState state){
134         if(currentBehavior != null) {
135             if (currentBehavior.state() == state) {
136                 return currentBehavior;
137             }
138             LOG.info("Switching from state " + currentBehavior.state() + " to "
139                 + state);
140
141             try {
142                 currentBehavior.close();
143             } catch (Exception e) {
144                 LOG.error(e, "Failed to close behavior : " + currentBehavior.state());
145             }
146
147         } else {
148             LOG.info("Switching behavior to " + state);
149         }
150         RaftActorBehavior behavior = null;
151         if(state == RaftState.Candidate){
152             behavior = new Candidate(context);
153         } else if(state == RaftState.Follower){
154             behavior = new Follower(context);
155         } else {
156             behavior = new Leader(context);
157         }
158         return behavior;
159     }
160
161     /**
162      * When a derived RaftActor needs to persist something it must call
163      * persistData.
164      *
165      * @param clientActor
166      * @param identifier
167      * @param data
168      */
169     protected void persistData(ActorRef clientActor, String identifier, Object data){
170         LOG.debug("Persist data " + identifier);
171         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
172             context.getReplicatedLog().lastIndex() + 1,
173             context.getTermInformation().getCurrentTerm(), data);
174
175         replicatedLog.appendAndPersist(clientActor, identifier, replicatedLogEntry);
176     }
177
178     protected abstract void applyState(ActorRef clientActor, String identifier, Object data);
179
180     protected String getId(){
181         return context.getId();
182     }
183
184     protected boolean isLeader(){
185         return context.getId().equals(currentBehavior.getLeaderId());
186     }
187
188     protected ActorSelection getLeader(){
189         String leaderId = currentBehavior.getLeaderId();
190         String peerAddress = context.getPeerAddress(leaderId);
191         LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + peerAddress);
192         return context.actorSelection(peerAddress);
193     }
194
195     private class ReplicatedLogImpl implements ReplicatedLog {
196         private final List<ReplicatedLogEntry> journal = new ArrayList();
197         private long snapshotIndex = 0;
198         private Object snapShot = null;
199
200
201         @Override public ReplicatedLogEntry get(long index) {
202             if(index < 0 || index >= journal.size()){
203                 return null;
204             }
205
206             return journal.get((int) (index - snapshotIndex));
207         }
208
209         @Override public ReplicatedLogEntry last() {
210             if(journal.size() == 0){
211                 return null;
212             }
213             return get(journal.size() - 1);
214         }
215
216         @Override public long lastIndex() {
217             if(journal.size() == 0){
218                 return -1;
219             }
220
221             return last().getIndex();
222         }
223
224         @Override public long lastTerm() {
225             if(journal.size() == 0){
226                 return -1;
227             }
228
229             return last().getTerm();
230         }
231
232
233         @Override public void removeFrom(long index) {
234             if(index < 0 || index >= journal.size()){
235                 return;
236             }
237             for(int i= (int) (index - snapshotIndex) ; i < journal.size() ; i++){
238                 deleteMessage(i);
239                 journal.remove(i);
240             }
241         }
242
243         @Override public void append(final ReplicatedLogEntry replicatedLogEntry) {
244             journal.add(replicatedLogEntry);
245         }
246
247         @Override public List<ReplicatedLogEntry> getFrom(long index) {
248             List<ReplicatedLogEntry> entries = new ArrayList<>(100);
249             if(index < 0 || index >= journal.size()){
250                 return entries;
251             }
252             for(int i= (int) (index - snapshotIndex); i < journal.size() ; i++){
253                 entries.add(journal.get(i));
254             }
255             return entries;
256         }
257
258         @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry){
259             appendAndPersist(null, null, replicatedLogEntry);
260         }
261
262         public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry){
263             context.getLogger().debug("Append log entry and persist" + replicatedLogEntry.getIndex());
264             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
265             journal.add(replicatedLogEntry);
266             persist(replicatedLogEntry,
267                 new Procedure<ReplicatedLogEntry>() {
268                     public void apply(ReplicatedLogEntry evt) throws Exception {
269                         // Send message for replication
270                         if(clientActor != null) {
271                             currentBehavior.handleMessage(getSelf(),
272                                 new Replicate(clientActor, identifier,
273                                     replicatedLogEntry));
274                         }
275                     }
276                 });
277         }
278
279         @Override public long size() {
280             return journal.size() + snapshotIndex;
281         }
282     }
283
284     private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
285         Serializable {
286
287         private final long index;
288         private final long term;
289         private final Object payload;
290
291         public ReplicatedLogImplEntry(long index, long term, Object payload){
292
293             this.index = index;
294             this.term = term;
295             this.payload = payload;
296         }
297
298         @Override public Object getData() {
299             return payload;
300         }
301
302         @Override public long getTerm() {
303             return term;
304         }
305
306         @Override public long getIndex() {
307             return index;
308         }
309     }
310
311
312 }