Merge "Added hosttracker shell for karaf (rebased)"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import scala.concurrent.duration.FiniteDuration;
33
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Set;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicLong;
42
43 /**
44  * The behavior of a RaftActor when it is in the Leader state
45  * <p/>
46  * Leaders:
47  * <ul>
48  * <li> Upon election: send initial empty AppendEntries RPCs
49  * (heartbeat) to each server; repeat during idle periods to
50  * prevent election timeouts (§5.2)
51  * <li> If command received from client: append entry to local log,
52  * respond after entry applied to state machine (§5.3)
53  * <li> If last log index ≥ nextIndex for a follower: send
54  * AppendEntries RPC with log entries starting at nextIndex
55  * <ul>
56  * <li> If successful: update nextIndex and matchIndex for
57  * follower (§5.3)
58  * <li> If AppendEntries fails because of log inconsistency:
59  * decrement nextIndex and retry (§5.3)
60  * </ul>
61  * <li> If there exists an N such that N > commitIndex, a majority
62  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
63  * set commitIndex = N (§5.3, §5.4).
64  */
65 public class Leader extends AbstractRaftActorBehavior {
66
67
68     private final Map<String, FollowerLogInformation> followerToLog =
69         new HashMap();
70
71     private final Set<String> followers;
72
73     private Cancellable heartbeatSchedule = null;
74     private Cancellable appendEntriesSchedule = null;
75     private Cancellable installSnapshotSchedule = null;
76
77     private List<ClientRequestTracker> trackerList = new ArrayList<>();
78
79     private final int minReplicationCount;
80
81     public Leader(RaftActorContext context) {
82         super(context);
83
84         if (lastIndex() >= 0) {
85             context.setCommitIndex(lastIndex());
86         }
87
88         followers = context.getPeerAddresses().keySet();
89
90         for (String followerId : followers) {
91             FollowerLogInformation followerLogInformation =
92                 new FollowerLogInformationImpl(followerId,
93                     new AtomicLong(lastIndex()),
94                     new AtomicLong(-1));
95
96             followerToLog.put(followerId, followerLogInformation);
97         }
98
99         context.getLogger().debug("Election:Leader has following peers:"+ followers);
100
101         if (followers.size() > 0) {
102             minReplicationCount = (followers.size() + 1) / 2 + 1;
103         } else {
104             minReplicationCount = 0;
105         }
106
107
108         // Immediately schedule a heartbeat
109         // Upon election: send initial empty AppendEntries RPCs
110         // (heartbeat) to each server; repeat during idle periods to
111         // prevent election timeouts (§5.2)
112         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
113
114         scheduleInstallSnapshotCheck(
115             new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
116                 context.getConfigParams().getHeartBeatInterval().unit())
117         );
118
119     }
120
121     @Override protected RaftState handleAppendEntries(ActorRef sender,
122         AppendEntries appendEntries) {
123
124         context.getLogger().info("Leader: Received {}", appendEntries.toString());
125
126         return state();
127     }
128
129     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
130         AppendEntriesReply appendEntriesReply) {
131
132         if(! appendEntriesReply.isSuccess()) {
133             context.getLogger()
134                 .info("Leader: Received {}", appendEntriesReply.toString());
135         }
136
137         // Update the FollowerLogInformation
138         String followerId = appendEntriesReply.getFollowerId();
139         FollowerLogInformation followerLogInformation =
140             followerToLog.get(followerId);
141
142         if(followerLogInformation == null){
143             context.getLogger().error("Unknown follower {}", followerId);
144             return state();
145         }
146
147         if (appendEntriesReply.isSuccess()) {
148             followerLogInformation
149                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
150             followerLogInformation
151                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
152         } else {
153
154             // TODO: When we find that the follower is out of sync with the
155             // Leader we simply decrement that followers next index by 1.
156             // Would it be possible to do better than this? The RAFT spec
157             // does not explicitly deal with it but may be something for us to
158             // think about
159
160             followerLogInformation.decrNextIndex();
161         }
162
163         // Now figure out if this reply warrants a change in the commitIndex
164         // If there exists an N such that N > commitIndex, a majority
165         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
166         // set commitIndex = N (§5.3, §5.4).
167         for (long N = context.getCommitIndex() + 1; ; N++) {
168             int replicatedCount = 1;
169
170             for (FollowerLogInformation info : followerToLog.values()) {
171                 if (info.getMatchIndex().get() >= N) {
172                     replicatedCount++;
173                 }
174             }
175
176             if (replicatedCount >= minReplicationCount) {
177                 ReplicatedLogEntry replicatedLogEntry =
178                     context.getReplicatedLog().get(N);
179                 if (replicatedLogEntry != null
180                     && replicatedLogEntry.getTerm()
181                     == currentTerm()) {
182                     context.setCommitIndex(N);
183                 }
184             } else {
185                 break;
186             }
187         }
188
189         // Apply the change to the state machine
190         if (context.getCommitIndex() > context.getLastApplied()) {
191             applyLogToStateMachine(context.getCommitIndex());
192         }
193
194         return state();
195     }
196
197     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
198         for (ClientRequestTracker tracker : trackerList) {
199             if (tracker.getIndex() == logIndex) {
200                 return tracker;
201             }
202         }
203
204         return null;
205     }
206
207     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
208         RequestVoteReply requestVoteReply) {
209         return state();
210     }
211
212     @Override public RaftState state() {
213         return RaftState.Leader;
214     }
215
216     @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
217         Preconditions.checkNotNull(sender, "sender should not be null");
218
219         Object message = fromSerializableMessage(originalMessage);
220
221         if (message instanceof RaftRPC) {
222             RaftRPC rpc = (RaftRPC) message;
223             // If RPC request or response contains term T > currentTerm:
224             // set currentTerm = T, convert to follower (§5.1)
225             // This applies to all RPC messages and responses
226             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
227                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
228                 return RaftState.Follower;
229             }
230         }
231
232         try {
233             if (message instanceof SendHeartBeat) {
234                 return sendHeartBeat();
235             } else if(message instanceof SendInstallSnapshot) {
236                 installSnapshotIfNeeded();
237             } else if (message instanceof Replicate) {
238                 replicate((Replicate) message);
239             } else if (message instanceof InstallSnapshotReply){
240                 handleInstallSnapshotReply(
241                     (InstallSnapshotReply) message);
242             }
243         } finally {
244             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
245         }
246
247         return super.handleMessage(sender, message);
248     }
249
250     private void handleInstallSnapshotReply(InstallSnapshotReply message) {
251         InstallSnapshotReply reply = message;
252         String followerId = reply.getFollowerId();
253         FollowerLogInformation followerLogInformation =
254             followerToLog.get(followerId);
255
256         followerLogInformation
257             .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
258         followerLogInformation
259             .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
260     }
261
262     private void replicate(Replicate replicate) {
263         long logIndex = replicate.getReplicatedLogEntry().getIndex();
264
265         context.getLogger().debug("Replicate message " + logIndex);
266
267         if (followers.size() == 0) {
268             context.setCommitIndex(
269                 replicate.getReplicatedLogEntry().getIndex());
270
271             context.getActor()
272                 .tell(new ApplyState(replicate.getClientActor(),
273                         replicate.getIdentifier(),
274                         replicate.getReplicatedLogEntry()),
275                     context.getActor()
276                 );
277         } else {
278
279             // Create a tracker entry we will use this later to notify the
280             // client actor
281             trackerList.add(
282                 new ClientRequestTrackerImpl(replicate.getClientActor(),
283                     replicate.getIdentifier(),
284                     logIndex)
285             );
286
287             sendAppendEntries();
288         }
289     }
290
291     private void sendAppendEntries() {
292         // Send an AppendEntries to all followers
293         for (String followerId : followers) {
294             ActorSelection followerActor =
295                 context.getPeerActorSelection(followerId);
296
297             if (followerActor != null) {
298                 FollowerLogInformation followerLogInformation =
299                     followerToLog.get(followerId);
300
301                 long nextIndex = followerLogInformation.getNextIndex().get();
302
303                 List<ReplicatedLogEntry> entries = Collections.emptyList();
304
305                 if (context.getReplicatedLog().isPresent(nextIndex)) {
306                     // TODO: Instead of sending all entries from nextIndex
307                     // only send a fixed number of entries to each follower
308                     // This is to avoid the situation where there are a lot of
309                     // entries to install for a fresh follower or to a follower
310                     // that has fallen too far behind with the log but yet is not
311                     // eligible to receive a snapshot
312                     entries =
313                         context.getReplicatedLog().getFrom(nextIndex);
314                 }
315
316                 followerActor.tell(
317                     new AppendEntries(currentTerm(), context.getId(),
318                         prevLogIndex(nextIndex),
319                         prevLogTerm(nextIndex), entries,
320                         context.getCommitIndex()).toSerializable(),
321                     actor()
322                 );
323             }
324         }
325     }
326
327     /**
328      * An installSnapshot is scheduled at a interval that is a multiple of
329      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
330      * snapshots at every heartbeat.
331      */
332     private void installSnapshotIfNeeded(){
333         for (String followerId : followers) {
334             ActorSelection followerActor =
335                 context.getPeerActorSelection(followerId);
336
337             if(followerActor != null) {
338                 FollowerLogInformation followerLogInformation =
339                     followerToLog.get(followerId);
340
341                 long nextIndex = followerLogInformation.getNextIndex().get();
342
343                 if (!context.getReplicatedLog().isPresent(nextIndex) && context
344                     .getReplicatedLog().isInSnapshot(nextIndex)) {
345                     followerActor.tell(
346                         new InstallSnapshot(currentTerm(), context.getId(),
347                             context.getReplicatedLog().getSnapshotIndex(),
348                             context.getReplicatedLog().getSnapshotTerm(),
349                             context.getReplicatedLog().getSnapshot()
350                         ),
351                         actor()
352                     );
353                 }
354             }
355         }
356     }
357
358     private RaftState sendHeartBeat() {
359         if (followers.size() > 0) {
360             sendAppendEntries();
361         }
362         return state();
363     }
364
365     private void stopHeartBeat() {
366         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
367             heartbeatSchedule.cancel();
368         }
369     }
370
371     private void stopInstallSnapshotSchedule() {
372         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
373             installSnapshotSchedule.cancel();
374         }
375     }
376
377     private void scheduleHeartBeat(FiniteDuration interval) {
378         if(followers.size() == 0){
379             // Optimization - do not bother scheduling a heartbeat as there are
380             // no followers
381             return;
382         }
383
384         stopHeartBeat();
385
386         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
387         // message is sent to itself.
388         // Scheduling the heartbeat only once here because heartbeats do not
389         // need to be sent if there are other messages being sent to the remote
390         // actor.
391         heartbeatSchedule =
392             context.getActorSystem().scheduler().scheduleOnce(
393                 interval,
394                 context.getActor(), new SendHeartBeat(),
395                 context.getActorSystem().dispatcher(), context.getActor());
396     }
397
398
399     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
400         if(followers.size() == 0){
401             // Optimization - do not bother scheduling a heartbeat as there are
402             // no followers
403             return;
404         }
405
406         stopInstallSnapshotSchedule();
407
408         // Schedule a message to send append entries to followers that can
409         // accept an append entries with some data in it
410         installSnapshotSchedule =
411             context.getActorSystem().scheduler().scheduleOnce(
412                 interval,
413                 context.getActor(), new SendInstallSnapshot(),
414                 context.getActorSystem().dispatcher(), context.getActor());
415     }
416
417
418
419     @Override public void close() throws Exception {
420         stopHeartBeat();
421     }
422
423     @Override public String getLeaderId() {
424         return context.getId();
425     }
426
427 }