Test driver and changes related to it
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import scala.concurrent.duration.FiniteDuration;
32
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
40
41 /**
42  * The behavior of a RaftActor when it is in the Leader state
43  * <p/>
44  * Leaders:
45  * <ul>
46  * <li> Upon election: send initial empty AppendEntries RPCs
47  * (heartbeat) to each server; repeat during idle periods to
48  * prevent election timeouts (§5.2)
49  * <li> If command received from client: append entry to local log,
50  * respond after entry applied to state machine (§5.3)
51  * <li> If last log index ≥ nextIndex for a follower: send
52  * AppendEntries RPC with log entries starting at nextIndex
53  * <ul>
54  * <li> If successful: update nextIndex and matchIndex for
55  * follower (§5.3)
56  * <li> If AppendEntries fails because of log inconsistency:
57  * decrement nextIndex and retry (§5.3)
58  * </ul>
59  * <li> If there exists an N such that N > commitIndex, a majority
60  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
61  * set commitIndex = N (§5.3, §5.4).
62  */
63 public class Leader extends AbstractRaftActorBehavior {
64
65
66     private final Map<String, FollowerLogInformation> followerToLog =
67         new HashMap();
68
69     private final Map<String, ActorSelection> followerToActor = new HashMap<>();
70
71     private Cancellable heartbeatSchedule = null;
72     private Cancellable appendEntriesSchedule = null;
73     private Cancellable installSnapshotSchedule = null;
74
75     private List<ClientRequestTracker> trackerList = new ArrayList<>();
76
77     private final int minReplicationCount;
78
79     public Leader(RaftActorContext context) {
80         super(context);
81
82         if (lastIndex() >= 0) {
83             context.setCommitIndex(lastIndex());
84         }
85
86         for (String followerId : context.getPeerAddresses().keySet()) {
87             FollowerLogInformation followerLogInformation =
88                 new FollowerLogInformationImpl(followerId,
89                     new AtomicLong(lastIndex()),
90                     new AtomicLong(-1));
91
92             followerToActor.put(followerId,
93                 context.actorSelection(context.getPeerAddress(followerId)));
94
95             followerToLog.put(followerId, followerLogInformation);
96         }
97
98         context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
99
100         if (followerToActor.size() > 0) {
101             minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
102         } else {
103             minReplicationCount = 0;
104         }
105
106
107         // Immediately schedule a heartbeat
108         // Upon election: send initial empty AppendEntries RPCs
109         // (heartbeat) to each server; repeat during idle periods to
110         // prevent election timeouts (§5.2)
111         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
112
113         scheduleInstallSnapshotCheck(
114             new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
115                 HEART_BEAT_INTERVAL.unit())
116         );
117
118     }
119
120     @Override protected RaftState handleAppendEntries(ActorRef sender,
121         AppendEntries appendEntries, RaftState suggestedState) {
122
123         context.getLogger()
124             .error("An unexpected AppendEntries received in state " + state());
125
126         return suggestedState;
127     }
128
129     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
130         AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
131
132         // Do not take any other action since a behavior change is coming
133         if (suggestedState != state())
134             return suggestedState;
135
136         // Update the FollowerLogInformation
137         String followerId = appendEntriesReply.getFollowerId();
138         FollowerLogInformation followerLogInformation =
139             followerToLog.get(followerId);
140         if (appendEntriesReply.isSuccess()) {
141             followerLogInformation
142                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
143             followerLogInformation
144                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
145         } else {
146             followerLogInformation.decrNextIndex();
147         }
148
149         // Now figure out if this reply warrants a change in the commitIndex
150         // If there exists an N such that N > commitIndex, a majority
151         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
152         // set commitIndex = N (§5.3, §5.4).
153         for (long N = context.getCommitIndex() + 1; ; N++) {
154             int replicatedCount = 1;
155
156             for (FollowerLogInformation info : followerToLog.values()) {
157                 if (info.getMatchIndex().get() >= N) {
158                     replicatedCount++;
159                 }
160             }
161
162             if (replicatedCount >= minReplicationCount) {
163                 ReplicatedLogEntry replicatedLogEntry =
164                     context.getReplicatedLog().get(N);
165                 if (replicatedLogEntry != null
166                     && replicatedLogEntry.getTerm()
167                     == currentTerm()) {
168                     context.setCommitIndex(N);
169                 }
170             } else {
171                 break;
172             }
173         }
174
175         // Apply the change to the state machine
176         if (context.getCommitIndex() > context.getLastApplied()) {
177             applyLogToStateMachine(context.getCommitIndex());
178         }
179
180         return suggestedState;
181     }
182
183     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
184         for (ClientRequestTracker tracker : trackerList) {
185             if (tracker.getIndex() == logIndex) {
186                 return tracker;
187             }
188         }
189
190         return null;
191     }
192
193     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
194         RequestVoteReply requestVoteReply, RaftState suggestedState) {
195         return suggestedState;
196     }
197
198     @Override public RaftState state() {
199         return RaftState.Leader;
200     }
201
202     @Override public RaftState handleMessage(ActorRef sender, Object message) {
203         Preconditions.checkNotNull(sender, "sender should not be null");
204
205         try {
206             if (message instanceof SendHeartBeat) {
207                 return sendHeartBeat();
208             } else if(message instanceof SendInstallSnapshot) {
209                 installSnapshotIfNeeded();
210             } else if (message instanceof Replicate) {
211                 replicate((Replicate) message);
212             } else if (message instanceof InstallSnapshotReply){
213                 // FIXME : Should I be checking the term here too?
214                 handleInstallSnapshotReply(
215                     (InstallSnapshotReply) message);
216             }
217         } finally {
218             scheduleHeartBeat(HEART_BEAT_INTERVAL);
219         }
220
221         return super.handleMessage(sender, message);
222     }
223
224     private void handleInstallSnapshotReply(InstallSnapshotReply message) {
225         InstallSnapshotReply reply = message;
226         String followerId = reply.getFollowerId();
227         FollowerLogInformation followerLogInformation =
228             followerToLog.get(followerId);
229
230         followerLogInformation
231             .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
232         followerLogInformation
233             .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
234     }
235
236     private void replicate(Replicate replicate) {
237         long logIndex = replicate.getReplicatedLogEntry().getIndex();
238
239         context.getLogger().debug("Replicate message " + logIndex);
240
241         if (followerToActor.size() == 0) {
242             context.setCommitIndex(
243                 replicate.getReplicatedLogEntry().getIndex());
244
245             context.getActor()
246                 .tell(new ApplyState(replicate.getClientActor(),
247                         replicate.getIdentifier(),
248                         replicate.getReplicatedLogEntry()),
249                     context.getActor()
250                 );
251         } else {
252
253             // Create a tracker entry we will use this later to notify the
254             // client actor
255             trackerList.add(
256                 new ClientRequestTrackerImpl(replicate.getClientActor(),
257                     replicate.getIdentifier(),
258                     logIndex)
259             );
260
261             sendAppendEntries();
262         }
263     }
264
265     private void sendAppendEntries() {
266         // Send an AppendEntries to all followers
267         for (String followerId : followerToActor.keySet()) {
268             ActorSelection followerActor =
269                 followerToActor.get(followerId);
270
271             FollowerLogInformation followerLogInformation =
272                 followerToLog.get(followerId);
273
274             long nextIndex = followerLogInformation.getNextIndex().get();
275
276             List<ReplicatedLogEntry> entries = Collections.emptyList();
277
278             if(context.getReplicatedLog().isPresent(nextIndex)){
279                 entries =
280                     context.getReplicatedLog().getFrom(nextIndex);
281             }
282
283             followerActor.tell(
284                 new AppendEntries(currentTerm(), context.getId(),
285                     prevLogIndex(nextIndex), prevLogTerm(nextIndex),
286                     entries, context.getCommitIndex()
287                 ),
288                 actor()
289             );
290         }
291     }
292
293     private void installSnapshotIfNeeded(){
294         for (String followerId : followerToActor.keySet()) {
295             ActorSelection followerActor =
296                 followerToActor.get(followerId);
297
298             FollowerLogInformation followerLogInformation =
299                 followerToLog.get(followerId);
300
301             long nextIndex = followerLogInformation.getNextIndex().get();
302
303             if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
304                 followerActor.tell(
305                     new InstallSnapshot(currentTerm(), context.getId(),
306                         context.getReplicatedLog().getSnapshotIndex(),
307                         context.getReplicatedLog().getSnapshotTerm(),
308                         context.getReplicatedLog().getSnapshot()
309                     ),
310                     actor()
311                 );
312             }
313         }
314     }
315
316     private RaftState sendHeartBeat() {
317         if (followerToActor.size() > 0) {
318             sendAppendEntries();
319         }
320         return state();
321     }
322
323     private void stopHeartBeat() {
324         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
325             heartbeatSchedule.cancel();
326         }
327     }
328
329     private void stopInstallSnapshotSchedule() {
330         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
331             installSnapshotSchedule.cancel();
332         }
333     }
334
335     private void scheduleHeartBeat(FiniteDuration interval) {
336         if(followerToActor.keySet().size() == 0){
337             // Optimization - do not bother scheduling a heartbeat as there are
338             // no followers
339             return;
340         }
341
342         stopHeartBeat();
343
344         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
345         // message is sent to itself.
346         // Scheduling the heartbeat only once here because heartbeats do not
347         // need to be sent if there are other messages being sent to the remote
348         // actor.
349         heartbeatSchedule =
350             context.getActorSystem().scheduler().scheduleOnce(
351                 interval,
352                 context.getActor(), new SendHeartBeat(),
353                 context.getActorSystem().dispatcher(), context.getActor());
354     }
355
356
357     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
358         if(followerToActor.keySet().size() == 0){
359             // Optimization - do not bother scheduling a heartbeat as there are
360             // no followers
361             return;
362         }
363
364         stopInstallSnapshotSchedule();
365
366         // Schedule a message to send append entries to followers that can
367         // accept an append entries with some data in it
368         installSnapshotSchedule =
369             context.getActorSystem().scheduler().scheduleOnce(
370                 interval,
371                 context.getActor(), new SendInstallSnapshot(),
372                 context.getActorSystem().dispatcher(), context.getActor());
373     }
374
375
376
377     @Override public void close() throws Exception {
378         stopHeartBeat();
379     }
380
381     @Override public String getLeaderId() {
382         return context.getId();
383     }
384
385 }