Merge "Bug 1875 - Used variables for nexusproxy host, externalized versions"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import akka.event.LoggingAdapter;
15 import com.google.common.base.Preconditions;
16 import com.google.protobuf.ByteString;
17 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
18 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.RaftState;
23 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
24 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
25 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
26 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
28 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
30 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
32 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
33 import scala.concurrent.duration.FiniteDuration;
34
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Set;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicLong;
44
45 /**
46  * The behavior of a RaftActor when it is in the Leader state
47  * <p/>
48  * Leaders:
49  * <ul>
50  * <li> Upon election: send initial empty AppendEntries RPCs
51  * (heartbeat) to each server; repeat during idle periods to
52  * prevent election timeouts (§5.2)
53  * <li> If command received from client: append entry to local log,
54  * respond after entry applied to state machine (§5.3)
55  * <li> If last log index ≥ nextIndex for a follower: send
56  * AppendEntries RPC with log entries starting at nextIndex
57  * <ul>
58  * <li> If successful: update nextIndex and matchIndex for
59  * follower (§5.3)
60  * <li> If AppendEntries fails because of log inconsistency:
61  * decrement nextIndex and retry (§5.3)
62  * </ul>
63  * <li> If there exists an N such that N > commitIndex, a majority
64  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
65  * set commitIndex = N (§5.3, §5.4).
66  */
67 public class Leader extends AbstractRaftActorBehavior {
68
69
70     protected final Map<String, FollowerLogInformation> followerToLog =
71         new HashMap();
72     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
73
74     private final Set<String> followers;
75
76     private Cancellable heartbeatSchedule = null;
77     private Cancellable appendEntriesSchedule = null;
78     private Cancellable installSnapshotSchedule = null;
79
80     private List<ClientRequestTracker> trackerList = new ArrayList<>();
81
82     private final int minReplicationCount;
83
84     private final LoggingAdapter LOG;
85
86     public Leader(RaftActorContext context) {
87         super(context);
88
89         LOG = context.getLogger();
90
91         if (lastIndex() >= 0) {
92             context.setCommitIndex(lastIndex());
93         }
94
95         followers = context.getPeerAddresses().keySet();
96
97         for (String followerId : followers) {
98             FollowerLogInformation followerLogInformation =
99                 new FollowerLogInformationImpl(followerId,
100                     new AtomicLong(lastIndex()),
101                     new AtomicLong(-1));
102
103             followerToLog.put(followerId, followerLogInformation);
104         }
105
106         if(LOG.isDebugEnabled()) {
107             LOG.debug("Election:Leader has following peers:" + followers);
108         }
109
110         if (followers.size() > 0) {
111             minReplicationCount = (followers.size() + 1) / 2 + 1;
112         } else {
113             minReplicationCount = 0;
114         }
115
116
117         // Immediately schedule a heartbeat
118         // Upon election: send initial empty AppendEntries RPCs
119         // (heartbeat) to each server; repeat during idle periods to
120         // prevent election timeouts (§5.2)
121         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
122
123         scheduleInstallSnapshotCheck(
124             new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
125                 context.getConfigParams().getHeartBeatInterval().unit())
126         );
127
128     }
129
130     @Override protected RaftState handleAppendEntries(ActorRef sender,
131         AppendEntries appendEntries) {
132
133         if(LOG.isDebugEnabled()) {
134             LOG.debug(appendEntries.toString());
135         }
136
137         return state();
138     }
139
140     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
141         AppendEntriesReply appendEntriesReply) {
142
143         if(! appendEntriesReply.isSuccess()) {
144             if(LOG.isDebugEnabled()) {
145                 LOG.debug(appendEntriesReply.toString());
146             }
147         }
148
149         // Update the FollowerLogInformation
150         String followerId = appendEntriesReply.getFollowerId();
151         FollowerLogInformation followerLogInformation =
152             followerToLog.get(followerId);
153
154         if(followerLogInformation == null){
155             LOG.error("Unknown follower {}", followerId);
156             return state();
157         }
158
159         if (appendEntriesReply.isSuccess()) {
160             followerLogInformation
161                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
162             followerLogInformation
163                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
164         } else {
165
166             // TODO: When we find that the follower is out of sync with the
167             // Leader we simply decrement that followers next index by 1.
168             // Would it be possible to do better than this? The RAFT spec
169             // does not explicitly deal with it but may be something for us to
170             // think about
171
172             followerLogInformation.decrNextIndex();
173         }
174
175         // Now figure out if this reply warrants a change in the commitIndex
176         // If there exists an N such that N > commitIndex, a majority
177         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
178         // set commitIndex = N (§5.3, §5.4).
179         for (long N = context.getCommitIndex() + 1; ; N++) {
180             int replicatedCount = 1;
181
182             for (FollowerLogInformation info : followerToLog.values()) {
183                 if (info.getMatchIndex().get() >= N) {
184                     replicatedCount++;
185                 }
186             }
187
188             if (replicatedCount >= minReplicationCount) {
189                 ReplicatedLogEntry replicatedLogEntry =
190                     context.getReplicatedLog().get(N);
191                 if (replicatedLogEntry != null
192                     && replicatedLogEntry.getTerm()
193                     == currentTerm()) {
194                     context.setCommitIndex(N);
195                 }
196             } else {
197                 break;
198             }
199         }
200
201         // Apply the change to the state machine
202         if (context.getCommitIndex() > context.getLastApplied()) {
203             applyLogToStateMachine(context.getCommitIndex());
204         }
205
206         return state();
207     }
208
209     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
210
211         ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
212         if(toRemove != null) {
213             trackerList.remove(toRemove);
214         }
215
216         return toRemove;
217     }
218
219     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
220         for (ClientRequestTracker tracker : trackerList) {
221             if (tracker.getIndex() == logIndex) {
222                 return tracker;
223             }
224         }
225
226         return null;
227     }
228
229     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
230         RequestVoteReply requestVoteReply) {
231         return state();
232     }
233
234     @Override public RaftState state() {
235         return RaftState.Leader;
236     }
237
238     @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
239         Preconditions.checkNotNull(sender, "sender should not be null");
240
241         Object message = fromSerializableMessage(originalMessage);
242
243         if (message instanceof RaftRPC) {
244             RaftRPC rpc = (RaftRPC) message;
245             // If RPC request or response contains term T > currentTerm:
246             // set currentTerm = T, convert to follower (§5.1)
247             // This applies to all RPC messages and responses
248             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
249                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
250                 return RaftState.Follower;
251             }
252         }
253
254         try {
255             if (message instanceof SendHeartBeat) {
256                 return sendHeartBeat();
257             } else if(message instanceof SendInstallSnapshot) {
258                 installSnapshotIfNeeded();
259             } else if (message instanceof Replicate) {
260                 replicate((Replicate) message);
261             } else if (message instanceof InstallSnapshotReply){
262                 handleInstallSnapshotReply(
263                     (InstallSnapshotReply) message);
264             }
265         } finally {
266             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
267         }
268
269         return super.handleMessage(sender, message);
270     }
271
272     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
273         String followerId = reply.getFollowerId();
274         FollowerToSnapshot followerToSnapshot =
275             mapFollowerToSnapshot.get(followerId);
276
277         if (followerToSnapshot != null &&
278             followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
279
280             if (reply.isSuccess()) {
281                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
282                     //this was the last chunk reply
283                     if(LOG.isDebugEnabled()) {
284                         LOG.debug("InstallSnapshotReply received, " +
285                                 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
286                             reply.getChunkIndex(), followerId,
287                             context.getReplicatedLog().getSnapshotIndex() + 1
288                         );
289                     }
290
291                     FollowerLogInformation followerLogInformation =
292                         followerToLog.get(followerId);
293                     followerLogInformation.setMatchIndex(
294                         context.getReplicatedLog().getSnapshotIndex());
295                     followerLogInformation.setNextIndex(
296                         context.getReplicatedLog().getSnapshotIndex() + 1);
297                     mapFollowerToSnapshot.remove(followerId);
298
299                     if(LOG.isDebugEnabled()) {
300                         LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
301                             followerToLog.get(followerId).getNextIndex().get());
302                     }
303
304                 } else {
305                     followerToSnapshot.markSendStatus(true);
306                 }
307             } else {
308                 LOG.info("InstallSnapshotReply received, " +
309                         "sending snapshot chunk failed, Will retry, Chunk:{}",
310                     reply.getChunkIndex()
311                 );
312                 followerToSnapshot.markSendStatus(false);
313             }
314
315         } else {
316             LOG.error("ERROR!!" +
317                     "FollowerId in InstallSnapshotReply not known to Leader" +
318                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
319                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
320             );
321         }
322     }
323
324     private void replicate(Replicate replicate) {
325         long logIndex = replicate.getReplicatedLogEntry().getIndex();
326
327         if(LOG.isDebugEnabled()) {
328             LOG.debug("Replicate message " + logIndex);
329         }
330
331         // Create a tracker entry we will use this later to notify the
332         // client actor
333         trackerList.add(
334             new ClientRequestTrackerImpl(replicate.getClientActor(),
335                 replicate.getIdentifier(),
336                 logIndex)
337         );
338
339         if (followers.size() == 0) {
340             context.setCommitIndex(logIndex);
341             applyLogToStateMachine(logIndex);
342         } else {
343             sendAppendEntries();
344         }
345     }
346
347     private void sendAppendEntries() {
348         // Send an AppendEntries to all followers
349         for (String followerId : followers) {
350             ActorSelection followerActor = context.getPeerActorSelection(followerId);
351
352             if (followerActor != null) {
353                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
354                 long followerNextIndex = followerLogInformation.getNextIndex().get();
355                 List<ReplicatedLogEntry> entries = Collections.emptyList();
356
357                 if (mapFollowerToSnapshot.get(followerId) != null) {
358                     if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
359                         sendSnapshotChunk(followerActor, followerId);
360                     }
361
362                 } else {
363
364                     if (context.getReplicatedLog().isPresent(followerNextIndex)) {
365                         // FIXME : Sending one entry at a time
366                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
367
368                         followerActor.tell(
369                             new AppendEntries(currentTerm(), context.getId(),
370                                 prevLogIndex(followerNextIndex),
371                                 prevLogTerm(followerNextIndex), entries,
372                                 context.getCommitIndex()).toSerializable(),
373                             actor()
374                         );
375
376                     } else {
377                         // if the followers next index is not present in the leaders log, then snapshot should be sent
378                         long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
379                         long leaderLastIndex = context.getReplicatedLog().lastIndex();
380                         if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
381                             // if the follower is just not starting and leader's index
382                             // is more than followers index
383                             if(LOG.isDebugEnabled()) {
384                                 LOG.debug("SendInstallSnapshot to follower:{}," +
385                                         "follower-nextIndex:{}, leader-snapshot-index:{},  " +
386                                         "leader-last-index:{}", followerId,
387                                     followerNextIndex, leaderSnapShotIndex, leaderLastIndex
388                                 );
389                             }
390
391                             actor().tell(new SendInstallSnapshot(), actor());
392                         } else {
393                             followerActor.tell(
394                                 new AppendEntries(currentTerm(), context.getId(),
395                                     prevLogIndex(followerNextIndex),
396                                     prevLogTerm(followerNextIndex), entries,
397                                     context.getCommitIndex()).toSerializable(),
398                                 actor()
399                             );
400                         }
401                     }
402                 }
403             }
404         }
405     }
406
407     /**
408      * An installSnapshot is scheduled at a interval that is a multiple of
409      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
410      * snapshots at every heartbeat.
411      */
412     private void installSnapshotIfNeeded(){
413         for (String followerId : followers) {
414             ActorSelection followerActor =
415                 context.getPeerActorSelection(followerId);
416
417             if(followerActor != null) {
418                 FollowerLogInformation followerLogInformation =
419                     followerToLog.get(followerId);
420
421                 long nextIndex = followerLogInformation.getNextIndex().get();
422
423                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
424                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
425                     sendSnapshotChunk(followerActor, followerId);
426                 }
427             }
428         }
429     }
430
431     /**
432      *  Sends a snapshot chunk to a given follower
433      *  InstallSnapshot should qualify as a heartbeat too.
434      */
435     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
436         try {
437             followerActor.tell(
438                 new InstallSnapshot(currentTerm(), context.getId(),
439                     context.getReplicatedLog().getSnapshotIndex(),
440                     context.getReplicatedLog().getSnapshotTerm(),
441                     getNextSnapshotChunk(followerId,
442                         context.getReplicatedLog().getSnapshot()),
443                     mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
444                     mapFollowerToSnapshot.get(followerId).getTotalChunks()
445                 ).toSerializable(),
446                 actor()
447             );
448             LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
449                 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
450                 mapFollowerToSnapshot.get(followerId).getTotalChunks());
451         } catch (IOException e) {
452             LOG.error("InstallSnapshot failed for Leader.", e);
453         }
454     }
455
456     /**
457      * Acccepts snaphot as ByteString, enters into map for future chunks
458      * creates and return a ByteString chunk
459      */
460     private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
461         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
462         if (followerToSnapshot == null) {
463             followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
464             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
465         }
466         ByteString nextChunk = followerToSnapshot.getNextChunk();
467         if(LOG.isDebugEnabled()) {
468             LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
469         }
470
471         return nextChunk;
472     }
473
474     private RaftState sendHeartBeat() {
475         if (followers.size() > 0) {
476             sendAppendEntries();
477         }
478         return state();
479     }
480
481     private void stopHeartBeat() {
482         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
483             heartbeatSchedule.cancel();
484         }
485     }
486
487     private void stopInstallSnapshotSchedule() {
488         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
489             installSnapshotSchedule.cancel();
490         }
491     }
492
493     private void scheduleHeartBeat(FiniteDuration interval) {
494         if(followers.size() == 0){
495             // Optimization - do not bother scheduling a heartbeat as there are
496             // no followers
497             return;
498         }
499
500         stopHeartBeat();
501
502         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
503         // message is sent to itself.
504         // Scheduling the heartbeat only once here because heartbeats do not
505         // need to be sent if there are other messages being sent to the remote
506         // actor.
507         heartbeatSchedule =
508             context.getActorSystem().scheduler().scheduleOnce(
509                 interval,
510                 context.getActor(), new SendHeartBeat(),
511                 context.getActorSystem().dispatcher(), context.getActor());
512     }
513
514
515     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
516         if(followers.size() == 0){
517             // Optimization - do not bother scheduling a heartbeat as there are
518             // no followers
519             return;
520         }
521
522         stopInstallSnapshotSchedule();
523
524         // Schedule a message to send append entries to followers that can
525         // accept an append entries with some data in it
526         installSnapshotSchedule =
527             context.getActorSystem().scheduler().scheduleOnce(
528                 interval,
529                 context.getActor(), new SendInstallSnapshot(),
530                 context.getActorSystem().dispatcher(), context.getActor());
531     }
532
533
534
535     @Override public void close() throws Exception {
536         stopHeartBeat();
537     }
538
539     @Override public String getLeaderId() {
540         return context.getId();
541     }
542
543     /**
544      * Encapsulates the snapshot bytestring and handles the logic of sending
545      * snapshot chunks
546      */
547     protected class FollowerToSnapshot {
548         private ByteString snapshotBytes;
549         private int offset = 0;
550         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
551         private int replyReceivedForOffset;
552         // if replyStatus is false, the previous chunk is attempted
553         private boolean replyStatus = false;
554         private int chunkIndex;
555         private int totalChunks;
556
557         public FollowerToSnapshot(ByteString snapshotBytes) {
558             this.snapshotBytes = snapshotBytes;
559             replyReceivedForOffset = -1;
560             chunkIndex = 1;
561             int size = snapshotBytes.size();
562             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
563                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
564             if(LOG.isDebugEnabled()) {
565                 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
566                     size, totalChunks);
567             }
568         }
569
570         public ByteString getSnapshotBytes() {
571             return snapshotBytes;
572         }
573
574         public int incrementOffset() {
575             if(replyStatus) {
576                 // if prev chunk failed, we would want to sent the same chunk again
577                 offset = offset + context.getConfigParams().getSnapshotChunkSize();
578             }
579             return offset;
580         }
581
582         public int incrementChunkIndex() {
583             if (replyStatus) {
584                 // if prev chunk failed, we would want to sent the same chunk again
585                 chunkIndex =  chunkIndex + 1;
586             }
587             return chunkIndex;
588         }
589
590         public int getChunkIndex() {
591             return chunkIndex;
592         }
593
594         public int getTotalChunks() {
595             return totalChunks;
596         }
597
598         public boolean canSendNextChunk() {
599             // we only send a false if a chunk is sent but we have not received a reply yet
600             return replyReceivedForOffset == offset;
601         }
602
603         public boolean isLastChunk(int chunkIndex) {
604             return totalChunks == chunkIndex;
605         }
606
607         public void markSendStatus(boolean success) {
608             if (success) {
609                 // if the chunk sent was successful
610                 replyReceivedForOffset = offset;
611                 replyStatus = true;
612             } else {
613                 // if the chunk sent was failure
614                 replyReceivedForOffset = offset;
615                 replyStatus = false;
616             }
617         }
618
619         public ByteString getNextChunk() {
620             int snapshotLength = getSnapshotBytes().size();
621             int start = incrementOffset();
622             int size = context.getConfigParams().getSnapshotChunkSize();
623             if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
624                 size = snapshotLength;
625             } else {
626                 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
627                     size = snapshotLength - start;
628                 }
629             }
630
631             if(LOG.isDebugEnabled()) {
632                 LOG.debug("length={}, offset={},size={}",
633                     snapshotLength, start, size);
634             }
635             return getSnapshotBytes().substring(start, start + size);
636
637         }
638     }
639
640 }