Fixup Sonar issues
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static com.google.common.base.Verify.verify;
12 import static java.util.Objects.requireNonNull;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Status;
18 import akka.persistence.JournalProtocol;
19 import akka.persistence.SnapshotProtocol;
20 import com.google.common.annotations.VisibleForTesting;
21 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Objects;
28 import java.util.Optional;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang3.time.DurationFormatUtils;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.eclipse.jdt.annotation.Nullable;
33 import org.opendaylight.controller.cluster.DataPersistenceProvider;
34 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
35 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
36 import org.opendaylight.controller.cluster.PersistentDataProvider;
37 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
38 import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo;
39 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
40 import org.opendaylight.controller.cluster.notifications.RoleChanged;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
42 import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
43 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
45 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
46 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
47 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
48 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
49 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
50 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
51 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
52 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
53 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
54 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
55 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
56 import org.opendaylight.controller.cluster.raft.messages.Payload;
57 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
58 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
59 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
60 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
61 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
62 import org.opendaylight.yangtools.concepts.Identifier;
63 import org.opendaylight.yangtools.concepts.Immutable;
64
65 /**
66  * RaftActor encapsulates a state machine that needs to be kept synchronized
67  * in a cluster. It implements the RAFT algorithm as described in the paper
68  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
69  * In Search of an Understandable Consensus Algorithm</a>
70  *
71  * <p>
72  * RaftActor has 3 states and each state has a certain behavior associated
73  * with it. A Raft actor can behave as,
74  * <ul>
75  * <li> A Leader </li>
76  * <li> A Follower (or) </li>
77  * <li> A Candidate </li>
78  * </ul>
79  *
80  * <p>
81  * A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
83  * a Leader it is also responsible for ensuring that all followers ultimately
84  * have the same log and therefore the same state machine as itself.
85  *
86  * <p>
87  * The current behavior of a RaftActor determines how election for leadership
 * is initiated and how peer RaftActors react to requests for votes.
89  *
90  * <p>
91  * Each RaftActor also needs to know the current election term. It uses this
92  * information for a couple of things. One is to simply figure out who it
93  * voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
95  *
96  * <p>
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore, through its behaviors a Raft Actor determines
99  * <ul>
100  * <li> when a log entry should be persisted </li>
101  * <li> when a log entry should be applied to the state machine (and) </li>
102  * <li> when a snapshot should be saved </li>
103  * </ul>
104  */
105 public abstract class RaftActor extends AbstractUntypedPersistentActor {
106
107     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
108
109     /**
110      * This context should NOT be passed directly to any other actor it is
111      * only to be consumed by the RaftActorBehaviors.
112      */
113     private final RaftActorContextImpl context;
114
115     private final DelegatingPersistentDataProvider delegatingPersistenceProvider;
116
117     private final PersistentDataProvider persistentProvider;
118
119     private final BehaviorStateTracker behaviorStateTracker = new BehaviorStateTracker();
120
121     private RaftActorRecoverySupport raftRecovery;
122
123     private RaftActorSnapshotMessageSupport snapshotSupport;
124
125     private RaftActorServerConfigurationSupport serverConfigurationSupport;
126
127     private boolean shuttingDown;
128
/**
 * Creates the actor together with its {@link RaftActorContextImpl} and replicated log.
 *
 * @param id the id of this raft member
 * @param peerAddresses initial peer addresses (presumably keyed by peer id — TODO confirm against callers)
 * @param configParams optional configuration; a {@link DefaultConfigParamsImpl} is used when absent
 * @param payloadVersion payload version recorded in the raft context
 */
@SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
protected RaftActor(final String id, final Map<String, String> peerAddresses,
        final Optional<ConfigParams> configParams, final short payloadVersion) {

    persistentProvider = new PersistentDataProvider(this);
    delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);

    // orElseGet keeps the default lazily constructed, exactly like the previous isPresent()/get() ternary
    context = new RaftActorContextImpl(getSelf(), getContext(), id,
        new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
        configParams.orElseGet(DefaultConfigParamsImpl::new),
        delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);

    context.setPayloadVersion(payloadVersion);
    context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
}
144
145     @Override
146     public void preStart() throws Exception {
147         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
148                 context.getConfigParams().getJournalRecoveryLogBatchSize());
149
150         super.preStart();
151
152         snapshotSupport = newRaftActorSnapshotMessageSupport();
153         serverConfigurationSupport = new RaftActorServerConfigurationSupport(this);
154     }
155
156     @Override
157     public void postStop() throws Exception {
158         context.close();
159         super.postStop();
160     }
161
162     @Override
163     protected void handleRecover(final Object message) {
164         if (raftRecovery == null) {
165             raftRecovery = newRaftActorRecoverySupport();
166         }
167
168         boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
169         if (recoveryComplete) {
170             onRecoveryComplete();
171
172             initializeBehavior();
173
174             raftRecovery = null;
175         }
176     }
177
178     protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
179         return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort());
180     }
181
182     @VisibleForTesting
183     void initializeBehavior() {
184         changeCurrentBehavior(new Follower(context));
185     }
186
187     @VisibleForTesting
188     @SuppressWarnings("checkstyle:IllegalCatch")
189     protected void changeCurrentBehavior(final RaftActorBehavior newBehavior) {
190         final RaftActorBehavior currentBehavior = getCurrentBehavior();
191         if (currentBehavior != null) {
192             try {
193                 currentBehavior.close();
194             } catch (Exception e) {
195                 LOG.warn("{}: Error closing behavior {}", persistence(), currentBehavior, e);
196             }
197         }
198
199         final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
200         setCurrentBehavior(newBehavior);
201         handleBehaviorChange(state, newBehavior);
202     }
203
204     /**
205      * Method exposed for subclasses to plug-in their logic. This method is invoked by {@link #handleCommand(Object)}
206      * for messages which are not handled by this class. Subclasses overriding this class should fall back to this
207      * implementation for messages which they do not handle
208      *
209      * @param message Incoming command message
210      */
211     protected void handleNonRaftCommand(final Object message) {
212         unhandled(message);
213     }
214
215     /**
216      * Handles a message.
217      *
218      * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
219      *             {@link #handleNonRaftCommand(Object)} instead.
220      */
221     @Deprecated
222     @Override
223     // FIXME: make this method final once our unit tests do not need to override it
224     protected void handleCommand(final Object message) {
225         if (serverConfigurationSupport.handleMessage(message, getSender())) {
226             return;
227         }
228         if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
229             return;
230         }
231         if (message instanceof ApplyState applyState) {
232             if (!hasFollowers()) {
233                 // for single node, the capture should happen after the apply state
234                 // as we delete messages from the persistent journal which have made it to the snapshot
235                 // capturing the snapshot before applying makes the persistent journal and snapshot out of sync
236                 // and recovery shows data missing
237                 context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
238
239                 context.getSnapshotManager().trimLog(context.getLastApplied());
240             }
241
242             possiblyHandleBehaviorMessage(message);
243         } else if (message instanceof ApplyJournalEntries applyEntries) {
244             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
245
246             persistence().persistAsync(applyEntries, NoopProcedure.instance());
247         } else if (message instanceof FindLeader) {
248             getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
249         } else if (message instanceof GetOnDemandRaftState) {
250             onGetOnDemandRaftStats();
251         } else if (message instanceof InitiateCaptureSnapshot) {
252             captureSnapshot();
253         } else if (message instanceof SwitchBehavior switchBehavior) {
254             switchBehavior(switchBehavior);
255         } else if (message instanceof LeaderTransitioning leaderTransitioning) {
256             onLeaderTransitioning(leaderTransitioning);
257         } else if (message instanceof Shutdown) {
258             onShutDown();
259         } else if (message instanceof Runnable runnable) {
260             runnable.run();
261         } else if (message instanceof NoopPayload noopPayload) {
262             persistData(null, null, noopPayload, false);
263         } else if (message instanceof RequestLeadership requestLeadership) {
264             onRequestLeadership(requestLeadership);
265         } else if (!possiblyHandleBehaviorMessage(message)) {
266             if (message instanceof JournalProtocol.Response response
267                 && delegatingPersistenceProvider.handleJournalResponse(response)) {
268                 LOG.debug("{}: handled a journal response", persistenceId());
269             } else if (message instanceof SnapshotProtocol.Response response
270                 && delegatingPersistenceProvider.handleSnapshotResponse(response)) {
271                 LOG.debug("{}: handled a snapshot response", persistenceId());
272             } else {
273                 handleNonRaftCommand(message);
274             }
275         }
276     }
277
278     private void onRequestLeadership(final RequestLeadership message) {
279         LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
280         if (!isLeader()) {
281             // non-leader cannot satisfy leadership request
282             LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
283                     + " Current behavior: {}. Sending failure response",
284                     persistenceId(), message, getCurrentBehavior().state());
285             message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
286                     + message.getRequestedFollowerId()
287                     + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
288             return;
289         }
290
291         final String requestedFollowerId = message.getRequestedFollowerId();
292         final ActorRef replyTo = message.getReplyTo();
293         initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
294             @Override
295             public void onSuccess(final ActorRef raftActorRef) {
296                 // sanity check
297                 if (!requestedFollowerId.equals(getLeaderId())) {
298                     onFailure(raftActorRef);
299                 }
300
301                 LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
302                 replyTo.tell(new Status.Success(null), getSelf());
303             }
304
305             @Override
306             public void onFailure(final ActorRef raftActorRef) {
307                 LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
308                 replyTo.tell(new Status.Failure(
309                         new LeadershipTransferFailedException(
310                                 "Failed to transfer leadership to " + requestedFollowerId
311                                         + ". Follower is not ready to become leader")),
312                         getSelf());
313             }
314         }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
315     }
316
317     private boolean possiblyHandleBehaviorMessage(final Object message) {
318         final RaftActorBehavior currentBehavior = getCurrentBehavior();
319         final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
320
321         // A behavior indicates that it processed the change by returning a reference to the next behavior
322         // to be used. A null return indicates it has not processed the message and we should be passing it to
323         // the subclass for handling.
324         final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
325         if (nextBehavior != null) {
326             switchBehavior(state, nextBehavior);
327             return true;
328         }
329
330         return false;
331     }
332
333     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
334             final @Nullable String followerId, final long newLeaderTimeoutInMillis) {
335         LOG.debug("{}: Initiating leader transfer", persistenceId());
336
337         RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
338         if (leadershipTransferInProgress == null) {
339             leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
340             leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
341             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
342                 @Override
343                 public void onSuccess(final ActorRef raftActorRef) {
344                     context.setRaftActorLeadershipTransferCohort(null);
345                 }
346
347                 @Override
348                 public void onFailure(final ActorRef raftActorRef) {
349                     context.setRaftActorLeadershipTransferCohort(null);
350                 }
351             });
352
353             leadershipTransferInProgress.addOnComplete(onComplete);
354
355             context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
356             leadershipTransferInProgress.init();
357
358         } else {
359             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
360             leadershipTransferInProgress.addOnComplete(onComplete);
361         }
362     }
363
364     private void onShutDown() {
365         LOG.debug("{}: onShutDown", persistenceId());
366
367         if (shuttingDown) {
368             return;
369         }
370
371         shuttingDown = true;
372
373         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
374         switch (currentBehavior.state()) {
375             case Leader:
376             case PreLeader:
377                 // Fall-through to more work
378                 break;
379             default:
380                 // For non-leaders shutdown is a no-op
381                 self().tell(PoisonPill.getInstance(), self());
382                 return;
383         }
384
385         if (context.hasFollowers()) {
386             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
387                 @Override
388                 public void onSuccess(final ActorRef raftActorRef) {
389                     LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
390                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
391                 }
392
393                 @Override
394                 public void onFailure(final ActorRef raftActorRef) {
395                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
396                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
397                 }
398             }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
399         } else {
400             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
401                 @Override
402                 protected void doRun() {
403                     self().tell(PoisonPill.getInstance(), self());
404                 }
405
406                 @Override
407                 protected void doCancel() {
408                     self().tell(PoisonPill.getInstance(), self());
409                 }
410             });
411         }
412     }
413
414     private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
415         LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
416         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
417         if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
418                 && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
419             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
420                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
421         }
422     }
423
424     private void switchBehavior(final SwitchBehavior message) {
425         if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
426             RaftState newState = message.getNewState();
427             if (newState == RaftState.Leader || newState == RaftState.Follower) {
428                 getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
429                 switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
430                     AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
431             } else {
432                 LOG.warn("Switching to behavior : {} - not supported", newState);
433             }
434         }
435     }
436
437     private void switchBehavior(final BehaviorState oldBehaviorState, final RaftActorBehavior nextBehavior) {
438         setCurrentBehavior(nextBehavior);
439         handleBehaviorChange(oldBehaviorState, nextBehavior);
440     }
441
442     @VisibleForTesting
443     RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
444         return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort());
445     }
446
447     private void onGetOnDemandRaftStats() {
448         // Debugging message to retrieve raft stats.
449
450         Map<String, String> peerAddresses = new HashMap<>();
451         Map<String, Boolean> peerVotingStates = new HashMap<>();
452         for (PeerInfo info: context.getPeers()) {
453             peerVotingStates.put(info.getId(), info.isVoting());
454             peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
455         }
456
457         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
458         OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
459                 .commitIndex(context.getCommitIndex())
460                 .currentTerm(context.getTermInformation().getCurrentTerm())
461                 .inMemoryJournalDataSize(replicatedLog().dataSize())
462                 .inMemoryJournalLogSize(replicatedLog().size())
463                 .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
464                 .lastApplied(context.getLastApplied())
465                 .lastIndex(replicatedLog().lastIndex())
466                 .lastTerm(replicatedLog().lastTerm())
467                 .leader(getLeaderId())
468                 .raftState(currentBehavior.state().toString())
469                 .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
470                 .snapshotIndex(replicatedLog().getSnapshotIndex())
471                 .snapshotTerm(replicatedLog().getSnapshotTerm())
472                 .votedFor(context.getTermInformation().getVotedFor())
473                 .isVoting(context.isVotingMember())
474                 .peerAddresses(peerAddresses)
475                 .peerVotingStates(peerVotingStates)
476                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
477
478         ReplicatedLogEntry lastLogEntry = replicatedLog().last();
479         if (lastLogEntry != null) {
480             builder.lastLogIndex(lastLogEntry.getIndex());
481             builder.lastLogTerm(lastLogEntry.getTerm());
482         }
483
484         if (getCurrentBehavior() instanceof AbstractLeader leader) {
485             Collection<String> followerIds = leader.getFollowerIds();
486             List<FollowerInfo> followerInfoList = new ArrayList<>(followerIds.size());
487             for (String id : followerIds) {
488                 final FollowerLogInformation info = leader.getFollower(id);
489                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
490                         info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
491                             TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
492                         context.getPeerInfo(info.getId()).isVoting()));
493             }
494
495             builder.followerInfoList(followerInfoList);
496         }
497
498         sender().tell(builder.build(), self());
499
500     }
501
502     protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
503         return OnDemandRaftState.builder();
504     }
505
506     private void handleBehaviorChange(final BehaviorState oldBehaviorState, final RaftActorBehavior currentBehavior) {
507         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
508
509         if (oldBehavior != currentBehavior) {
510             onStateChanged();
511         }
512
513         String lastLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastLeaderId();
514         String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId();
515         String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
516
517         // it can happen that the state has not changed but the leader has changed.
518         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
519         if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
520                 || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
521             if (roleChangeNotifier.isPresent()) {
522                 roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
523                         currentBehavior.getLeaderPayloadVersion()), getSelf());
524             }
525
526             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
527
528             RaftActorLeadershipTransferCohort leadershipTransferInProgress =
529                     context.getRaftActorLeadershipTransferCohort();
530             if (leadershipTransferInProgress != null) {
531                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
532             }
533
534             serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
535         }
536
537         if (roleChangeNotifier.isPresent()
538                 && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
539             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
540                     currentBehavior.state().name()), getSelf());
541         }
542     }
543
544     private void handleApplyState(final ApplyState applyState) {
545         long startTime = System.nanoTime();
546
547         Payload payload = applyState.getReplicatedLogEntry().getData();
548         if (LOG.isDebugEnabled()) {
549             LOG.debug("{}: Applying state for log index {} data {}",
550                 persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
551         }
552
553         if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
554             applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
555         }
556
557         long elapsedTime = System.nanoTime() - startTime;
558         if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
559             LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
560                     TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
561         }
562
563         // Send the ApplyState message back to self to handle further processing asynchronously.
564         self().tell(applyState, self());
565     }
566
567     protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
568             final short leaderPayloadVersion) {
569         return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
570     }
571
572     @Override
573     public long snapshotSequenceNr() {
574         // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
575         // so that we can delete the persistent journal based on the saved sequence-number
576         // However , when akka replays the journal during recovery, it replays it from the sequence number when the
577         // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the
578         // last-sequence number known to us.
579         return context.getSnapshotManager().getLastSequenceNumber();
580     }
581
582     /**
583      * Persists the given Payload in the journal and replicates to any followers. After successful completion,
584      * {@link #applyState(ActorRef, Identifier, Object)} is notified.
585      *
586      * @param clientActor optional ActorRef that is provided via the applyState callback
587      * @param identifier the payload identifier
588      * @param data the payload data to persist
589      * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
590      *        subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
591      */
592     protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
593             final boolean batchHint) {
594         ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
595             context.getReplicatedLog().lastIndex() + 1,
596             context.getTermInformation().getCurrentTerm(), data);
597         replicatedLogEntry.setPersistencePending(true);
598
599         LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
600
601         final RaftActorContext raftContext = getRaftActorContext();
602
603         boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
604             // Clear the persistence pending flag in the log entry.
605             persistedLogEntry.setPersistencePending(false);
606
607             if (!hasFollowers()) {
608                 // Increment the Commit Index and the Last Applied values
609                 raftContext.setCommitIndex(persistedLogEntry.getIndex());
610                 raftContext.setLastApplied(persistedLogEntry.getIndex());
611
612                 // Apply the state immediately.
613                 handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
614
615                 // Send a ApplyJournalEntries message so that we write the fact that we applied
616                 // the state to durable storage
617                 self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
618
619             } else {
620                 context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
621
622                 // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
623                 // normally should still be the leader) to check if consensus has now been reached in conjunction with
624                 // follower replication.
625                 getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
626             }
627         }, true);
628
629         if (wasAppended && hasFollowers()) {
630             // Send log entry for replication.
631             getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
632                     !batchHint));
633         }
634     }
635
636     private ReplicatedLog replicatedLog() {
637         return context.getReplicatedLog();
638     }
639
640     protected String getId() {
641         return context.getId();
642     }
643
644     @VisibleForTesting
645     void setCurrentBehavior(final RaftActorBehavior behavior) {
646         context.setCurrentBehavior(behavior);
647     }
648
649     protected RaftActorBehavior getCurrentBehavior() {
650         return context.getCurrentBehavior();
651     }
652
653     /**
654      * Derived actors can call the isLeader method to check if the current
655      * RaftActor is the Leader or not.
656      *
657      * @return true it this RaftActor is a Leader false otherwise
658      */
659     protected boolean isLeader() {
660         return context.getId().equals(getCurrentBehavior().getLeaderId());
661     }
662
663     protected final boolean isLeaderActive() {
664         return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader
665                 && !shuttingDown && !isLeadershipTransferInProgress();
666     }
667
668     protected boolean isLeadershipTransferInProgress() {
669         RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
670         return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
671     }
672
673     /**
674      * Derived actor can call getLeader if they need a reference to the Leader.
675      * This would be useful for example in forwarding a request to an actor
676      * which is the leader
677      *
678      * @return A reference to the leader if known, null otherwise
679      */
680     public ActorSelection getLeader() {
681         String leaderAddress = getLeaderAddress();
682
683         if (leaderAddress == null) {
684             return null;
685         }
686
687         return context.actorSelection(leaderAddress);
688     }
689
690     /**
691      * Returns the id of the current leader.
692      *
693      * @return the current leader's id
694      */
695     protected final String getLeaderId() {
696         return getCurrentBehavior().getLeaderId();
697     }
698
699     @VisibleForTesting
700     protected final RaftState getRaftState() {
701         return getCurrentBehavior().state();
702     }
703
704     protected Long getCurrentTerm() {
705         return context.getTermInformation().getCurrentTerm();
706     }
707
708     protected RaftActorContext getRaftActorContext() {
709         return context;
710     }
711
712     protected void updateConfigParams(final ConfigParams configParams) {
713
714         // obtain the RaftPolicy for oldConfigParams and the updated one.
715         String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
716         String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass();
717
718         LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
719             oldRaftPolicy, newRaftPolicy);
720         context.setConfigParams(configParams);
721         if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) {
722             // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
723             // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
724             // avoids potential disruption. Otherwise, switch to Follower normally.
725             RaftActorBehavior behavior = getCurrentBehavior();
726             if (behavior != null && behavior.state() == RaftState.Follower) {
727                 String previousLeaderId = behavior.getLeaderId();
728                 short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
729
730                 LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(),
731                         previousLeaderId);
732
733                 changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
734             } else {
735                 initializeBehavior();
736             }
737         }
738     }
739
740     public final DataPersistenceProvider persistence() {
741         return delegatingPersistenceProvider.getDelegate();
742     }
743
744     public void setPersistence(final DataPersistenceProvider provider) {
745         delegatingPersistenceProvider.setDelegate(provider);
746     }
747
748     protected void setPersistence(final boolean persistent) {
749         DataPersistenceProvider currentPersistence = persistence();
750         if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
751             setPersistence(new PersistentDataProvider(this));
752
753             if (getCurrentBehavior() != null) {
754                 LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
755                 captureSnapshot();
756             }
757         } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
758             setPersistence(new NonPersistentDataProvider(this) {
759                 /*
760                  * The way snapshotting works is,
761                  * <ol>
762                  * <li> RaftActor calls createSnapshot on the Shard
763                  * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
764                  * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
765                  * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
766                  * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
767                  * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
768                  * </ol>
769                  */
770                 @Override
771                 public void saveSnapshot(final Object object) {
772                     // Make saving Snapshot successful
773                     // Committing the snapshot here would end up calling commit in the creating state which would
774                     // be a state violation. That's why now we send a message to commit the snapshot.
775                     self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
776                 }
777             });
778         }
779     }
780
781     /**
782      * setPeerAddress sets the address of a known peer at a later time.
783      *
784      * <p>
785      * This is to account for situations where a we know that a peer
786      * exists but we do not know an address up-front. This may also be used in
787      * situations where a known peer starts off in a different location and we
788      * need to change it's address
789      *
790      * <p>
791      * Note that if the peerId does not match the list of peers passed to
792      * this actor during construction an IllegalStateException will be thrown.
793      */
794     protected void setPeerAddress(final String peerId, final String peerAddress) {
795         context.setPeerAddress(peerId, peerAddress);
796     }
797
798     /**
799      * The applyState method will be called by the RaftActor when some data
800      * needs to be applied to the actor's state.
801      *
802      * @param clientActor A reference to the client who sent this message. This
803      *                    is the same reference that was passed to persistData
804      *                    by the derived actor. clientActor may be null when
805      *                    the RaftActor is behaving as a follower or during
806      *                    recovery.
807      * @param identifier  The identifier of the persisted data. This is also
808      *                    the same identifier that was passed to persistData by
809      *                    the derived actor. identifier may be null when
810      *                    the RaftActor is behaving as a follower or during
811      *                    recovery
812      * @param data        A piece of data that was persisted by the persistData call.
813      *                    This should NEVER be null.
814      */
815     protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
816
817     /**
818      * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
819      */
820     protected abstract @NonNull RaftActorRecoveryCohort getRaftActorRecoveryCohort();
821
822     /**
823      * This method is called when recovery is complete.
824      */
825     protected abstract void onRecoveryComplete();
826
827     /**
828      * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
829      */
830     protected abstract @NonNull RaftActorSnapshotCohort getRaftActorSnapshotCohort();
831
832     /**
833      * This method will be called by the RaftActor when the state of the
834      * RaftActor changes. The derived actor can then use methods like
835      * isLeader or getLeader to do something useful
836      */
837     protected abstract void onStateChanged();
838
839     /**
840      * Notifier Actor for this RaftActor to notify when a role change happens.
841      *
842      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
843      */
844     protected abstract Optional<ActorRef> getRoleChangeNotifier();
845
846     /**
847      * This method is called on the leader when a voting change operation completes.
848      */
849     protected void onVotingStateChangeComplete() {
850     }
851
852     /**
853      * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
854      * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
855      * work prior to performing the operation. On completion of any work, the run method must be called on the
856      * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
857      * this actor's thread dispatcher as as it modifies internal state.
858      *
859      * <p>
860      * The default implementation immediately runs the operation.
861      *
862      * @param operation the operation to run
863      */
864     protected void pauseLeader(final Runnable operation) {
865         operation.run();
866     }
867
868     /**
869      * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader
870      * should resume normal operations.
871      *
872      * <p>
873      * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
874      */
875     protected void unpauseLeader() {
876
877     }
878
879     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
880     }
881
882     private String getLeaderAddress() {
883         if (isLeader()) {
884             return getSelf().path().toString();
885         }
886         String leaderId = getLeaderId();
887         if (leaderId == null) {
888             return null;
889         }
890         String peerAddress = context.getPeerAddress(leaderId);
891         LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
892
893         return peerAddress;
894     }
895
896     protected boolean hasFollowers() {
897         return getRaftActorContext().hasFollowers();
898     }
899
900     private void captureSnapshot() {
901         SnapshotManager snapshotManager = context.getSnapshotManager();
902
903         if (!snapshotManager.isCapturing()) {
904             final long idx = getCurrentBehavior().getReplicatedToAllIndex();
905             LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
906                 replicatedLog().last(), idx);
907
908             snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx);
909         }
910     }
911
912     /**
913      * Switch this member to non-voting status. This is a no-op for all behaviors except when we are the leader,
914      * in which case we need to step down.
915      */
916     void becomeNonVoting() {
917         if (isLeader()) {
918             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
919                 @Override
920                 public void onSuccess(final ActorRef raftActorRef) {
921                     LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
922                     ensureFollowerState();
923                 }
924
925                 @Override
926                 public void onFailure(final ActorRef raftActorRef) {
927                     LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
928                     ensureFollowerState();
929                 }
930
931                 private void ensureFollowerState() {
932                     // Whether or not leadership transfer succeeded, we have to step down as leader and
933                     // switch to Follower so ensure that.
934                     if (getRaftState() != RaftState.Follower) {
935                         initializeBehavior();
936                     }
937                 }
938             }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
939         }
940     }
941
942     /**
943      * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
944      */
945     private abstract static class BehaviorState implements Immutable {
946         @Nullable abstract RaftActorBehavior getBehavior();
947
948         @Nullable abstract String getLastValidLeaderId();
949
950         @Nullable abstract String getLastLeaderId();
951
952         abstract short getLeaderPayloadVersion();
953     }
954
955     /**
956      * A {@link BehaviorState} corresponding to non-null {@link RaftActorBehavior} state.
957      */
958     private static final class SimpleBehaviorState extends BehaviorState {
959         private final RaftActorBehavior behavior;
960         private final String lastValidLeaderId;
961         private final String lastLeaderId;
962         private final short leaderPayloadVersion;
963
964         SimpleBehaviorState(final String lastValidLeaderId, final String lastLeaderId,
965                 final RaftActorBehavior behavior) {
966             this.lastValidLeaderId = lastValidLeaderId;
967             this.lastLeaderId = lastLeaderId;
968             this.behavior = requireNonNull(behavior);
969             leaderPayloadVersion = behavior.getLeaderPayloadVersion();
970         }
971
972         @Override
973         RaftActorBehavior getBehavior() {
974             return behavior;
975         }
976
977         @Override
978         String getLastValidLeaderId() {
979             return lastValidLeaderId;
980         }
981
982         @Override
983         short getLeaderPayloadVersion() {
984             return leaderPayloadVersion;
985         }
986
987         @Override
988         String getLastLeaderId() {
989             return lastLeaderId;
990         }
991     }
992
993     /**
994      * Class tracking behavior-related information, which we need to keep around and pass across behavior switches.
995      * An instance is created for each RaftActor. It has two functions:
996      * - it keeps track of the last leader ID we have encountered since we have been created
997      * - it creates state capture needed to transition from one behavior to the next
998      */
999     private static final class BehaviorStateTracker {
1000         /**
1001          * A {@link BehaviorState} corresponding to null {@link RaftActorBehavior} state. Since null behavior is only
1002          * allowed before we receive the first message, we know the leader ID to be null.
1003          */
1004         private static final BehaviorState NULL_BEHAVIOR_STATE = new BehaviorState() {
1005             @Override
1006             RaftActorBehavior getBehavior() {
1007                 return null;
1008             }
1009
1010             @Override
1011             String getLastValidLeaderId() {
1012                 return null;
1013             }
1014
1015             @Override
1016             short getLeaderPayloadVersion() {
1017                 return -1;
1018             }
1019
1020             @Override
1021             String getLastLeaderId() {
1022                 return null;
1023             }
1024         };
1025
1026         private String lastValidLeaderId;
1027         private String lastLeaderId;
1028
1029         BehaviorState capture(final RaftActorBehavior behavior) {
1030             if (behavior == null) {
1031                 verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
1032                 return NULL_BEHAVIOR_STATE;
1033             }
1034
1035             lastLeaderId = behavior.getLeaderId();
1036             if (lastLeaderId != null) {
1037                 lastValidLeaderId = lastLeaderId;
1038             }
1039
1040             return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
1041         }
1042     }
1043 }