Merge "Bug 1025: Fixed incorrect revision in sal-remote-augment, which caused log...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import akka.event.LoggingAdapter;
15 import com.google.common.base.Preconditions;
16 import com.google.protobuf.ByteString;
17 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
18 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.RaftState;
23 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
24 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
25 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
26 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
28 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
30 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
32 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
33 import scala.concurrent.duration.FiniteDuration;
34
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Set;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicLong;
44
/**
 * The behavior of a RaftActor when it is in the Leader state.
 * <p>
 * Leaders:
 * <ul>
 * <li> Upon election: send initial empty AppendEntries RPCs
 * (heartbeat) to each server; repeat during idle periods to
 * prevent election timeouts (§5.2)
 * <li> If command received from client: append entry to local log,
 * respond after entry applied to state machine (§5.3)
 * <li> If last log index ≥ nextIndex for a follower: send
 * AppendEntries RPC with log entries starting at nextIndex
 * <ul>
 * <li> If successful: update nextIndex and matchIndex for
 * follower (§5.3)
 * <li> If AppendEntries fails because of log inconsistency:
 * decrement nextIndex and retry (§5.3)
 * </ul>
 * <li> If there exists an N such that N &gt; commitIndex, a majority
 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
 * set commitIndex = N (§5.3, §5.4).
 * </ul>
 */
67 public class Leader extends AbstractRaftActorBehavior {
68
69
70     protected final Map<String, FollowerLogInformation> followerToLog =
71         new HashMap();
72     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
73
74     private final Set<String> followers;
75
76     private Cancellable heartbeatSchedule = null;
77     private Cancellable appendEntriesSchedule = null;
78     private Cancellable installSnapshotSchedule = null;
79
80     private List<ClientRequestTracker> trackerList = new ArrayList<>();
81
82     private final int minReplicationCount;
83
84     private final LoggingAdapter LOG;
85
86     public Leader(RaftActorContext context) {
87         super(context);
88
89         LOG = context.getLogger();
90
91         followers = context.getPeerAddresses().keySet();
92
93         for (String followerId : followers) {
94             FollowerLogInformation followerLogInformation =
95                 new FollowerLogInformationImpl(followerId,
96                     new AtomicLong(context.getCommitIndex()),
97                     new AtomicLong(-1));
98
99             followerToLog.put(followerId, followerLogInformation);
100         }
101
102         if(LOG.isDebugEnabled()) {
103             LOG.debug("Election:Leader has following peers:" + followers);
104         }
105
106         if (followers.size() > 0) {
107             minReplicationCount = (followers.size() + 1) / 2 + 1;
108         } else {
109             minReplicationCount = 0;
110         }
111
112
113         // Immediately schedule a heartbeat
114         // Upon election: send initial empty AppendEntries RPCs
115         // (heartbeat) to each server; repeat during idle periods to
116         // prevent election timeouts (§5.2)
117         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
118
119         scheduleInstallSnapshotCheck(
120             new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
121                 context.getConfigParams().getHeartBeatInterval().unit())
122         );
123
124     }
125
126     @Override protected RaftState handleAppendEntries(ActorRef sender,
127         AppendEntries appendEntries) {
128
129         if(LOG.isDebugEnabled()) {
130             LOG.debug(appendEntries.toString());
131         }
132
133         return state();
134     }
135
136     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
137         AppendEntriesReply appendEntriesReply) {
138
139         if(! appendEntriesReply.isSuccess()) {
140             if(LOG.isDebugEnabled()) {
141                 LOG.debug(appendEntriesReply.toString());
142             }
143         }
144
145         // Update the FollowerLogInformation
146         String followerId = appendEntriesReply.getFollowerId();
147         FollowerLogInformation followerLogInformation =
148             followerToLog.get(followerId);
149
150         if(followerLogInformation == null){
151             LOG.error("Unknown follower {}", followerId);
152             return state();
153         }
154
155         if (appendEntriesReply.isSuccess()) {
156             followerLogInformation
157                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
158             followerLogInformation
159                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
160         } else {
161
162             // TODO: When we find that the follower is out of sync with the
163             // Leader we simply decrement that followers next index by 1.
164             // Would it be possible to do better than this? The RAFT spec
165             // does not explicitly deal with it but may be something for us to
166             // think about
167
168             followerLogInformation.decrNextIndex();
169         }
170
171         // Now figure out if this reply warrants a change in the commitIndex
172         // If there exists an N such that N > commitIndex, a majority
173         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
174         // set commitIndex = N (§5.3, §5.4).
175         for (long N = context.getCommitIndex() + 1; ; N++) {
176             int replicatedCount = 1;
177
178             for (FollowerLogInformation info : followerToLog.values()) {
179                 if (info.getMatchIndex().get() >= N) {
180                     replicatedCount++;
181                 }
182             }
183
184             if (replicatedCount >= minReplicationCount) {
185                 ReplicatedLogEntry replicatedLogEntry =
186                     context.getReplicatedLog().get(N);
187                 if (replicatedLogEntry != null
188                     && replicatedLogEntry.getTerm()
189                     == currentTerm()) {
190                     context.setCommitIndex(N);
191                 }
192             } else {
193                 break;
194             }
195         }
196
197         // Apply the change to the state machine
198         if (context.getCommitIndex() > context.getLastApplied()) {
199             applyLogToStateMachine(context.getCommitIndex());
200         }
201
202         return state();
203     }
204
205     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
206
207         ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
208         if(toRemove != null) {
209             trackerList.remove(toRemove);
210         }
211
212         return toRemove;
213     }
214
215     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
216         for (ClientRequestTracker tracker : trackerList) {
217             if (tracker.getIndex() == logIndex) {
218                 return tracker;
219             }
220         }
221
222         return null;
223     }
224
225     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
226         RequestVoteReply requestVoteReply) {
227         return state();
228     }
229
230     @Override public RaftState state() {
231         return RaftState.Leader;
232     }
233
234     @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
235         Preconditions.checkNotNull(sender, "sender should not be null");
236
237         Object message = fromSerializableMessage(originalMessage);
238
239         if (message instanceof RaftRPC) {
240             RaftRPC rpc = (RaftRPC) message;
241             // If RPC request or response contains term T > currentTerm:
242             // set currentTerm = T, convert to follower (§5.1)
243             // This applies to all RPC messages and responses
244             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
245                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
246                 return RaftState.Follower;
247             }
248         }
249
250         try {
251             if (message instanceof SendHeartBeat) {
252                 return sendHeartBeat();
253             } else if(message instanceof SendInstallSnapshot) {
254                 installSnapshotIfNeeded();
255             } else if (message instanceof Replicate) {
256                 replicate((Replicate) message);
257             } else if (message instanceof InstallSnapshotReply){
258                 handleInstallSnapshotReply(
259                     (InstallSnapshotReply) message);
260             }
261         } finally {
262             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
263         }
264
265         return super.handleMessage(sender, message);
266     }
267
268     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
269         String followerId = reply.getFollowerId();
270         FollowerToSnapshot followerToSnapshot =
271             mapFollowerToSnapshot.get(followerId);
272
273         if (followerToSnapshot != null &&
274             followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
275
276             if (reply.isSuccess()) {
277                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
278                     //this was the last chunk reply
279                     if(LOG.isDebugEnabled()) {
280                         LOG.debug("InstallSnapshotReply received, " +
281                                 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
282                             reply.getChunkIndex(), followerId,
283                             context.getReplicatedLog().getSnapshotIndex() + 1
284                         );
285                     }
286
287                     FollowerLogInformation followerLogInformation =
288                         followerToLog.get(followerId);
289                     followerLogInformation.setMatchIndex(
290                         context.getReplicatedLog().getSnapshotIndex());
291                     followerLogInformation.setNextIndex(
292                         context.getReplicatedLog().getSnapshotIndex() + 1);
293                     mapFollowerToSnapshot.remove(followerId);
294
295                     if(LOG.isDebugEnabled()) {
296                         LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
297                             followerToLog.get(followerId).getNextIndex().get());
298                     }
299
300                 } else {
301                     followerToSnapshot.markSendStatus(true);
302                 }
303             } else {
304                 LOG.info("InstallSnapshotReply received, " +
305                         "sending snapshot chunk failed, Will retry, Chunk:{}",
306                     reply.getChunkIndex()
307                 );
308                 followerToSnapshot.markSendStatus(false);
309             }
310
311         } else {
312             LOG.error("ERROR!!" +
313                     "FollowerId in InstallSnapshotReply not known to Leader" +
314                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
315                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
316             );
317         }
318     }
319
320     private void replicate(Replicate replicate) {
321         long logIndex = replicate.getReplicatedLogEntry().getIndex();
322
323         if(LOG.isDebugEnabled()) {
324             LOG.debug("Replicate message " + logIndex);
325         }
326
327         // Create a tracker entry we will use this later to notify the
328         // client actor
329         trackerList.add(
330             new ClientRequestTrackerImpl(replicate.getClientActor(),
331                 replicate.getIdentifier(),
332                 logIndex)
333         );
334
335         if (followers.size() == 0) {
336             context.setCommitIndex(logIndex);
337             applyLogToStateMachine(logIndex);
338         } else {
339             sendAppendEntries();
340         }
341     }
342
343     private void sendAppendEntries() {
344         // Send an AppendEntries to all followers
345         for (String followerId : followers) {
346             ActorSelection followerActor = context.getPeerActorSelection(followerId);
347
348             if (followerActor != null) {
349                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
350                 long followerNextIndex = followerLogInformation.getNextIndex().get();
351                 List<ReplicatedLogEntry> entries = Collections.emptyList();
352
353                 if (mapFollowerToSnapshot.get(followerId) != null) {
354                     if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
355                         sendSnapshotChunk(followerActor, followerId);
356                     }
357
358                 } else {
359
360                     if (context.getReplicatedLog().isPresent(followerNextIndex)) {
361                         // FIXME : Sending one entry at a time
362                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
363
364                         followerActor.tell(
365                             new AppendEntries(currentTerm(), context.getId(),
366                                 prevLogIndex(followerNextIndex),
367                                 prevLogTerm(followerNextIndex), entries,
368                                 context.getCommitIndex()).toSerializable(),
369                             actor()
370                         );
371
372                     } else {
373                         // if the followers next index is not present in the leaders log, then snapshot should be sent
374                         long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
375                         long leaderLastIndex = context.getReplicatedLog().lastIndex();
376                         if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
377                             // if the follower is just not starting and leader's index
378                             // is more than followers index
379                             if(LOG.isDebugEnabled()) {
380                                 LOG.debug("SendInstallSnapshot to follower:{}," +
381                                         "follower-nextIndex:{}, leader-snapshot-index:{},  " +
382                                         "leader-last-index:{}", followerId,
383                                     followerNextIndex, leaderSnapShotIndex, leaderLastIndex
384                                 );
385                             }
386
387                             actor().tell(new SendInstallSnapshot(), actor());
388                         } else {
389                             followerActor.tell(
390                                 new AppendEntries(currentTerm(), context.getId(),
391                                     prevLogIndex(followerNextIndex),
392                                     prevLogTerm(followerNextIndex), entries,
393                                     context.getCommitIndex()).toSerializable(),
394                                 actor()
395                             );
396                         }
397                     }
398                 }
399             }
400         }
401     }
402
403     /**
404      * An installSnapshot is scheduled at a interval that is a multiple of
405      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
406      * snapshots at every heartbeat.
407      */
408     private void installSnapshotIfNeeded(){
409         for (String followerId : followers) {
410             ActorSelection followerActor =
411                 context.getPeerActorSelection(followerId);
412
413             if(followerActor != null) {
414                 FollowerLogInformation followerLogInformation =
415                     followerToLog.get(followerId);
416
417                 long nextIndex = followerLogInformation.getNextIndex().get();
418
419                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
420                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
421                     sendSnapshotChunk(followerActor, followerId);
422                 }
423             }
424         }
425     }
426
427     /**
428      *  Sends a snapshot chunk to a given follower
429      *  InstallSnapshot should qualify as a heartbeat too.
430      */
431     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
432         try {
433             followerActor.tell(
434                 new InstallSnapshot(currentTerm(), context.getId(),
435                     context.getReplicatedLog().getSnapshotIndex(),
436                     context.getReplicatedLog().getSnapshotTerm(),
437                     getNextSnapshotChunk(followerId,
438                         context.getReplicatedLog().getSnapshot()),
439                     mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
440                     mapFollowerToSnapshot.get(followerId).getTotalChunks()
441                 ).toSerializable(),
442                 actor()
443             );
444             LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
445                 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
446                 mapFollowerToSnapshot.get(followerId).getTotalChunks());
447         } catch (IOException e) {
448             LOG.error("InstallSnapshot failed for Leader.", e);
449         }
450     }
451
452     /**
453      * Acccepts snaphot as ByteString, enters into map for future chunks
454      * creates and return a ByteString chunk
455      */
456     private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
457         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
458         if (followerToSnapshot == null) {
459             followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
460             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
461         }
462         ByteString nextChunk = followerToSnapshot.getNextChunk();
463         if(LOG.isDebugEnabled()) {
464             LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
465         }
466
467         return nextChunk;
468     }
469
470     private RaftState sendHeartBeat() {
471         if (followers.size() > 0) {
472             sendAppendEntries();
473         }
474         return state();
475     }
476
477     private void stopHeartBeat() {
478         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
479             heartbeatSchedule.cancel();
480         }
481     }
482
483     private void stopInstallSnapshotSchedule() {
484         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
485             installSnapshotSchedule.cancel();
486         }
487     }
488
489     private void scheduleHeartBeat(FiniteDuration interval) {
490         if(followers.size() == 0){
491             // Optimization - do not bother scheduling a heartbeat as there are
492             // no followers
493             return;
494         }
495
496         stopHeartBeat();
497
498         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
499         // message is sent to itself.
500         // Scheduling the heartbeat only once here because heartbeats do not
501         // need to be sent if there are other messages being sent to the remote
502         // actor.
503         heartbeatSchedule =
504             context.getActorSystem().scheduler().scheduleOnce(
505                 interval,
506                 context.getActor(), new SendHeartBeat(),
507                 context.getActorSystem().dispatcher(), context.getActor());
508     }
509
510
511     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
512         if(followers.size() == 0){
513             // Optimization - do not bother scheduling a heartbeat as there are
514             // no followers
515             return;
516         }
517
518         stopInstallSnapshotSchedule();
519
520         // Schedule a message to send append entries to followers that can
521         // accept an append entries with some data in it
522         installSnapshotSchedule =
523             context.getActorSystem().scheduler().scheduleOnce(
524                 interval,
525                 context.getActor(), new SendInstallSnapshot(),
526                 context.getActorSystem().dispatcher(), context.getActor());
527     }
528
529
530
531     @Override public void close() throws Exception {
532         stopHeartBeat();
533     }
534
535     @Override public String getLeaderId() {
536         return context.getId();
537     }
538
539     /**
540      * Encapsulates the snapshot bytestring and handles the logic of sending
541      * snapshot chunks
542      */
543     protected class FollowerToSnapshot {
544         private ByteString snapshotBytes;
545         private int offset = 0;
546         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
547         private int replyReceivedForOffset;
548         // if replyStatus is false, the previous chunk is attempted
549         private boolean replyStatus = false;
550         private int chunkIndex;
551         private int totalChunks;
552
553         public FollowerToSnapshot(ByteString snapshotBytes) {
554             this.snapshotBytes = snapshotBytes;
555             replyReceivedForOffset = -1;
556             chunkIndex = 1;
557             int size = snapshotBytes.size();
558             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
559                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
560             if(LOG.isDebugEnabled()) {
561                 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
562                     size, totalChunks);
563             }
564         }
565
566         public ByteString getSnapshotBytes() {
567             return snapshotBytes;
568         }
569
570         public int incrementOffset() {
571             if(replyStatus) {
572                 // if prev chunk failed, we would want to sent the same chunk again
573                 offset = offset + context.getConfigParams().getSnapshotChunkSize();
574             }
575             return offset;
576         }
577
578         public int incrementChunkIndex() {
579             if (replyStatus) {
580                 // if prev chunk failed, we would want to sent the same chunk again
581                 chunkIndex =  chunkIndex + 1;
582             }
583             return chunkIndex;
584         }
585
586         public int getChunkIndex() {
587             return chunkIndex;
588         }
589
590         public int getTotalChunks() {
591             return totalChunks;
592         }
593
594         public boolean canSendNextChunk() {
595             // we only send a false if a chunk is sent but we have not received a reply yet
596             return replyReceivedForOffset == offset;
597         }
598
599         public boolean isLastChunk(int chunkIndex) {
600             return totalChunks == chunkIndex;
601         }
602
603         public void markSendStatus(boolean success) {
604             if (success) {
605                 // if the chunk sent was successful
606                 replyReceivedForOffset = offset;
607                 replyStatus = true;
608             } else {
609                 // if the chunk sent was failure
610                 replyReceivedForOffset = offset;
611                 replyStatus = false;
612             }
613         }
614
615         public ByteString getNextChunk() {
616             int snapshotLength = getSnapshotBytes().size();
617             int start = incrementOffset();
618             int size = context.getConfigParams().getSnapshotChunkSize();
619             if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
620                 size = snapshotLength;
621             } else {
622                 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
623                     size = snapshotLength - start;
624                 }
625             }
626
627             if(LOG.isDebugEnabled()) {
628                 LOG.debug("length={}, offset={},size={}",
629                     snapshotLength, start, size);
630             }
631             return getSnapshotBytes().substring(start, start + size);
632
633         }
634     }
635
636 }