Initial implementation of saving and installing snapshots
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
17 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftState;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
22 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
24 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
25 import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import scala.concurrent.duration.FiniteDuration;
32
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
40
41 /**
42  * The behavior of a RaftActor when it is in the Leader state
43  * <p/>
44  * Leaders:
45  * <ul>
46  * <li> Upon election: send initial empty AppendEntries RPCs
47  * (heartbeat) to each server; repeat during idle periods to
48  * prevent election timeouts (§5.2)
49  * <li> If command received from client: append entry to local log,
50  * respond after entry applied to state machine (§5.3)
51  * <li> If last log index ≥ nextIndex for a follower: send
52  * AppendEntries RPC with log entries starting at nextIndex
53  * <ul>
54  * <li> If successful: update nextIndex and matchIndex for
55  * follower (§5.3)
56  * <li> If AppendEntries fails because of log inconsistency:
57  * decrement nextIndex and retry (§5.3)
58  * </ul>
59  * <li> If there exists an N such that N > commitIndex, a majority
60  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
61  * set commitIndex = N (§5.3, §5.4).
62  */
63 public class Leader extends AbstractRaftActorBehavior {
64
65
66     private final Map<String, FollowerLogInformation> followerToLog =
67         new HashMap();
68
69     private final Map<String, ActorSelection> followerToActor = new HashMap<>();
70
71     private Cancellable heartbeatSchedule = null;
72     private Cancellable appendEntriesSchedule = null;
73     private Cancellable installSnapshotSchedule = null;
74
75     private List<ClientRequestTracker> trackerList = new ArrayList<>();
76
77     private final int minReplicationCount;
78
79     public Leader(RaftActorContext context) {
80         super(context);
81
82         if (lastIndex() >= 0) {
83             context.setCommitIndex(lastIndex());
84         }
85
86         for (String followerId : context.getPeerAddresses().keySet()) {
87             FollowerLogInformation followerLogInformation =
88                 new FollowerLogInformationImpl(followerId,
89                     new AtomicLong(lastIndex()),
90                     new AtomicLong(-1));
91
92             followerToActor.put(followerId,
93                 context.actorSelection(context.getPeerAddress(followerId)));
94
95             followerToLog.put(followerId, followerLogInformation);
96
97         }
98
99         if (followerToActor.size() > 0) {
100             minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
101         } else {
102             minReplicationCount = 0;
103         }
104
105
106         // Immediately schedule a heartbeat
107         // Upon election: send initial empty AppendEntries RPCs
108         // (heartbeat) to each server; repeat during idle periods to
109         // prevent election timeouts (§5.2)
110         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
111
112         scheduleInstallSnapshotCheck(
113             new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
114                 HEART_BEAT_INTERVAL.unit())
115         );
116
117     }
118
119     @Override protected RaftState handleAppendEntries(ActorRef sender,
120         AppendEntries appendEntries, RaftState suggestedState) {
121
122         context.getLogger()
123             .error("An unexpected AppendEntries received in state " + state());
124
125         return suggestedState;
126     }
127
128     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
129         AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
130
131         // Do not take any other action since a behavior change is coming
132         if (suggestedState != state())
133             return suggestedState;
134
135         // Update the FollowerLogInformation
136         String followerId = appendEntriesReply.getFollowerId();
137         FollowerLogInformation followerLogInformation =
138             followerToLog.get(followerId);
139         if (appendEntriesReply.isSuccess()) {
140             followerLogInformation
141                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
142             followerLogInformation
143                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
144         } else {
145             followerLogInformation.decrNextIndex();
146         }
147
148         // Now figure out if this reply warrants a change in the commitIndex
149         // If there exists an N such that N > commitIndex, a majority
150         // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
151         // set commitIndex = N (§5.3, §5.4).
152         for (long N = context.getCommitIndex() + 1; ; N++) {
153             int replicatedCount = 1;
154
155             for (FollowerLogInformation info : followerToLog.values()) {
156                 if (info.getMatchIndex().get() >= N) {
157                     replicatedCount++;
158                 }
159             }
160
161             if (replicatedCount >= minReplicationCount) {
162                 ReplicatedLogEntry replicatedLogEntry =
163                     context.getReplicatedLog().get(N);
164                 if (replicatedLogEntry != null
165                     && replicatedLogEntry.getTerm()
166                     == currentTerm()) {
167                     context.setCommitIndex(N);
168                 }
169             } else {
170                 break;
171             }
172         }
173
174         // Apply the change to the state machine
175         if (context.getCommitIndex() > context.getLastApplied()) {
176             applyLogToStateMachine(context.getCommitIndex());
177         }
178
179         return suggestedState;
180     }
181
182     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
183         for (ClientRequestTracker tracker : trackerList) {
184             if (tracker.getIndex() == logIndex) {
185                 return tracker;
186             }
187         }
188
189         return null;
190     }
191
192     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
193         RequestVoteReply requestVoteReply, RaftState suggestedState) {
194         return suggestedState;
195     }
196
197     @Override public RaftState state() {
198         return RaftState.Leader;
199     }
200
201     @Override public RaftState handleMessage(ActorRef sender, Object message) {
202         Preconditions.checkNotNull(sender, "sender should not be null");
203
204         try {
205             if (message instanceof SendHeartBeat) {
206                 return sendHeartBeat();
207             } else if(message instanceof SendInstallSnapshot) {
208                 installSnapshotIfNeeded();
209             } else if (message instanceof Replicate) {
210                 replicate((Replicate) message);
211             } else if (message instanceof InstallSnapshotReply){
212                 // FIXME : Should I be checking the term here too?
213                 handleInstallSnapshotReply(
214                     (InstallSnapshotReply) message);
215             }
216         } finally {
217             scheduleHeartBeat(HEART_BEAT_INTERVAL);
218         }
219
220         return super.handleMessage(sender, message);
221     }
222
223     private void handleInstallSnapshotReply(InstallSnapshotReply message) {
224         InstallSnapshotReply reply = message;
225         String followerId = reply.getFollowerId();
226         FollowerLogInformation followerLogInformation =
227             followerToLog.get(followerId);
228
229         followerLogInformation
230             .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
231         followerLogInformation
232             .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
233     }
234
235     private void replicate(Replicate replicate) {
236         long logIndex = replicate.getReplicatedLogEntry().getIndex();
237
238         context.getLogger().debug("Replicate message " + logIndex);
239
240         if (followerToActor.size() == 0) {
241             context.setCommitIndex(
242                 replicate.getReplicatedLogEntry().getIndex());
243
244             context.getActor()
245                 .tell(new ApplyState(replicate.getClientActor(),
246                         replicate.getIdentifier(),
247                         replicate.getReplicatedLogEntry()),
248                     context.getActor()
249                 );
250         } else {
251
252             // Create a tracker entry we will use this later to notify the
253             // client actor
254             trackerList.add(
255                 new ClientRequestTrackerImpl(replicate.getClientActor(),
256                     replicate.getIdentifier(),
257                     logIndex)
258             );
259
260             sendAppendEntries();
261         }
262     }
263
264     private void sendAppendEntries() {
265         // Send an AppendEntries to all followers
266         for (String followerId : followerToActor.keySet()) {
267             ActorSelection followerActor =
268                 followerToActor.get(followerId);
269
270             FollowerLogInformation followerLogInformation =
271                 followerToLog.get(followerId);
272
273             long nextIndex = followerLogInformation.getNextIndex().get();
274
275             List<ReplicatedLogEntry> entries = Collections.emptyList();
276
277             if(context.getReplicatedLog().isPresent(nextIndex)){
278                 entries =
279                     context.getReplicatedLog().getFrom(nextIndex);
280             }
281
282             followerActor.tell(
283                 new AppendEntries(currentTerm(), context.getId(),
284                     prevLogIndex(nextIndex), prevLogTerm(nextIndex),
285                     entries, context.getCommitIndex()
286                 ),
287                 actor()
288             );
289         }
290     }
291
292     private void installSnapshotIfNeeded(){
293         for (String followerId : followerToActor.keySet()) {
294             ActorSelection followerActor =
295                 followerToActor.get(followerId);
296
297             FollowerLogInformation followerLogInformation =
298                 followerToLog.get(followerId);
299
300             long nextIndex = followerLogInformation.getNextIndex().get();
301
302             if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
303                 followerActor.tell(
304                     new InstallSnapshot(currentTerm(), context.getId(),
305                         context.getReplicatedLog().getSnapshotIndex(),
306                         context.getReplicatedLog().getSnapshotTerm(),
307                         context.getReplicatedLog().getSnapshot()
308                     ),
309                     actor()
310                 );
311             }
312         }
313     }
314
315     private RaftState sendHeartBeat() {
316         if (followerToActor.size() > 0) {
317             sendAppendEntries();
318         }
319         return state();
320     }
321
322     private void stopHeartBeat() {
323         if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
324             heartbeatSchedule.cancel();
325         }
326     }
327
328     private void stopInstallSnapshotSchedule() {
329         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
330             installSnapshotSchedule.cancel();
331         }
332     }
333
334     private void scheduleHeartBeat(FiniteDuration interval) {
335         if(followerToActor.keySet().size() == 0){
336             // Optimization - do not bother scheduling a heartbeat as there are
337             // no followers
338             return;
339         }
340
341         stopHeartBeat();
342
343         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
344         // message is sent to itself.
345         // Scheduling the heartbeat only once here because heartbeats do not
346         // need to be sent if there are other messages being sent to the remote
347         // actor.
348         heartbeatSchedule =
349             context.getActorSystem().scheduler().scheduleOnce(
350                 interval,
351                 context.getActor(), new SendHeartBeat(),
352                 context.getActorSystem().dispatcher(), context.getActor());
353     }
354
355
356     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
357         if(followerToActor.keySet().size() == 0){
358             // Optimization - do not bother scheduling a heartbeat as there are
359             // no followers
360             return;
361         }
362
363         stopInstallSnapshotSchedule();
364
365         // Schedule a message to send append entries to followers that can
366         // accept an append entries with some data in it
367         installSnapshotSchedule =
368             context.getActorSystem().scheduler().scheduleOnce(
369                 interval,
370                 context.getActor(), new SendInstallSnapshot(),
371                 context.getActorSystem().dispatcher(), context.getActor());
372     }
373
374
375
376     @Override public void close() throws Exception {
377         stopHeartBeat();
378     }
379
380     @Override public String getLeaderId() {
381         return context.getId();
382     }
383
384 }