Test RaftActor using a simple program
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import org.opendaylight.controller.cluster.raft.RaftActorContext;
13 import org.opendaylight.controller.cluster.raft.RaftState;
14 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
15 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
16 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
18 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
19
20 /**
21  * The behavior of a RaftActor in the Follower state
22  *
23  * <ul>
24  * <li> Respond to RPCs from candidates and leaders
25  * <li> If election timeout elapses without receiving AppendEntries
26  * RPC from current leader or granting vote to candidate:
27  * convert to candidate
28  * </ul>
29  *
30  */
31 public class Follower extends AbstractRaftActorBehavior {
32     public Follower(RaftActorContext context) {
33         super(context);
34
35         scheduleElection(electionDuration());
36     }
37
38     @Override protected RaftState handleAppendEntries(ActorRef sender,
39         AppendEntries appendEntries, RaftState suggestedState) {
40
41         // If we got here then we do appear to be talking to the leader
42         leaderId = appendEntries.getLeaderId();
43
44         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
45         // whose term matches prevLogTerm (§5.3)
46         ReplicatedLogEntry previousEntry = context.getReplicatedLog()
47             .get(appendEntries.getPrevLogIndex());
48
49
50         if (lastIndex() > -1 && previousEntry != null
51             && previousEntry.getTerm() != appendEntries
52             .getPrevLogTerm()) {
53
54             context.getLogger().debug(
55                 "Cannot append entries because previous entry term "
56                     + previousEntry.getTerm()
57                     + " is not equal to append entries prevLogTerm "
58                     + appendEntries.getPrevLogTerm());
59
60             sender.tell(
61                 new AppendEntriesReply(context.getId(), currentTerm(), false,
62                     lastIndex(), lastTerm()), actor()
63             );
64             return state();
65         }
66
67         if (appendEntries.getEntries() != null
68             && appendEntries.getEntries().size() > 0) {
69             context.getLogger().debug(
70                 "Number of entries to be appended = " + appendEntries
71                     .getEntries().size());
72
73             // 3. If an existing entry conflicts with a new one (same index
74             // but different terms), delete the existing entry and all that
75             // follow it (§5.3)
76             int addEntriesFrom = 0;
77             if (context.getReplicatedLog().size() > 0) {
78                 for (int i = 0;
79                      i < appendEntries.getEntries()
80                          .size(); i++, addEntriesFrom++) {
81                     ReplicatedLogEntry matchEntry =
82                         appendEntries.getEntries().get(i);
83                     ReplicatedLogEntry newEntry = context.getReplicatedLog()
84                         .get(matchEntry.getIndex());
85
86                     if (newEntry == null) {
87                         //newEntry not found in the log
88                         break;
89                     }
90
91                     if (newEntry != null && newEntry.getTerm() == matchEntry
92                         .getTerm()) {
93                         continue;
94                     }
95                     if (newEntry != null && newEntry.getTerm() != matchEntry
96                         .getTerm()) {
97                         context.getLogger().debug(
98                             "Removing entries from log starting at "
99                                 + matchEntry.getIndex());
100                         context.getReplicatedLog()
101                             .removeFrom(matchEntry.getIndex());
102                         break;
103                     }
104                 }
105             }
106
107             context.getLogger().debug(
108                 "After cleanup entries to be added from = " + (addEntriesFrom
109                     + lastIndex()));
110
111             // 4. Append any new entries not already in the log
112             for (int i = addEntriesFrom;
113                  i < appendEntries.getEntries().size(); i++) {
114                 context.getLogger().debug(
115                     "Append entry to log " + appendEntries.getEntries().get(i)
116                         .toString());
117                 context.getReplicatedLog()
118                     .appendAndPersist(appendEntries.getEntries().get(i));
119             }
120
121             context.getLogger().debug(
122                 "Log size is now " + context.getReplicatedLog().size());
123         }
124
125
126         // 5. If leaderCommit > commitIndex, set commitIndex =
127         // min(leaderCommit, index of last new entry)
128
129         long prevCommitIndex = context.getCommitIndex();
130
131         context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
132             context.getReplicatedLog().lastIndex()));
133
134         if (prevCommitIndex != context.getCommitIndex()) {
135             context.getLogger()
136                 .debug("Commit index set to " + context.getCommitIndex());
137         }
138
139         // If commitIndex > lastApplied: increment lastApplied, apply
140         // log[lastApplied] to state machine (§5.3)
141         if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
142             applyLogToStateMachine(appendEntries.getLeaderCommit());
143         }
144
145         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
146             lastIndex(), lastTerm()), actor());
147
148         return suggestedState;
149     }
150
151     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
152         AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
153         return suggestedState;
154     }
155
156     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
157         RequestVoteReply requestVoteReply, RaftState suggestedState) {
158         return suggestedState;
159     }
160
161     @Override public RaftState state() {
162         return RaftState.Follower;
163     }
164
165     @Override public RaftState handleMessage(ActorRef sender, Object message) {
166         if(message instanceof ElectionTimeout){
167             return RaftState.Candidate;
168         }
169
170         scheduleElection(electionDuration());
171
172         return super.handleMessage(sender, message);
173     }
174
175     @Override public void close() throws Exception {
176         stopElection();
177     }
178 }