Merge "BUG 2221 : Add metering to ShardTransaction actor"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import com.google.protobuf.ByteString;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
17 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
20 import org.opendaylight.controller.cluster.raft.RaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftState;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import scala.concurrent.duration.FiniteDuration;
33
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 /**
45  * The behavior of a RaftActor when it is in the Leader state
46  * <p/>
47  * Leaders:
48  * <ul>
49  * <li> Upon election: send initial empty AppendEntries RPCs
50  * (heartbeat) to each server; repeat during idle periods to
51  * prevent election timeouts (§5.2)
52  * <li> If command received from client: append entry to local log,
53  * respond after entry applied to state machine (§5.3)
54  * <li> If last log index ≥ nextIndex for a follower: send
55  * AppendEntries RPC with log entries starting at nextIndex
56  * <ul>
57  * <li> If successful: update nextIndex and matchIndex for
58  * follower (§5.3)
59  * <li> If AppendEntries fails because of log inconsistency:
60  * decrement nextIndex and retry (§5.3)
61  * </ul>
62  * <li> If there exists an N such that N > commitIndex, a majority
63  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
64  * set commitIndex = N (§5.3, §5.4).
65  */
66 public class Leader extends AbstractRaftActorBehavior {
67
68
69     protected final Map<String, FollowerLogInformation> followerToLog =
70         new HashMap();
71     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
72
73     private final Set<String> followers;
74
75     private Cancellable heartbeatSchedule = null;
76     private Cancellable appendEntriesSchedule = null;
77     private Cancellable installSnapshotSchedule = null;
78
79     private List<ClientRequestTracker> trackerList = new ArrayList<>();
80
81     private final int minReplicationCount;
82
83     public Leader(RaftActorContext context) {
84         super(context);
85
86         followers = context.getPeerAddresses().keySet();
87
88         for (String followerId : followers) {
89             FollowerLogInformation followerLogInformation =
90                 new FollowerLogInformationImpl(followerId,
91                     new AtomicLong(context.getCommitIndex()),
92                     new AtomicLong(-1));
93
94             followerToLog.put(followerId, followerLogInformation);
95         }
96
97         if(LOG.isDebugEnabled()) {
98             LOG.debug("Election:Leader has following peers: {}", followers);
99         }
100
101         if (followers.size() > 0) {
102             minReplicationCount = (followers.size() + 1) / 2 + 1;
103         } else {
104             minReplicationCount = 0;
105         }
106
107
108         // Immediately schedule a heartbeat
109         // Upon election: send initial empty AppendEntries RPCs
110         // (heartbeat) to each server; repeat during idle periods to
111         // prevent election timeouts (§5.2)
112         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
113
114         scheduleInstallSnapshotCheck(
115             new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
116                 context.getConfigParams().getHeartBeatInterval().unit())
117         );
118
119     }
120
121     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
122         AppendEntries appendEntries) {
123
124         if(LOG.isDebugEnabled()) {
125             LOG.debug(appendEntries.toString());
126         }
127
128         return this;
129     }
130
131     @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
132         AppendEntriesReply appendEntriesReply) {
133
134         if(! appendEntriesReply.isSuccess()) {
135             if(LOG.isDebugEnabled()) {
136                 LOG.debug(appendEntriesReply.toString());
137             }
138         }
139
140         // Update the FollowerLogInformation
141         String followerId = appendEntriesReply.getFollowerId();
142         FollowerLogInformation followerLogInformation =
143             followerToLog.get(followerId);
144
145         if(followerLogInformation == null){
146             LOG.error("Unknown follower {}", followerId);
147             return this;
148         }
149
150         if (appendEntriesReply.isSuccess()) {
151             followerLogInformation
152                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
153             followerLogInformation
154                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
155         } else {
156
157             // TODO: When we find that the follower is out of sync with the
158             // Leader we simply decrement that followers next index by 1.
159             // Would it be possible to do better than this? The RAFT spec
160             // does not explicitly deal with it but may be something for us to
161             // think about
162
163             followerLogInformation.decrNextIndex();
164         }
165
166         // Now figure out if this reply warrants a change in the commitIndex
167         // If there exists an N such that N > commitIndex, a majority
168         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
169         // set commitIndex = N (§5.3, §5.4).
170         for (long N = context.getCommitIndex() + 1; ; N++) {
171             int replicatedCount = 1;
172
173             for (FollowerLogInformation info : followerToLog.values()) {
174                 if (info.getMatchIndex().get() >= N) {
175                     replicatedCount++;
176                 }
177             }
178
179             if (replicatedCount >= minReplicationCount) {
180                 ReplicatedLogEntry replicatedLogEntry =
181                     context.getReplicatedLog().get(N);
182                 if (replicatedLogEntry != null
183                     && replicatedLogEntry.getTerm()
184                     == currentTerm()) {
185                     context.setCommitIndex(N);
186                 }
187             } else {
188                 break;
189             }
190         }
191
192         // Apply the change to the state machine
193         if (context.getCommitIndex() > context.getLastApplied()) {
194             applyLogToStateMachine(context.getCommitIndex());
195         }
196
197         return this;
198     }
199
200     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
201
202         ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
203         if(toRemove != null) {
204             trackerList.remove(toRemove);
205         }
206
207         return toRemove;
208     }
209
210     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
211         for (ClientRequestTracker tracker : trackerList) {
212             if (tracker.getIndex() == logIndex) {
213                 return tracker;
214             }
215         }
216
217         return null;
218     }
219
220     @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
221         RequestVoteReply requestVoteReply) {
222         return this;
223     }
224
225     @Override public RaftState state() {
226         return RaftState.Leader;
227     }
228
229     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
230         Preconditions.checkNotNull(sender, "sender should not be null");
231
232         Object message = fromSerializableMessage(originalMessage);
233
234         if (message instanceof RaftRPC) {
235             RaftRPC rpc = (RaftRPC) message;
236             // If RPC request or response contains term T > currentTerm:
237             // set currentTerm = T, convert to follower (§5.1)
238             // This applies to all RPC messages and responses
239             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
240                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
241
242                 return switchBehavior(new Follower(context));
243             }
244         }
245
246         try {
247             if (message instanceof SendHeartBeat) {
248                 sendHeartBeat();
249                 return this;
250             } else if(message instanceof SendInstallSnapshot) {
251                 installSnapshotIfNeeded();
252             } else if (message instanceof Replicate) {
253                 replicate((Replicate) message);
254             } else if (message instanceof InstallSnapshotReply){
255                 handleInstallSnapshotReply(
256                     (InstallSnapshotReply) message);
257             }
258         } finally {
259             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
260         }
261
262         return super.handleMessage(sender, message);
263     }
264
265     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
266         String followerId = reply.getFollowerId();
267         FollowerToSnapshot followerToSnapshot =
268             mapFollowerToSnapshot.get(followerId);
269
270         if (followerToSnapshot != null &&
271             followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
272
273             if (reply.isSuccess()) {
274                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
275                     //this was the last chunk reply
276                     if(LOG.isDebugEnabled()) {
277                         LOG.debug("InstallSnapshotReply received, " +
278                                 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
279                             reply.getChunkIndex(), followerId,
280                             context.getReplicatedLog().getSnapshotIndex() + 1
281                         );
282                     }
283
284                     FollowerLogInformation followerLogInformation =
285                         followerToLog.get(followerId);
286                     followerLogInformation.setMatchIndex(
287                         context.getReplicatedLog().getSnapshotIndex());
288                     followerLogInformation.setNextIndex(
289                         context.getReplicatedLog().getSnapshotIndex() + 1);
290                     mapFollowerToSnapshot.remove(followerId);
291
292                     if(LOG.isDebugEnabled()) {
293                         LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
294                             followerToLog.get(followerId).getNextIndex().get());
295                     }
296
297                 } else {
298                     followerToSnapshot.markSendStatus(true);
299                 }
300             } else {
301                 LOG.info("InstallSnapshotReply received, " +
302                         "sending snapshot chunk failed, Will retry, Chunk:{}",
303                     reply.getChunkIndex()
304                 );
305                 followerToSnapshot.markSendStatus(false);
306             }
307
308         } else {
309             LOG.error("ERROR!!" +
310                     "FollowerId in InstallSnapshotReply not known to Leader" +
311                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
312                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
313             );
314         }
315     }
316
317     private void replicate(Replicate replicate) {
318         long logIndex = replicate.getReplicatedLogEntry().getIndex();
319
320         if(LOG.isDebugEnabled()) {
321             LOG.debug("Replicate message {}", logIndex);
322         }
323
324         // Create a tracker entry we will use this later to notify the
325         // client actor
326         trackerList.add(
327             new ClientRequestTrackerImpl(replicate.getClientActor(),
328                 replicate.getIdentifier(),
329                 logIndex)
330         );
331
332         if (followers.size() == 0) {
333             context.setCommitIndex(logIndex);
334             applyLogToStateMachine(logIndex);
335         } else {
336             sendAppendEntries();
337         }
338     }
339
340     private void sendAppendEntries() {
341         // Send an AppendEntries to all followers
342         for (String followerId : followers) {
343             ActorSelection followerActor = context.getPeerActorSelection(followerId);
344
345             if (followerActor != null) {
346                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
347                 long followerNextIndex = followerLogInformation.getNextIndex().get();
348                 List<ReplicatedLogEntry> entries = Collections.emptyList();
349
350                 if (mapFollowerToSnapshot.get(followerId) != null) {
351                     if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
352                         sendSnapshotChunk(followerActor, followerId);
353                     }
354
355                 } else {
356
357                     if (context.getReplicatedLog().isPresent(followerNextIndex)) {
358                         // FIXME : Sending one entry at a time
359                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
360
361                         followerActor.tell(
362                             new AppendEntries(currentTerm(), context.getId(),
363                                 prevLogIndex(followerNextIndex),
364                                 prevLogTerm(followerNextIndex), entries,
365                                 context.getCommitIndex()).toSerializable(),
366                             actor()
367                         );
368
369                     } else {
370                         // if the followers next index is not present in the leaders log, then snapshot should be sent
371                         long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
372                         long leaderLastIndex = context.getReplicatedLog().lastIndex();
373                         if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
374                             // if the follower is just not starting and leader's index
375                             // is more than followers index
376                             if(LOG.isDebugEnabled()) {
377                                 LOG.debug("SendInstallSnapshot to follower:{}," +
378                                         "follower-nextIndex:{}, leader-snapshot-index:{},  " +
379                                         "leader-last-index:{}", followerId,
380                                     followerNextIndex, leaderSnapShotIndex, leaderLastIndex
381                                 );
382                             }
383
384                             actor().tell(new SendInstallSnapshot(), actor());
385                         } else {
386                             followerActor.tell(
387                                 new AppendEntries(currentTerm(), context.getId(),
388                                     prevLogIndex(followerNextIndex),
389                                     prevLogTerm(followerNextIndex), entries,
390                                     context.getCommitIndex()).toSerializable(),
391                                 actor()
392                             );
393                         }
394                     }
395                 }
396             }
397         }
398     }
399
400     /**
401      * An installSnapshot is scheduled at a interval that is a multiple of
402      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
403      * snapshots at every heartbeat.
404      */
405     private void installSnapshotIfNeeded(){
406         for (String followerId : followers) {
407             ActorSelection followerActor =
408                 context.getPeerActorSelection(followerId);
409
410             if(followerActor != null) {
411                 FollowerLogInformation followerLogInformation =
412                     followerToLog.get(followerId);
413
414                 long nextIndex = followerLogInformation.getNextIndex().get();
415
416                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
417                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
418                     sendSnapshotChunk(followerActor, followerId);
419                 }
420             }
421         }
422     }
423
424     /**
425      *  Sends a snapshot chunk to a given follower
426      *  InstallSnapshot should qualify as a heartbeat too.
427      */
428     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
429         try {
430             followerActor.tell(
431                 new InstallSnapshot(currentTerm(), context.getId(),
432                     context.getReplicatedLog().getSnapshotIndex(),
433                     context.getReplicatedLog().getSnapshotTerm(),
434                     getNextSnapshotChunk(followerId,
435                         context.getReplicatedLog().getSnapshot()),
436                     mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
437                     mapFollowerToSnapshot.get(followerId).getTotalChunks()
438                 ).toSerializable(),
439                 actor()
440             );
441             LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
442                 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
443                 mapFollowerToSnapshot.get(followerId).getTotalChunks());
444         } catch (IOException e) {
445             LOG.error(e, "InstallSnapshot failed for Leader.");
446         }
447     }
448
449     /**
450      * Acccepts snaphot as ByteString, enters into map for future chunks
451      * creates and return a ByteString chunk
452      */
453     private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
454         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
455         if (followerToSnapshot == null) {
456             followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
457             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
458         }
459         ByteString nextChunk = followerToSnapshot.getNextChunk();
460         if(LOG.isDebugEnabled()) {
461             LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
462         }
463
464         return nextChunk;
465     }
466
467     private void sendHeartBeat() {
468         if (followers.size() > 0) {
469             sendAppendEntries();
470         }
471     }
472
473     private void stopHeartBeat() {
474         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
475             heartbeatSchedule.cancel();
476         }
477     }
478
479     private void stopInstallSnapshotSchedule() {
480         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
481             installSnapshotSchedule.cancel();
482         }
483     }
484
485     private void scheduleHeartBeat(FiniteDuration interval) {
486         if(followers.size() == 0){
487             // Optimization - do not bother scheduling a heartbeat as there are
488             // no followers
489             return;
490         }
491
492         stopHeartBeat();
493
494         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
495         // message is sent to itself.
496         // Scheduling the heartbeat only once here because heartbeats do not
497         // need to be sent if there are other messages being sent to the remote
498         // actor.
499         heartbeatSchedule =
500             context.getActorSystem().scheduler().scheduleOnce(
501                 interval,
502                 context.getActor(), new SendHeartBeat(),
503                 context.getActorSystem().dispatcher(), context.getActor());
504     }
505
506
507     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
508         if(followers.size() == 0){
509             // Optimization - do not bother scheduling a heartbeat as there are
510             // no followers
511             return;
512         }
513
514         stopInstallSnapshotSchedule();
515
516         // Schedule a message to send append entries to followers that can
517         // accept an append entries with some data in it
518         installSnapshotSchedule =
519             context.getActorSystem().scheduler().scheduleOnce(
520                 interval,
521                 context.getActor(), new SendInstallSnapshot(),
522                 context.getActorSystem().dispatcher(), context.getActor());
523     }
524
525
526
527     @Override public void close() throws Exception {
528         stopHeartBeat();
529     }
530
531     @Override public String getLeaderId() {
532         return context.getId();
533     }
534
535     /**
536      * Encapsulates the snapshot bytestring and handles the logic of sending
537      * snapshot chunks
538      */
539     protected class FollowerToSnapshot {
540         private ByteString snapshotBytes;
541         private int offset = 0;
542         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
543         private int replyReceivedForOffset;
544         // if replyStatus is false, the previous chunk is attempted
545         private boolean replyStatus = false;
546         private int chunkIndex;
547         private int totalChunks;
548
549         public FollowerToSnapshot(ByteString snapshotBytes) {
550             this.snapshotBytes = snapshotBytes;
551             replyReceivedForOffset = -1;
552             chunkIndex = 1;
553             int size = snapshotBytes.size();
554             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
555                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
556             if(LOG.isDebugEnabled()) {
557                 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
558                     size, totalChunks);
559             }
560         }
561
562         public ByteString getSnapshotBytes() {
563             return snapshotBytes;
564         }
565
566         public int incrementOffset() {
567             if(replyStatus) {
568                 // if prev chunk failed, we would want to sent the same chunk again
569                 offset = offset + context.getConfigParams().getSnapshotChunkSize();
570             }
571             return offset;
572         }
573
574         public int incrementChunkIndex() {
575             if (replyStatus) {
576                 // if prev chunk failed, we would want to sent the same chunk again
577                 chunkIndex =  chunkIndex + 1;
578             }
579             return chunkIndex;
580         }
581
582         public int getChunkIndex() {
583             return chunkIndex;
584         }
585
586         public int getTotalChunks() {
587             return totalChunks;
588         }
589
590         public boolean canSendNextChunk() {
591             // we only send a false if a chunk is sent but we have not received a reply yet
592             return replyReceivedForOffset == offset;
593         }
594
595         public boolean isLastChunk(int chunkIndex) {
596             return totalChunks == chunkIndex;
597         }
598
599         public void markSendStatus(boolean success) {
600             if (success) {
601                 // if the chunk sent was successful
602                 replyReceivedForOffset = offset;
603                 replyStatus = true;
604             } else {
605                 // if the chunk sent was failure
606                 replyReceivedForOffset = offset;
607                 replyStatus = false;
608             }
609         }
610
611         public ByteString getNextChunk() {
612             int snapshotLength = getSnapshotBytes().size();
613             int start = incrementOffset();
614             int size = context.getConfigParams().getSnapshotChunkSize();
615             if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
616                 size = snapshotLength;
617             } else {
618                 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
619                     size = snapshotLength - start;
620                 }
621             }
622
623             if(LOG.isDebugEnabled()) {
624                 LOG.debug("length={}, offset={},size={}",
625                     snapshotLength, start, size);
626             }
627             return getSnapshotBytes().substring(start, start + size);
628
629         }
630     }
631
632 }