Merge "BUG-2254 Make runtime rpcs in config subsystem/netconf handle context-instance...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import com.google.protobuf.ByteString;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
17 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
20 import org.opendaylight.controller.cluster.raft.RaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftState;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import scala.concurrent.duration.FiniteDuration;
33
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 /**
45  * The behavior of a RaftActor when it is in the Leader state
46  * <p/>
47  * Leaders:
48  * <ul>
49  * <li> Upon election: send initial empty AppendEntries RPCs
50  * (heartbeat) to each server; repeat during idle periods to
51  * prevent election timeouts (§5.2)
52  * <li> If command received from client: append entry to local log,
53  * respond after entry applied to state machine (§5.3)
54  * <li> If last log index ≥ nextIndex for a follower: send
55  * AppendEntries RPC with log entries starting at nextIndex
56  * <ul>
57  * <li> If successful: update nextIndex and matchIndex for
58  * follower (§5.3)
59  * <li> If AppendEntries fails because of log inconsistency:
60  * decrement nextIndex and retry (§5.3)
61  * </ul>
62  * <li> If there exists an N such that N > commitIndex, a majority
63  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
64  * set commitIndex = N (§5.3, §5.4).
65  */
66 public class Leader extends AbstractRaftActorBehavior {
67
68
69     protected final Map<String, FollowerLogInformation> followerToLog =
70         new HashMap();
71     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
72
73     private final Set<String> followers;
74
75     private Cancellable heartbeatSchedule = null;
76     private Cancellable installSnapshotSchedule = null;
77
78     private List<ClientRequestTracker> trackerList = new ArrayList<>();
79
80     private final int minReplicationCount;
81
82     public Leader(RaftActorContext context) {
83         super(context);
84
85         followers = context.getPeerAddresses().keySet();
86
87         for (String followerId : followers) {
88             FollowerLogInformation followerLogInformation =
89                 new FollowerLogInformationImpl(followerId,
90                     new AtomicLong(context.getCommitIndex()),
91                     new AtomicLong(-1));
92
93             followerToLog.put(followerId, followerLogInformation);
94         }
95
96         if(LOG.isDebugEnabled()) {
97             LOG.debug("Election:Leader has following peers: {}", followers);
98         }
99
100         if (followers.size() > 0) {
101             minReplicationCount = (followers.size() + 1) / 2 + 1;
102         } else {
103             minReplicationCount = 0;
104         }
105
106
107         // Immediately schedule a heartbeat
108         // Upon election: send initial empty AppendEntries RPCs
109         // (heartbeat) to each server; repeat during idle periods to
110         // prevent election timeouts (§5.2)
111         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
112
113         scheduleInstallSnapshotCheck(
114             new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
115                 context.getConfigParams().getHeartBeatInterval().unit())
116         );
117
118     }
119
120     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
121         AppendEntries appendEntries) {
122
123         if(LOG.isDebugEnabled()) {
124             LOG.debug(appendEntries.toString());
125         }
126
127         return this;
128     }
129
130     @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
131         AppendEntriesReply appendEntriesReply) {
132
133         if(! appendEntriesReply.isSuccess()) {
134             if(LOG.isDebugEnabled()) {
135                 LOG.debug(appendEntriesReply.toString());
136             }
137         }
138
139         // Update the FollowerLogInformation
140         String followerId = appendEntriesReply.getFollowerId();
141         FollowerLogInformation followerLogInformation =
142             followerToLog.get(followerId);
143
144         if(followerLogInformation == null){
145             LOG.error("Unknown follower {}", followerId);
146             return this;
147         }
148
149         if (appendEntriesReply.isSuccess()) {
150             followerLogInformation
151                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
152             followerLogInformation
153                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
154         } else {
155
156             // TODO: When we find that the follower is out of sync with the
157             // Leader we simply decrement that followers next index by 1.
158             // Would it be possible to do better than this? The RAFT spec
159             // does not explicitly deal with it but may be something for us to
160             // think about
161
162             followerLogInformation.decrNextIndex();
163         }
164
165         // Now figure out if this reply warrants a change in the commitIndex
166         // If there exists an N such that N > commitIndex, a majority
167         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
168         // set commitIndex = N (§5.3, §5.4).
169         for (long N = context.getCommitIndex() + 1; ; N++) {
170             int replicatedCount = 1;
171
172             for (FollowerLogInformation info : followerToLog.values()) {
173                 if (info.getMatchIndex().get() >= N) {
174                     replicatedCount++;
175                 }
176             }
177
178             if (replicatedCount >= minReplicationCount) {
179                 ReplicatedLogEntry replicatedLogEntry =
180                     context.getReplicatedLog().get(N);
181                 if (replicatedLogEntry != null
182                     && replicatedLogEntry.getTerm()
183                     == currentTerm()) {
184                     context.setCommitIndex(N);
185                 }
186             } else {
187                 break;
188             }
189         }
190
191         // Apply the change to the state machine
192         if (context.getCommitIndex() > context.getLastApplied()) {
193             applyLogToStateMachine(context.getCommitIndex());
194         }
195
196         return this;
197     }
198
199     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
200
201         ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
202         if(toRemove != null) {
203             trackerList.remove(toRemove);
204         }
205
206         return toRemove;
207     }
208
209     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
210         for (ClientRequestTracker tracker : trackerList) {
211             if (tracker.getIndex() == logIndex) {
212                 return tracker;
213             }
214         }
215
216         return null;
217     }
218
219     @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
220         RequestVoteReply requestVoteReply) {
221         return this;
222     }
223
224     @Override public RaftState state() {
225         return RaftState.Leader;
226     }
227
228     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
229         Preconditions.checkNotNull(sender, "sender should not be null");
230
231         Object message = fromSerializableMessage(originalMessage);
232
233         if (message instanceof RaftRPC) {
234             RaftRPC rpc = (RaftRPC) message;
235             // If RPC request or response contains term T > currentTerm:
236             // set currentTerm = T, convert to follower (§5.1)
237             // This applies to all RPC messages and responses
238             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
239                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
240
241                 return switchBehavior(new Follower(context));
242             }
243         }
244
245         try {
246             if (message instanceof SendHeartBeat) {
247                 sendHeartBeat();
248                 return this;
249             } else if(message instanceof SendInstallSnapshot) {
250                 installSnapshotIfNeeded();
251             } else if (message instanceof Replicate) {
252                 replicate((Replicate) message);
253             } else if (message instanceof InstallSnapshotReply){
254                 handleInstallSnapshotReply(
255                     (InstallSnapshotReply) message);
256             }
257         } finally {
258             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
259         }
260
261         return super.handleMessage(sender, message);
262     }
263
264     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
265         String followerId = reply.getFollowerId();
266         FollowerToSnapshot followerToSnapshot =
267             mapFollowerToSnapshot.get(followerId);
268
269         if (followerToSnapshot != null &&
270             followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
271
272             if (reply.isSuccess()) {
273                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
274                     //this was the last chunk reply
275                     if(LOG.isDebugEnabled()) {
276                         LOG.debug("InstallSnapshotReply received, " +
277                                 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
278                             reply.getChunkIndex(), followerId,
279                             context.getReplicatedLog().getSnapshotIndex() + 1
280                         );
281                     }
282
283                     FollowerLogInformation followerLogInformation =
284                         followerToLog.get(followerId);
285                     followerLogInformation.setMatchIndex(
286                         context.getReplicatedLog().getSnapshotIndex());
287                     followerLogInformation.setNextIndex(
288                         context.getReplicatedLog().getSnapshotIndex() + 1);
289                     mapFollowerToSnapshot.remove(followerId);
290
291                     if(LOG.isDebugEnabled()) {
292                         LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
293                             followerToLog.get(followerId).getNextIndex().get());
294                     }
295
296                 } else {
297                     followerToSnapshot.markSendStatus(true);
298                 }
299             } else {
300                 LOG.info("InstallSnapshotReply received, " +
301                         "sending snapshot chunk failed, Will retry, Chunk:{}",
302                     reply.getChunkIndex()
303                 );
304                 followerToSnapshot.markSendStatus(false);
305             }
306
307         } else {
308             LOG.error("ERROR!!" +
309                     "FollowerId in InstallSnapshotReply not known to Leader" +
310                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
311                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
312             );
313         }
314     }
315
316     private void replicate(Replicate replicate) {
317         long logIndex = replicate.getReplicatedLogEntry().getIndex();
318
319         if(LOG.isDebugEnabled()) {
320             LOG.debug("Replicate message {}", logIndex);
321         }
322
323         // Create a tracker entry we will use this later to notify the
324         // client actor
325         trackerList.add(
326             new ClientRequestTrackerImpl(replicate.getClientActor(),
327                 replicate.getIdentifier(),
328                 logIndex)
329         );
330
331         if (followers.size() == 0) {
332             context.setCommitIndex(logIndex);
333             applyLogToStateMachine(logIndex);
334         } else {
335             sendAppendEntries();
336         }
337     }
338
339     private void sendAppendEntries() {
340         // Send an AppendEntries to all followers
341         for (String followerId : followers) {
342             ActorSelection followerActor = context.getPeerActorSelection(followerId);
343
344             if (followerActor != null) {
345                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
346                 long followerNextIndex = followerLogInformation.getNextIndex().get();
347                 List<ReplicatedLogEntry> entries = Collections.emptyList();
348
349                 if (mapFollowerToSnapshot.get(followerId) != null) {
350                     if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
351                         sendSnapshotChunk(followerActor, followerId);
352                     }
353
354                 } else {
355
356                     if (context.getReplicatedLog().isPresent(followerNextIndex)) {
357                         // FIXME : Sending one entry at a time
358                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
359
360                         followerActor.tell(
361                             new AppendEntries(currentTerm(), context.getId(),
362                                 prevLogIndex(followerNextIndex),
363                                 prevLogTerm(followerNextIndex), entries,
364                                 context.getCommitIndex()).toSerializable(),
365                             actor()
366                         );
367
368                     } else {
369                         // if the followers next index is not present in the leaders log, then snapshot should be sent
370                         long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
371                         long leaderLastIndex = context.getReplicatedLog().lastIndex();
372                         if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
373                             // if the follower is just not starting and leader's index
374                             // is more than followers index
375                             if(LOG.isDebugEnabled()) {
376                                 LOG.debug("SendInstallSnapshot to follower:{}," +
377                                         "follower-nextIndex:{}, leader-snapshot-index:{},  " +
378                                         "leader-last-index:{}", followerId,
379                                     followerNextIndex, leaderSnapShotIndex, leaderLastIndex
380                                 );
381                             }
382
383                             actor().tell(new SendInstallSnapshot(), actor());
384                         } else {
385                             followerActor.tell(
386                                 new AppendEntries(currentTerm(), context.getId(),
387                                     prevLogIndex(followerNextIndex),
388                                     prevLogTerm(followerNextIndex), entries,
389                                     context.getCommitIndex()).toSerializable(),
390                                 actor()
391                             );
392                         }
393                     }
394                 }
395             }
396         }
397     }
398
399     /**
400      * An installSnapshot is scheduled at a interval that is a multiple of
401      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
402      * snapshots at every heartbeat.
403      */
404     private void installSnapshotIfNeeded(){
405         for (String followerId : followers) {
406             ActorSelection followerActor =
407                 context.getPeerActorSelection(followerId);
408
409             if(followerActor != null) {
410                 FollowerLogInformation followerLogInformation =
411                     followerToLog.get(followerId);
412
413                 long nextIndex = followerLogInformation.getNextIndex().get();
414
415                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
416                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
417                     sendSnapshotChunk(followerActor, followerId);
418                 }
419             }
420         }
421     }
422
423     /**
424      *  Sends a snapshot chunk to a given follower
425      *  InstallSnapshot should qualify as a heartbeat too.
426      */
427     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
428         try {
429             followerActor.tell(
430                 new InstallSnapshot(currentTerm(), context.getId(),
431                     context.getReplicatedLog().getSnapshotIndex(),
432                     context.getReplicatedLog().getSnapshotTerm(),
433                     getNextSnapshotChunk(followerId,
434                         context.getReplicatedLog().getSnapshot()),
435                     mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
436                     mapFollowerToSnapshot.get(followerId).getTotalChunks()
437                 ).toSerializable(),
438                 actor()
439             );
440             LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
441                 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
442                 mapFollowerToSnapshot.get(followerId).getTotalChunks());
443         } catch (IOException e) {
444             LOG.error(e, "InstallSnapshot failed for Leader.");
445         }
446     }
447
448     /**
449      * Acccepts snaphot as ByteString, enters into map for future chunks
450      * creates and return a ByteString chunk
451      */
452     private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
453         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
454         if (followerToSnapshot == null) {
455             followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
456             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
457         }
458         ByteString nextChunk = followerToSnapshot.getNextChunk();
459         if(LOG.isDebugEnabled()) {
460             LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
461         }
462
463         return nextChunk;
464     }
465
466     private void sendHeartBeat() {
467         if (followers.size() > 0) {
468             sendAppendEntries();
469         }
470     }
471
472     private void stopHeartBeat() {
473         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
474             heartbeatSchedule.cancel();
475         }
476     }
477
478     private void stopInstallSnapshotSchedule() {
479         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
480             installSnapshotSchedule.cancel();
481         }
482     }
483
484     private void scheduleHeartBeat(FiniteDuration interval) {
485         if(followers.size() == 0){
486             // Optimization - do not bother scheduling a heartbeat as there are
487             // no followers
488             return;
489         }
490
491         stopHeartBeat();
492
493         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
494         // message is sent to itself.
495         // Scheduling the heartbeat only once here because heartbeats do not
496         // need to be sent if there are other messages being sent to the remote
497         // actor.
498         heartbeatSchedule =
499             context.getActorSystem().scheduler().scheduleOnce(
500                 interval,
501                 context.getActor(), new SendHeartBeat(),
502                 context.getActorSystem().dispatcher(), context.getActor());
503     }
504
505
506     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
507         if(followers.size() == 0){
508             // Optimization - do not bother scheduling a heartbeat as there are
509             // no followers
510             return;
511         }
512
513         stopInstallSnapshotSchedule();
514
515         // Schedule a message to send append entries to followers that can
516         // accept an append entries with some data in it
517         installSnapshotSchedule =
518             context.getActorSystem().scheduler().scheduleOnce(
519                 interval,
520                 context.getActor(), new SendInstallSnapshot(),
521                 context.getActorSystem().dispatcher(), context.getActor());
522     }
523
524
525
526     @Override public void close() throws Exception {
527         stopHeartBeat();
528     }
529
530     @Override public String getLeaderId() {
531         return context.getId();
532     }
533
534     /**
535      * Encapsulates the snapshot bytestring and handles the logic of sending
536      * snapshot chunks
537      */
538     protected class FollowerToSnapshot {
539         private ByteString snapshotBytes;
540         private int offset = 0;
541         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
542         private int replyReceivedForOffset;
543         // if replyStatus is false, the previous chunk is attempted
544         private boolean replyStatus = false;
545         private int chunkIndex;
546         private int totalChunks;
547
548         public FollowerToSnapshot(ByteString snapshotBytes) {
549             this.snapshotBytes = snapshotBytes;
550             replyReceivedForOffset = -1;
551             chunkIndex = 1;
552             int size = snapshotBytes.size();
553             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
554                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
555             if(LOG.isDebugEnabled()) {
556                 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
557                     size, totalChunks);
558             }
559         }
560
561         public ByteString getSnapshotBytes() {
562             return snapshotBytes;
563         }
564
565         public int incrementOffset() {
566             if(replyStatus) {
567                 // if prev chunk failed, we would want to sent the same chunk again
568                 offset = offset + context.getConfigParams().getSnapshotChunkSize();
569             }
570             return offset;
571         }
572
573         public int incrementChunkIndex() {
574             if (replyStatus) {
575                 // if prev chunk failed, we would want to sent the same chunk again
576                 chunkIndex =  chunkIndex + 1;
577             }
578             return chunkIndex;
579         }
580
581         public int getChunkIndex() {
582             return chunkIndex;
583         }
584
585         public int getTotalChunks() {
586             return totalChunks;
587         }
588
589         public boolean canSendNextChunk() {
590             // we only send a false if a chunk is sent but we have not received a reply yet
591             return replyReceivedForOffset == offset;
592         }
593
594         public boolean isLastChunk(int chunkIndex) {
595             return totalChunks == chunkIndex;
596         }
597
598         public void markSendStatus(boolean success) {
599             if (success) {
600                 // if the chunk sent was successful
601                 replyReceivedForOffset = offset;
602                 replyStatus = true;
603             } else {
604                 // if the chunk sent was failure
605                 replyReceivedForOffset = offset;
606                 replyStatus = false;
607             }
608         }
609
610         public ByteString getNextChunk() {
611             int snapshotLength = getSnapshotBytes().size();
612             int start = incrementOffset();
613             int size = context.getConfigParams().getSnapshotChunkSize();
614             if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
615                 size = snapshotLength;
616             } else {
617                 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
618                     size = snapshotLength - start;
619                 }
620             }
621
622             if(LOG.isDebugEnabled()) {
623                 LOG.debug("length={}, offset={},size={}",
624                     snapshotLength, start, size);
625             }
626             return getSnapshotBytes().substring(start, start + size);
627
628         }
629     }
630
631 }