1cfc2e0eb9565a04a451d9301b9f6a988961983d
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import org.opendaylight.controller.cluster.raft.RaftActorContext;
13 import org.opendaylight.controller.cluster.raft.RaftState;
14 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
15 import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
16 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
18 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
19 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
21
22 /**
23  * The behavior of a RaftActor in the Follower state
24  *
25  * <ul>
26  * <li> Respond to RPCs from candidates and leaders
27  * <li> If election timeout elapses without receiving AppendEntries
28  * RPC from current leader or granting vote to candidate:
29  * convert to candidate
30  * </ul>
31  *
32  */
33 public class Follower extends AbstractRaftActorBehavior {
34     public Follower(RaftActorContext context) {
35         super(context);
36
37         scheduleElection(electionDuration());
38     }
39
40     @Override protected RaftState handleAppendEntries(ActorRef sender,
41         AppendEntries appendEntries, RaftState suggestedState) {
42
43         // If we got here then we do appear to be talking to the leader
44         leaderId = appendEntries.getLeaderId();
45
46         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
47         // whose term matches prevLogTerm (§5.3)
48         ReplicatedLogEntry previousEntry = context.getReplicatedLog()
49             .get(appendEntries.getPrevLogIndex());
50
51
52         if (lastIndex() > -1 && previousEntry != null
53             && previousEntry.getTerm() != appendEntries
54             .getPrevLogTerm()) {
55
56             context.getLogger().debug(
57                 "Cannot append entries because previous entry term "
58                     + previousEntry.getTerm()
59                     + " is not equal to append entries prevLogTerm "
60                     + appendEntries.getPrevLogTerm());
61
62             sender.tell(
63                 new AppendEntriesReply(context.getId(), currentTerm(), false,
64                     lastIndex(), lastTerm()), actor()
65             );
66             return state();
67         }
68
69         if (appendEntries.getEntries() != null
70             && appendEntries.getEntries().size() > 0) {
71             context.getLogger().debug(
72                 "Number of entries to be appended = " + appendEntries
73                     .getEntries().size());
74
75             // 3. If an existing entry conflicts with a new one (same index
76             // but different terms), delete the existing entry and all that
77             // follow it (§5.3)
78             int addEntriesFrom = 0;
79             if (context.getReplicatedLog().size() > 0) {
80                 for (int i = 0;
81                      i < appendEntries.getEntries()
82                          .size(); i++, addEntriesFrom++) {
83                     ReplicatedLogEntry matchEntry =
84                         appendEntries.getEntries().get(i);
85                     ReplicatedLogEntry newEntry = context.getReplicatedLog()
86                         .get(matchEntry.getIndex());
87
88                     if (newEntry == null) {
89                         //newEntry not found in the log
90                         break;
91                     }
92
93                     if (newEntry != null && newEntry.getTerm() == matchEntry
94                         .getTerm()) {
95                         continue;
96                     }
97                     if (newEntry != null && newEntry.getTerm() != matchEntry
98                         .getTerm()) {
99                         context.getLogger().debug(
100                             "Removing entries from log starting at "
101                                 + matchEntry.getIndex());
102                         context.getReplicatedLog()
103                             .removeFrom(matchEntry.getIndex());
104                         break;
105                     }
106                 }
107             }
108
109             context.getLogger().debug(
110                 "After cleanup entries to be added from = " + (addEntriesFrom
111                     + lastIndex()));
112
113             // 4. Append any new entries not already in the log
114             for (int i = addEntriesFrom;
115                  i < appendEntries.getEntries().size(); i++) {
116                 context.getLogger().debug(
117                     "Append entry to log " + appendEntries.getEntries().get(i)
118                         .toString());
119                 context.getReplicatedLog()
120                     .appendAndPersist(appendEntries.getEntries().get(i));
121             }
122
123             context.getLogger().debug(
124                 "Log size is now " + context.getReplicatedLog().size());
125         }
126
127
128         // 5. If leaderCommit > commitIndex, set commitIndex =
129         // min(leaderCommit, index of last new entry)
130
131         long prevCommitIndex = context.getCommitIndex();
132
133         context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
134             context.getReplicatedLog().lastIndex()));
135
136         if (prevCommitIndex != context.getCommitIndex()) {
137             context.getLogger()
138                 .debug("Commit index set to " + context.getCommitIndex());
139         }
140
141         // If commitIndex > lastApplied: increment lastApplied, apply
142         // log[lastApplied] to state machine (§5.3)
143         if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
144             applyLogToStateMachine(appendEntries.getLeaderCommit());
145         }
146
147         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
148             lastIndex(), lastTerm()), actor());
149
150         return suggestedState;
151     }
152
153     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
154         AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
155         return suggestedState;
156     }
157
158     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
159         RequestVoteReply requestVoteReply, RaftState suggestedState) {
160         return suggestedState;
161     }
162
163     @Override public RaftState state() {
164         return RaftState.Follower;
165     }
166
167     @Override public RaftState handleMessage(ActorRef sender, Object message) {
168         if(message instanceof ElectionTimeout){
169             return RaftState.Candidate;
170         } else if(message instanceof InstallSnapshot){
171             InstallSnapshot snapshot = (InstallSnapshot) message;
172             actor().tell(new ApplySnapshot(snapshot), actor());
173         }
174
175         scheduleElection(electionDuration());
176
177         return super.handleMessage(sender, message);
178     }
179
180     @Override public void close() throws Exception {
181         stopElection();
182     }
183 }