Optimizations, Monitoring and Logging
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
23 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
24 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
27 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import scala.concurrent.duration.FiniteDuration;
32
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Set;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicLong;
41
/**
 * The behavior of a RaftActor when it is in the Leader state.
 * <p>
 * Leaders:
 * <ul>
 * <li> Upon election: send initial empty AppendEntries RPCs
 * (heartbeat) to each server; repeat during idle periods to
 * prevent election timeouts (§5.2)
 * <li> If command received from client: append entry to local log,
 * respond after entry applied to state machine (§5.3)
 * <li> If last log index ≥ nextIndex for a follower: send
 * AppendEntries RPC with log entries starting at nextIndex
 * <ul>
 * <li> If successful: update nextIndex and matchIndex for
 * follower (§5.3)
 * <li> If AppendEntries fails because of log inconsistency:
 * decrement nextIndex and retry (§5.3)
 * </ul>
 * <li> If there exists an N such that N &gt; commitIndex, a majority
 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
 * set commitIndex = N (§5.3, §5.4).
 * </ul>
 */
64 public class Leader extends AbstractRaftActorBehavior {
65
66
67     private final Map<String, FollowerLogInformation> followerToLog =
68         new HashMap();
69
70     private final Set<String> followers;
71
72     private Cancellable heartbeatSchedule = null;
73     private Cancellable appendEntriesSchedule = null;
74     private Cancellable installSnapshotSchedule = null;
75
76     private List<ClientRequestTracker> trackerList = new ArrayList<>();
77
78     private final int minReplicationCount;
79
80     public Leader(RaftActorContext context) {
81         super(context);
82
83         if (lastIndex() >= 0) {
84             context.setCommitIndex(lastIndex());
85         }
86
87         followers = context.getPeerAddresses().keySet();
88
89         for (String followerId : followers) {
90             FollowerLogInformation followerLogInformation =
91                 new FollowerLogInformationImpl(followerId,
92                     new AtomicLong(lastIndex()),
93                     new AtomicLong(-1));
94
95             followerToLog.put(followerId, followerLogInformation);
96         }
97
98         context.getLogger().debug("Election:Leader has following peers:"+ followers);
99
100         if (followers.size() > 0) {
101             minReplicationCount = (followers.size() + 1) / 2 + 1;
102         } else {
103             minReplicationCount = 0;
104         }
105
106
107         // Immediately schedule a heartbeat
108         // Upon election: send initial empty AppendEntries RPCs
109         // (heartbeat) to each server; repeat during idle periods to
110         // prevent election timeouts (§5.2)
111         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
112
113         scheduleInstallSnapshotCheck(
114             new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
115                 context.getConfigParams().getHeartBeatInterval().unit())
116         );
117
118     }
119
120     @Override protected RaftState handleAppendEntries(ActorRef sender,
121         AppendEntries appendEntries) {
122
123         context.getLogger().debug(appendEntries.toString());
124
125         return state();
126     }
127
128     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
129         AppendEntriesReply appendEntriesReply) {
130
131         if(! appendEntriesReply.isSuccess()) {
132             context.getLogger()
133                 .debug(appendEntriesReply.toString());
134         }
135
136         // Update the FollowerLogInformation
137         String followerId = appendEntriesReply.getFollowerId();
138         FollowerLogInformation followerLogInformation =
139             followerToLog.get(followerId);
140
141         if(followerLogInformation == null){
142             context.getLogger().error("Unknown follower {}", followerId);
143             return state();
144         }
145
146         if (appendEntriesReply.isSuccess()) {
147             followerLogInformation
148                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
149             followerLogInformation
150                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
151         } else {
152
153             // TODO: When we find that the follower is out of sync with the
154             // Leader we simply decrement that followers next index by 1.
155             // Would it be possible to do better than this? The RAFT spec
156             // does not explicitly deal with it but may be something for us to
157             // think about
158
159             followerLogInformation.decrNextIndex();
160         }
161
162         // Now figure out if this reply warrants a change in the commitIndex
163         // If there exists an N such that N > commitIndex, a majority
164         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
165         // set commitIndex = N (§5.3, §5.4).
166         for (long N = context.getCommitIndex() + 1; ; N++) {
167             int replicatedCount = 1;
168
169             for (FollowerLogInformation info : followerToLog.values()) {
170                 if (info.getMatchIndex().get() >= N) {
171                     replicatedCount++;
172                 }
173             }
174
175             if (replicatedCount >= minReplicationCount) {
176                 ReplicatedLogEntry replicatedLogEntry =
177                     context.getReplicatedLog().get(N);
178                 if (replicatedLogEntry != null
179                     && replicatedLogEntry.getTerm()
180                     == currentTerm()) {
181                     context.setCommitIndex(N);
182                 }
183             } else {
184                 break;
185             }
186         }
187
188         // Apply the change to the state machine
189         if (context.getCommitIndex() > context.getLastApplied()) {
190             applyLogToStateMachine(context.getCommitIndex());
191         }
192
193         return state();
194     }
195
196     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
197         for (ClientRequestTracker tracker : trackerList) {
198             if (tracker.getIndex() == logIndex) {
199                 return tracker;
200             }
201         }
202
203         return null;
204     }
205
206     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
207         RequestVoteReply requestVoteReply) {
208         return state();
209     }
210
211     @Override public RaftState state() {
212         return RaftState.Leader;
213     }
214
215     @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
216         Preconditions.checkNotNull(sender, "sender should not be null");
217
218         Object message = fromSerializableMessage(originalMessage);
219
220         if (message instanceof RaftRPC) {
221             RaftRPC rpc = (RaftRPC) message;
222             // If RPC request or response contains term T > currentTerm:
223             // set currentTerm = T, convert to follower (§5.1)
224             // This applies to all RPC messages and responses
225             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
226                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
227                 return RaftState.Follower;
228             }
229         }
230
231         try {
232             if (message instanceof SendHeartBeat) {
233                 return sendHeartBeat();
234             } else if(message instanceof SendInstallSnapshot) {
235                 installSnapshotIfNeeded();
236             } else if (message instanceof Replicate) {
237                 replicate((Replicate) message);
238             } else if (message instanceof InstallSnapshotReply){
239                 handleInstallSnapshotReply(
240                     (InstallSnapshotReply) message);
241             }
242         } finally {
243             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
244         }
245
246         return super.handleMessage(sender, message);
247     }
248
249     private void handleInstallSnapshotReply(InstallSnapshotReply message) {
250         InstallSnapshotReply reply = message;
251         String followerId = reply.getFollowerId();
252         FollowerLogInformation followerLogInformation =
253             followerToLog.get(followerId);
254
255         followerLogInformation
256             .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
257         followerLogInformation
258             .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
259     }
260
261     private void replicate(Replicate replicate) {
262         long logIndex = replicate.getReplicatedLogEntry().getIndex();
263
264         context.getLogger().debug("Replicate message " + logIndex);
265
266         // Create a tracker entry we will use this later to notify the
267         // client actor
268         trackerList.add(
269             new ClientRequestTrackerImpl(replicate.getClientActor(),
270                 replicate.getIdentifier(),
271                 logIndex)
272         );
273
274         if (followers.size() == 0) {
275             context.setCommitIndex(logIndex);
276             applyLogToStateMachine(logIndex);
277         } else {
278             sendAppendEntries();
279         }
280     }
281
282     private void sendAppendEntries() {
283         // Send an AppendEntries to all followers
284         for (String followerId : followers) {
285             ActorSelection followerActor =
286                 context.getPeerActorSelection(followerId);
287
288             if (followerActor != null) {
289                 FollowerLogInformation followerLogInformation =
290                     followerToLog.get(followerId);
291
292                 long nextIndex = followerLogInformation.getNextIndex().get();
293
294                 List<ReplicatedLogEntry> entries = Collections.emptyList();
295
296                 if (context.getReplicatedLog().isPresent(nextIndex)) {
297                     // FIXME : Sending one entry at a time
298                     entries =
299                         context.getReplicatedLog().getFrom(nextIndex, 1);
300                 }
301
302                 followerActor.tell(
303                     new AppendEntries(currentTerm(), context.getId(),
304                         prevLogIndex(nextIndex),
305                         prevLogTerm(nextIndex), entries,
306                         context.getCommitIndex()).toSerializable(),
307                     actor()
308                 );
309             }
310         }
311     }
312
313     /**
314      * An installSnapshot is scheduled at a interval that is a multiple of
315      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
316      * snapshots at every heartbeat.
317      */
318     private void installSnapshotIfNeeded(){
319         for (String followerId : followers) {
320             ActorSelection followerActor =
321                 context.getPeerActorSelection(followerId);
322
323             if(followerActor != null) {
324                 FollowerLogInformation followerLogInformation =
325                     followerToLog.get(followerId);
326
327                 long nextIndex = followerLogInformation.getNextIndex().get();
328
329                 if (!context.getReplicatedLog().isPresent(nextIndex) && context
330                     .getReplicatedLog().isInSnapshot(nextIndex)) {
331                     followerActor.tell(
332                         new InstallSnapshot(currentTerm(), context.getId(),
333                             context.getReplicatedLog().getSnapshotIndex(),
334                             context.getReplicatedLog().getSnapshotTerm(),
335                             context.getReplicatedLog().getSnapshot()
336                         ),
337                         actor()
338                     );
339                 }
340             }
341         }
342     }
343
344     private RaftState sendHeartBeat() {
345         if (followers.size() > 0) {
346             sendAppendEntries();
347         }
348         return state();
349     }
350
351     private void stopHeartBeat() {
352         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
353             heartbeatSchedule.cancel();
354         }
355     }
356
357     private void stopInstallSnapshotSchedule() {
358         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
359             installSnapshotSchedule.cancel();
360         }
361     }
362
363     private void scheduleHeartBeat(FiniteDuration interval) {
364         if(followers.size() == 0){
365             // Optimization - do not bother scheduling a heartbeat as there are
366             // no followers
367             return;
368         }
369
370         stopHeartBeat();
371
372         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
373         // message is sent to itself.
374         // Scheduling the heartbeat only once here because heartbeats do not
375         // need to be sent if there are other messages being sent to the remote
376         // actor.
377         heartbeatSchedule =
378             context.getActorSystem().scheduler().scheduleOnce(
379                 interval,
380                 context.getActor(), new SendHeartBeat(),
381                 context.getActorSystem().dispatcher(), context.getActor());
382     }
383
384
385     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
386         if(followers.size() == 0){
387             // Optimization - do not bother scheduling a heartbeat as there are
388             // no followers
389             return;
390         }
391
392         stopInstallSnapshotSchedule();
393
394         // Schedule a message to send append entries to followers that can
395         // accept an append entries with some data in it
396         installSnapshotSchedule =
397             context.getActorSystem().scheduler().scheduleOnce(
398                 interval,
399                 context.getActor(), new SendInstallSnapshot(),
400                 context.getActorSystem().dispatcher(), context.getActor());
401     }
402
403
404
405     @Override public void close() throws Exception {
406         stopHeartBeat();
407     }
408
409     @Override public String getLeaderId() {
410         return context.getId();
411     }
412
413 }