Bug 1894: Add LISP configuration options to etc/custom.properties in Karaf
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import com.google.protobuf.ByteString;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
17 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
20 import org.opendaylight.controller.cluster.raft.RaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftState;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import scala.concurrent.duration.FiniteDuration;
33
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 /**
45  * The behavior of a RaftActor when it is in the Leader state
46  * <p/>
47  * Leaders:
48  * <ul>
49  * <li> Upon election: send initial empty AppendEntries RPCs
50  * (heartbeat) to each server; repeat during idle periods to
51  * prevent election timeouts (§5.2)
52  * <li> If command received from client: append entry to local log,
53  * respond after entry applied to state machine (§5.3)
54  * <li> If last log index ≥ nextIndex for a follower: send
55  * AppendEntries RPC with log entries starting at nextIndex
56  * <ul>
57  * <li> If successful: update nextIndex and matchIndex for
58  * follower (§5.3)
59  * <li> If AppendEntries fails because of log inconsistency:
60  * decrement nextIndex and retry (§5.3)
61  * </ul>
62  * <li> If there exists an N such that N > commitIndex, a majority
63  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
64  * set commitIndex = N (§5.3, §5.4).
65  */
66 public class Leader extends AbstractRaftActorBehavior {
67
68
69     protected final Map<String, FollowerLogInformation> followerToLog =
70         new HashMap();
71     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
72
73     private final Set<String> followers;
74
75     private Cancellable heartbeatSchedule = null;
76     private Cancellable appendEntriesSchedule = null;
77     private Cancellable installSnapshotSchedule = null;
78
79     private List<ClientRequestTracker> trackerList = new ArrayList<>();
80
81     private final int minReplicationCount;
82
83     public Leader(RaftActorContext context) {
84         super(context);
85
86         if (lastIndex() >= 0) {
87             context.setCommitIndex(lastIndex());
88         }
89
90         followers = context.getPeerAddresses().keySet();
91
92         for (String followerId : followers) {
93             FollowerLogInformation followerLogInformation =
94                 new FollowerLogInformationImpl(followerId,
95                     new AtomicLong(lastIndex()),
96                     new AtomicLong(-1));
97
98             followerToLog.put(followerId, followerLogInformation);
99         }
100
101         context.getLogger().debug("Election:Leader has following peers:"+ followers);
102
103         if (followers.size() > 0) {
104             minReplicationCount = (followers.size() + 1) / 2 + 1;
105         } else {
106             minReplicationCount = 0;
107         }
108
109
110         // Immediately schedule a heartbeat
111         // Upon election: send initial empty AppendEntries RPCs
112         // (heartbeat) to each server; repeat during idle periods to
113         // prevent election timeouts (§5.2)
114         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
115
116         scheduleInstallSnapshotCheck(
117             new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
118                 context.getConfigParams().getHeartBeatInterval().unit())
119         );
120
121     }
122
123     @Override protected RaftState handleAppendEntries(ActorRef sender,
124         AppendEntries appendEntries) {
125
126         context.getLogger().debug(appendEntries.toString());
127
128         return state();
129     }
130
131     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
132         AppendEntriesReply appendEntriesReply) {
133
134         if(! appendEntriesReply.isSuccess()) {
135             context.getLogger()
136                 .debug(appendEntriesReply.toString());
137         }
138
139         // Update the FollowerLogInformation
140         String followerId = appendEntriesReply.getFollowerId();
141         FollowerLogInformation followerLogInformation =
142             followerToLog.get(followerId);
143
144         if(followerLogInformation == null){
145             context.getLogger().error("Unknown follower {}", followerId);
146             return state();
147         }
148
149         if (appendEntriesReply.isSuccess()) {
150             followerLogInformation
151                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
152             followerLogInformation
153                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
154         } else {
155
156             // TODO: When we find that the follower is out of sync with the
157             // Leader we simply decrement that followers next index by 1.
158             // Would it be possible to do better than this? The RAFT spec
159             // does not explicitly deal with it but may be something for us to
160             // think about
161
162             followerLogInformation.decrNextIndex();
163         }
164
165         // Now figure out if this reply warrants a change in the commitIndex
166         // If there exists an N such that N > commitIndex, a majority
167         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
168         // set commitIndex = N (§5.3, §5.4).
169         for (long N = context.getCommitIndex() + 1; ; N++) {
170             int replicatedCount = 1;
171
172             for (FollowerLogInformation info : followerToLog.values()) {
173                 if (info.getMatchIndex().get() >= N) {
174                     replicatedCount++;
175                 }
176             }
177
178             if (replicatedCount >= minReplicationCount) {
179                 ReplicatedLogEntry replicatedLogEntry =
180                     context.getReplicatedLog().get(N);
181                 if (replicatedLogEntry != null
182                     && replicatedLogEntry.getTerm()
183                     == currentTerm()) {
184                     context.setCommitIndex(N);
185                 }
186             } else {
187                 break;
188             }
189         }
190
191         // Apply the change to the state machine
192         if (context.getCommitIndex() > context.getLastApplied()) {
193             applyLogToStateMachine(context.getCommitIndex());
194         }
195
196         return state();
197     }
198
199     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
200         for (ClientRequestTracker tracker : trackerList) {
201             if (tracker.getIndex() == logIndex) {
202                 return tracker;
203             }
204         }
205
206         return null;
207     }
208
209     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
210         RequestVoteReply requestVoteReply) {
211         return state();
212     }
213
214     @Override public RaftState state() {
215         return RaftState.Leader;
216     }
217
218     @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
219         Preconditions.checkNotNull(sender, "sender should not be null");
220
221         Object message = fromSerializableMessage(originalMessage);
222
223         if (message instanceof RaftRPC) {
224             RaftRPC rpc = (RaftRPC) message;
225             // If RPC request or response contains term T > currentTerm:
226             // set currentTerm = T, convert to follower (§5.1)
227             // This applies to all RPC messages and responses
228             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
229                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
230                 return RaftState.Follower;
231             }
232         }
233
234         try {
235             if (message instanceof SendHeartBeat) {
236                 return sendHeartBeat();
237             } else if(message instanceof SendInstallSnapshot) {
238                 installSnapshotIfNeeded();
239             } else if (message instanceof Replicate) {
240                 replicate((Replicate) message);
241             } else if (message instanceof InstallSnapshotReply){
242                 handleInstallSnapshotReply(
243                     (InstallSnapshotReply) message);
244             }
245         } finally {
246             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
247         }
248
249         return super.handleMessage(sender, message);
250     }
251
252     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
253         String followerId = reply.getFollowerId();
254         FollowerToSnapshot followerToSnapshot =
255             mapFollowerToSnapshot.get(followerId);
256
257         if (followerToSnapshot != null &&
258             followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
259
260             if (reply.isSuccess()) {
261                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
262                     //this was the last chunk reply
263                     context.getLogger().debug("InstallSnapshotReply received, " +
264                         "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
265                         reply.getChunkIndex(), followerId,
266                         context.getReplicatedLog().getSnapshotIndex() + 1);
267
268                     FollowerLogInformation followerLogInformation =
269                         followerToLog.get(followerId);
270                     followerLogInformation.setMatchIndex(
271                         context.getReplicatedLog().getSnapshotIndex());
272                     followerLogInformation.setNextIndex(
273                         context.getReplicatedLog().getSnapshotIndex() + 1);
274                     mapFollowerToSnapshot.remove(followerId);
275                     context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
276                         followerToLog.get(followerId).getNextIndex().get());
277
278                 } else {
279                     followerToSnapshot.markSendStatus(true);
280                 }
281             } else {
282                 context.getLogger().info("InstallSnapshotReply received, " +
283                     "sending snapshot chunk failed, Will retry, Chunk:{}",
284                     reply.getChunkIndex());
285                 followerToSnapshot.markSendStatus(false);
286             }
287
288         } else {
289             context.getLogger().error("ERROR!!" +
290                 "FollowerId in InstallSnapshotReply not known to Leader" +
291                 " or Chunk Index in InstallSnapshotReply not matching {} != {}",
292                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
293         }
294     }
295
296     private void replicate(Replicate replicate) {
297         long logIndex = replicate.getReplicatedLogEntry().getIndex();
298
299         context.getLogger().debug("Replicate message " + logIndex);
300
301         // Create a tracker entry we will use this later to notify the
302         // client actor
303         trackerList.add(
304             new ClientRequestTrackerImpl(replicate.getClientActor(),
305                 replicate.getIdentifier(),
306                 logIndex)
307         );
308
309         if (followers.size() == 0) {
310             context.setCommitIndex(logIndex);
311             applyLogToStateMachine(logIndex);
312         } else {
313             sendAppendEntries();
314         }
315     }
316
317     private void sendAppendEntries() {
318         // Send an AppendEntries to all followers
319         for (String followerId : followers) {
320             ActorSelection followerActor = context.getPeerActorSelection(followerId);
321
322             if (followerActor != null) {
323                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
324                 long followerNextIndex = followerLogInformation.getNextIndex().get();
325                 List<ReplicatedLogEntry> entries = Collections.emptyList();
326
327                 if (mapFollowerToSnapshot.get(followerId) != null) {
328                     if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
329                         sendSnapshotChunk(followerActor, followerId);
330                     }
331
332                 } else {
333
334                     if (context.getReplicatedLog().isPresent(followerNextIndex)) {
335                         // FIXME : Sending one entry at a time
336                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
337
338                         followerActor.tell(
339                             new AppendEntries(currentTerm(), context.getId(),
340                                 prevLogIndex(followerNextIndex),
341                                 prevLogTerm(followerNextIndex), entries,
342                                 context.getCommitIndex()).toSerializable(),
343                             actor()
344                         );
345
346                     } else {
347                         // if the followers next index is not present in the leaders log, then snapshot should be sent
348                         long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
349                         long leaderLastIndex = context.getReplicatedLog().lastIndex();
350                         if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
351                             // if the follower is just not starting and leader's index
352                             // is more than followers index
353                             context.getLogger().debug("SendInstallSnapshot to follower:{}," +
354                                 "follower-nextIndex:{}, leader-snapshot-index:{},  " +
355                                 "leader-last-index:{}", followerId,
356                                 followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
357
358                             actor().tell(new SendInstallSnapshot(), actor());
359                         } else {
360                             followerActor.tell(
361                                 new AppendEntries(currentTerm(), context.getId(),
362                                     prevLogIndex(followerNextIndex),
363                                     prevLogTerm(followerNextIndex), entries,
364                                     context.getCommitIndex()).toSerializable(),
365                                 actor()
366                             );
367                         }
368                     }
369                 }
370             }
371         }
372     }
373
374     /**
375      * An installSnapshot is scheduled at a interval that is a multiple of
376      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
377      * snapshots at every heartbeat.
378      */
379     private void installSnapshotIfNeeded(){
380         for (String followerId : followers) {
381             ActorSelection followerActor =
382                 context.getPeerActorSelection(followerId);
383
384             if(followerActor != null) {
385                 FollowerLogInformation followerLogInformation =
386                     followerToLog.get(followerId);
387
388                 long nextIndex = followerLogInformation.getNextIndex().get();
389
390                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
391                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
392                     sendSnapshotChunk(followerActor, followerId);
393                 }
394             }
395         }
396     }
397
398     /**
399      *  Sends a snapshot chunk to a given follower
400      *  InstallSnapshot should qualify as a heartbeat too.
401      */
402     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
403         try {
404             followerActor.tell(
405                 new InstallSnapshot(currentTerm(), context.getId(),
406                     context.getReplicatedLog().getSnapshotIndex(),
407                     context.getReplicatedLog().getSnapshotTerm(),
408                     getNextSnapshotChunk(followerId,
409                         context.getReplicatedLog().getSnapshot()),
410                     mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
411                     mapFollowerToSnapshot.get(followerId).getTotalChunks()
412                 ).toSerializable(),
413                 actor()
414             );
415             context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
416                 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
417                 mapFollowerToSnapshot.get(followerId).getTotalChunks());
418         } catch (IOException e) {
419             context.getLogger().error("InstallSnapshot failed for Leader.", e);
420         }
421     }
422
423     /**
424      * Acccepts snaphot as ByteString, enters into map for future chunks
425      * creates and return a ByteString chunk
426      */
427     private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
428         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
429         if (followerToSnapshot == null) {
430             followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
431             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
432         }
433         ByteString nextChunk = followerToSnapshot.getNextChunk();
434         context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
435
436         return nextChunk;
437     }
438
439     private RaftState sendHeartBeat() {
440         if (followers.size() > 0) {
441             sendAppendEntries();
442         }
443         return state();
444     }
445
446     private void stopHeartBeat() {
447         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
448             heartbeatSchedule.cancel();
449         }
450     }
451
452     private void stopInstallSnapshotSchedule() {
453         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
454             installSnapshotSchedule.cancel();
455         }
456     }
457
458     private void scheduleHeartBeat(FiniteDuration interval) {
459         if(followers.size() == 0){
460             // Optimization - do not bother scheduling a heartbeat as there are
461             // no followers
462             return;
463         }
464
465         stopHeartBeat();
466
467         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
468         // message is sent to itself.
469         // Scheduling the heartbeat only once here because heartbeats do not
470         // need to be sent if there are other messages being sent to the remote
471         // actor.
472         heartbeatSchedule =
473             context.getActorSystem().scheduler().scheduleOnce(
474                 interval,
475                 context.getActor(), new SendHeartBeat(),
476                 context.getActorSystem().dispatcher(), context.getActor());
477     }
478
479
480     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
481         if(followers.size() == 0){
482             // Optimization - do not bother scheduling a heartbeat as there are
483             // no followers
484             return;
485         }
486
487         stopInstallSnapshotSchedule();
488
489         // Schedule a message to send append entries to followers that can
490         // accept an append entries with some data in it
491         installSnapshotSchedule =
492             context.getActorSystem().scheduler().scheduleOnce(
493                 interval,
494                 context.getActor(), new SendInstallSnapshot(),
495                 context.getActorSystem().dispatcher(), context.getActor());
496     }
497
498
499
500     @Override public void close() throws Exception {
501         stopHeartBeat();
502     }
503
504     @Override public String getLeaderId() {
505         return context.getId();
506     }
507
508     /**
509      * Encapsulates the snapshot bytestring and handles the logic of sending
510      * snapshot chunks
511      */
512     protected class FollowerToSnapshot {
513         private ByteString snapshotBytes;
514         private int offset = 0;
515         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
516         private int replyReceivedForOffset;
517         // if replyStatus is false, the previous chunk is attempted
518         private boolean replyStatus = false;
519         private int chunkIndex;
520         private int totalChunks;
521
522         public FollowerToSnapshot(ByteString snapshotBytes) {
523             this.snapshotBytes = snapshotBytes;
524             replyReceivedForOffset = -1;
525             chunkIndex = 1;
526             int size = snapshotBytes.size();
527             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
528                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
529             context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
530                 size, totalChunks);
531         }
532
533         public ByteString getSnapshotBytes() {
534             return snapshotBytes;
535         }
536
537         public int incrementOffset() {
538             if(replyStatus) {
539                 // if prev chunk failed, we would want to sent the same chunk again
540                 offset = offset + context.getConfigParams().getSnapshotChunkSize();
541             }
542             return offset;
543         }
544
545         public int incrementChunkIndex() {
546             if (replyStatus) {
547                 // if prev chunk failed, we would want to sent the same chunk again
548                 chunkIndex =  chunkIndex + 1;
549             }
550             return chunkIndex;
551         }
552
553         public int getChunkIndex() {
554             return chunkIndex;
555         }
556
557         public int getTotalChunks() {
558             return totalChunks;
559         }
560
561         public boolean canSendNextChunk() {
562             // we only send a false if a chunk is sent but we have not received a reply yet
563             return replyReceivedForOffset == offset;
564         }
565
566         public boolean isLastChunk(int chunkIndex) {
567             return totalChunks == chunkIndex;
568         }
569
570         public void markSendStatus(boolean success) {
571             if (success) {
572                 // if the chunk sent was successful
573                 replyReceivedForOffset = offset;
574                 replyStatus = true;
575             } else {
576                 // if the chunk sent was failure
577                 replyReceivedForOffset = offset;
578                 replyStatus = false;
579             }
580         }
581
582         public ByteString getNextChunk() {
583             int snapshotLength = getSnapshotBytes().size();
584             int start = incrementOffset();
585             int size = context.getConfigParams().getSnapshotChunkSize();
586             if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
587                 size = snapshotLength;
588             } else {
589                 if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
590                     size = snapshotLength - start;
591                 }
592             }
593
594             context.getLogger().debug("length={}, offset={},size={}",
595                 snapshotLength, start, size);
596             return getSnapshotBytes().substring(start, start + size);
597
598         }
599     }
600
601 }