Merge "Initial clustering feature"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import com.google.protobuf.ByteString;
13 import org.opendaylight.controller.cluster.raft.RaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftState;
15 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
16 import org.opendaylight.controller.cluster.raft.Snapshot;
17 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
18 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
19 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
21 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
22 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
23 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
24 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
25
26 import java.util.ArrayList;
27
28 /**
29  * The behavior of a RaftActor in the Follower state
30  * <p/>
31  * <ul>
32  * <li> Respond to RPCs from candidates and leaders
33  * <li> If election timeout elapses without receiving AppendEntries
34  * RPC from current leader or granting vote to candidate:
35  * convert to candidate
36  * </ul>
37  */
38 public class Follower extends AbstractRaftActorBehavior {
39     private ByteString snapshotChunksCollected = ByteString.EMPTY;
40
41     public Follower(RaftActorContext context) {
42         super(context);
43
44         scheduleElection(electionDuration());
45     }
46
47     @Override protected RaftState handleAppendEntries(ActorRef sender,
48         AppendEntries appendEntries) {
49
50         if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
51             context.getLogger()
52                 .debug(appendEntries.toString());
53         }
54
55         // TODO : Refactor this method into a bunch of smaller methods
56         // to make it easier to read. Before refactoring ensure tests
57         // cover the code properly
58
59         // 1. Reply false if term < currentTerm (§5.1)
60         // This is handled in the appendEntries method of the base class
61
62         // If we got here then we do appear to be talking to the leader
63         leaderId = appendEntries.getLeaderId();
64
65         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
66         // whose term matches prevLogTerm (§5.3)
67
68         ReplicatedLogEntry previousEntry = context.getReplicatedLog()
69             .get(appendEntries.getPrevLogIndex());
70
71
72         boolean outOfSync = true;
73
74         // First check if the logs are in sync or not
75         if (lastIndex() == -1
76             && appendEntries.getPrevLogIndex() != -1) {
77
78             // The follower's log is out of sync because the leader does have
79             // an entry at prevLogIndex and this follower has no entries in
80             // it's log.
81
82             context.getLogger().debug(
83                 "The followers log is empty and the senders prevLogIndex is {}",
84                 appendEntries.getPrevLogIndex());
85
86         } else if (lastIndex() > -1
87             && appendEntries.getPrevLogIndex() != -1
88             && previousEntry == null) {
89
90             // The follower's log is out of sync because the Leader's
91             // prevLogIndex entry was not found in it's log
92
93             context.getLogger().debug(
94                 "The log is not empty but the prevLogIndex {} was not found in it",
95                 appendEntries.getPrevLogIndex());
96
97         } else if (lastIndex() > -1
98             && previousEntry != null
99             && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
100
101             // The follower's log is out of sync because the Leader's
102             // prevLogIndex entry does exist in the follower's log but it has
103             // a different term in it
104
105             context.getLogger().debug(
106                 "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
107                 , previousEntry.getTerm()
108                 , appendEntries.getPrevLogTerm());
109         } else {
110             outOfSync = false;
111         }
112
113         if (outOfSync) {
114             // We found that the log was out of sync so just send a negative
115             // reply and return
116             context.getLogger().debug("Follower is out-of-sync, " +
117                 "so sending negative reply, lastIndex():{}, lastTerm():{}",
118                 lastIndex(), lastTerm());
119             sender.tell(
120                 new AppendEntriesReply(context.getId(), currentTerm(), false,
121                     lastIndex(), lastTerm()), actor()
122             );
123             return state();
124         }
125
126         if (appendEntries.getEntries() != null
127             && appendEntries.getEntries().size() > 0) {
128             context.getLogger().debug(
129                 "Number of entries to be appended = " + appendEntries
130                     .getEntries().size()
131             );
132
133             // 3. If an existing entry conflicts with a new one (same index
134             // but different terms), delete the existing entry and all that
135             // follow it (§5.3)
136             int addEntriesFrom = 0;
137             if (context.getReplicatedLog().size() > 0) {
138
139                 // Find the entry up until which the one that is not in the follower's log
140                 for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
141                     ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
142                     ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
143
144                     if (newEntry == null) {
145                         //newEntry not found in the log
146                         break;
147                     }
148
149                     if (newEntry.getTerm() == matchEntry
150                         .getTerm()) {
151                         continue;
152                     }
153
154                     context.getLogger().debug(
155                         "Removing entries from log starting at "
156                             + matchEntry.getIndex()
157                     );
158
159                     // Entries do not match so remove all subsequent entries
160                     context.getReplicatedLog()
161                         .removeFromAndPersist(matchEntry.getIndex());
162                     break;
163                 }
164             }
165
166             context.getLogger().debug(
167                 "After cleanup entries to be added from = " + (addEntriesFrom
168                     + lastIndex())
169             );
170
171             // 4. Append any new entries not already in the log
172             for (int i = addEntriesFrom;
173                  i < appendEntries.getEntries().size(); i++) {
174
175                 context.getLogger().info(
176                     "Append entry to log " + appendEntries.getEntries().get(
177                         i).getData()
178                         .toString()
179                 );
180                 context.getReplicatedLog()
181                     .appendAndPersist(appendEntries.getEntries().get(i));
182             }
183
184             context.getLogger().debug(
185                 "Log size is now " + context.getReplicatedLog().size());
186         }
187
188
189         // 5. If leaderCommit > commitIndex, set commitIndex =
190         // min(leaderCommit, index of last new entry)
191
192         long prevCommitIndex = context.getCommitIndex();
193
194         context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
195             context.getReplicatedLog().lastIndex()));
196
197         if (prevCommitIndex != context.getCommitIndex()) {
198             context.getLogger()
199                 .debug("Commit index set to " + context.getCommitIndex());
200         }
201
202         // If commitIndex > lastApplied: increment lastApplied, apply
203         // log[lastApplied] to state machine (§5.3)
204         // check if there are any entries to be applied. last-applied can be equal to last-index
205         if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
206             context.getLastApplied() < lastIndex()) {
207             context.getLogger().debug("applyLogToStateMachine, " +
208                 "appendEntries.getLeaderCommit():{}," +
209                 "context.getLastApplied():{}, lastIndex():{}",
210                 appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
211             applyLogToStateMachine(appendEntries.getLeaderCommit());
212         }
213
214         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
215             lastIndex(), lastTerm()), actor());
216
217         return state();
218     }
219
220     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
221         AppendEntriesReply appendEntriesReply) {
222         return state();
223     }
224
225     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
226         RequestVoteReply requestVoteReply) {
227         return state();
228     }
229
230     @Override public RaftState state() {
231         return RaftState.Follower;
232     }
233
234     @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
235
236         Object message = fromSerializableMessage(originalMessage);
237
238         if (message instanceof RaftRPC) {
239             RaftRPC rpc = (RaftRPC) message;
240             // If RPC request or response contains term T > currentTerm:
241             // set currentTerm = T, convert to follower (§5.1)
242             // This applies to all RPC messages and responses
243             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
244                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
245             }
246         }
247
248         if (message instanceof ElectionTimeout) {
249             return RaftState.Candidate;
250
251         } else if (message instanceof InstallSnapshot) {
252             InstallSnapshot installSnapshot = (InstallSnapshot) message;
253             handleInstallSnapshot(sender, installSnapshot);
254         }
255
256         scheduleElection(electionDuration());
257
258         return super.handleMessage(sender, message);
259     }
260
261     private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
262         context.getLogger().debug("InstallSnapshot received by follower " +
263             "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
264             installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
265
266         try {
267             if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
268                 // this is the last chunk, create a snapshot object and apply
269
270                 snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
271                 context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
272                     snapshotChunksCollected.size());
273
274                 Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
275                     new ArrayList<ReplicatedLogEntry>(),
276                     installSnapshot.getLastIncludedIndex(),
277                     installSnapshot.getLastIncludedTerm(),
278                     installSnapshot.getLastIncludedIndex(),
279                     installSnapshot.getLastIncludedTerm());
280
281                 actor().tell(new ApplySnapshot(snapshot), actor());
282
283             } else {
284                 // we have more to go
285                 snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
286                 context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
287                     installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
288             }
289
290             sender.tell(new InstallSnapshotReply(
291                 currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
292                 true), actor());
293
294         } catch (Exception e) {
295             context.getLogger().error("Exception in InstallSnapshot of follower", e);
296             //send reply with success as false. The chunk will be sent again on failure
297             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
298                 installSnapshot.getChunkIndex(), false), actor());
299         }
300     }
301
302     @Override public void close() throws Exception {
303         stopElection();
304     }
305 }