Add ShutDown message to RaftActor to transfer leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.cluster.raft;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.japi.Procedure;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.base.Objects;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Supplier;
20 import com.google.common.collect.Lists;
21 import java.io.Serializable;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.Nonnull;
28 import org.apache.commons.lang3.time.DurationFormatUtils;
29 import org.opendaylight.controller.cluster.DataPersistenceProvider;
30 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
31 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
32 import org.opendaylight.controller.cluster.PersistentDataProvider;
33 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
34 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
35 import org.opendaylight.controller.cluster.notifications.RoleChanged;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
38 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
40 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
41 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
42 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
43 import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
44 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
45 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
46 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
47 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
48 import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
49 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
50 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
51 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
52 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * RaftActor encapsulates a state machine that needs to be kept synchronized
58  * in a cluster. It implements the RAFT algorithm as described in the paper
59  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
60  * In Search of an Understandable Consensus Algorithm</a>
61  * <p/>
62  * RaftActor has 3 states and each state has a certain behavior associated
63  * with it. A Raft actor can behave as,
64  * <ul>
65  * <li> A Leader </li>
66  * <li> A Follower (or) </li>
67  * <li> A Candidate </li>
68  * </ul>
69  * <p/>
70  * <p/>
71  * A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
73  * a Leader it is also responsible for ensuring that all followers ultimately
74  * have the same log and therefore the same state machine as itself.
75  * <p/>
76  * <p/>
77  * The current behavior of a RaftActor determines how election for leadership
78  * is initiated and how peer RaftActors react to request for votes.
79  * <p/>
80  * <p/>
81  * Each RaftActor also needs to know the current election term. It uses this
82  * information for a couple of things. One is to simply figure out who it
83  * voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
85  * <p/>
86  * <p/>
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore, through its behaviors a Raft Actor determines
89  * <p/>
90  * <ul>
91  * <li> when a log entry should be persisted </li>
92  * <li> when a log entry should be applied to the state machine (and) </li>
93  * <li> when a snapshot should be saved </li>
94  * </ul>
95  */
96 public abstract class RaftActor extends AbstractUntypedPersistentActor {
97
    // An ApplyState message that reaches handleCommand this long (50 ms, expressed in nanoseconds) or more
    // after its start time triggers a warning.
    private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis

    // Instance-level logger created from getClass(), so it is named after the concrete subclass.
    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    /**
     * The current state determines the current behavior of a RaftActor.
     * A Raft Actor always starts off in the Follower State.
     */
    private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();

    /**
     * This context should NOT be passed directly to any other actor; it is
     * only to be consumed by the RaftActorBehaviors.
     */
    private final RaftActorContextImpl context;

    // Provider whose current delegate is returned by persistence() and replaced by setPersistence().
    private final DelegatingPersistentDataProvider delegatingPersistenceProvider;

    // Provider created in the constructor; handed to the election term and to recovery message handling.
    private final PersistentDataProvider persistentProvider;

    // Created lazily by handleRecover and reset to null once recovery has completed.
    private RaftActorRecoverySupport raftRecovery;

    // Created in preStart; consulted by handleCommand for snapshot-related messages.
    private RaftActorSnapshotMessageSupport snapshotSupport;

    // Re-used holder that is re-initialized with the previous behavior on every behavior change.
    private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();

    // Re-used supplier that handleCommand feeds with the sender and message for the behavior to process.
    private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier();

    // Created in preStart; gets the first chance to handle every message in handleCommand.
    private RaftActorServerConfigurationSupport serverConfigurationSupport;

    // Non-null while a leadership transfer is pending; cleared by the transfer's completion callbacks.
    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;

    // Set when the first Shutdown message is processed so that later Shutdown messages are ignored.
    private boolean shuttingDown;
129
130     private boolean shuttingDown;
131
132     public RaftActor(String id, Map<String, String> peerAddresses,
133          Optional<ConfigParams> configParams, short payloadVersion) {
134
135         persistentProvider = new PersistentDataProvider(this);
136         delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
137
138         context = new RaftActorContextImpl(this.getSelf(),
139             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
140             -1, -1, peerAddresses,
141             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
142             delegatingPersistenceProvider, LOG);
143
144         context.setPayloadVersion(payloadVersion);
145         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
146     }
147
148     @Override
149     public void preStart() throws Exception {
150         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
151                 context.getConfigParams().getJournalRecoveryLogBatchSize());
152
153         super.preStart();
154
155         snapshotSupport = newRaftActorSnapshotMessageSupport();
156         serverConfigurationSupport = new RaftActorServerConfigurationSupport(this);
157     }
158
159     @Override
160     public void postStop() {
161         if(currentBehavior.getDelegate() != null) {
162             try {
163                 currentBehavior.close();
164             } catch (Exception e) {
165                 LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
166             }
167         }
168
169         super.postStop();
170     }
171
172     @Override
173     public void handleRecover(Object message) {
174         if(raftRecovery == null) {
175             raftRecovery = newRaftActorRecoverySupport();
176         }
177
178         boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
179         if(recoveryComplete) {
180             onRecoveryComplete();
181
182             initializeBehavior();
183
184             raftRecovery = null;
185
186             if (context.getReplicatedLog().size() > 0) {
187                 self().tell(new InitiateCaptureSnapshot(), self());
188                 LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
189             } else {
190                 LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
191             }
192         }
193     }
194
195     protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
196         return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort());
197     }
198
199     protected void initializeBehavior(){
200         changeCurrentBehavior(new Follower(context));
201     }
202
203     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
204         reusableBehaviorStateHolder.init(getCurrentBehavior());
205         setCurrentBehavior(newBehavior);
206         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
207     }
208
209     @Override
210     public void handleCommand(final Object message) {
211         if(serverConfigurationSupport.handleMessage(message, getSender())) {
212             return;
213         } else if (message instanceof ApplyState){
214             ApplyState applyState = (ApplyState) message;
215
216             long elapsedTime = (System.nanoTime() - applyState.getStartTime());
217             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
218                 LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
219                         TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
220             }
221
222             if(LOG.isDebugEnabled()) {
223                 LOG.debug("{}: Applying state for log index {} data {}",
224                     persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
225                     applyState.getReplicatedLogEntry().getData());
226             }
227
228             applyState(applyState.getClientActor(), applyState.getIdentifier(),
229                 applyState.getReplicatedLogEntry().getData());
230
231             if (!hasFollowers()) {
232                 // for single node, the capture should happen after the apply state
233                 // as we delete messages from the persistent journal which have made it to the snapshot
234                 // capturing the snapshot before applying makes the persistent journal and snapshot out of sync
235                 // and recovery shows data missing
236                 context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
237
238                 context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
239             }
240
241         } else if (message instanceof ApplyJournalEntries){
242             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
243             if(LOG.isDebugEnabled()) {
244                 LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
245             }
246
247             persistence().persist(applyEntries, NoopProcedure.instance());
248
249         } else if (message instanceof FindLeader) {
250             getSender().tell(
251                 new FindLeaderReply(getLeaderAddress()),
252                 getSelf()
253             );
254         } else if(message instanceof GetOnDemandRaftState) {
255             onGetOnDemandRaftStats();
256         } else if(message instanceof InitiateCaptureSnapshot) {
257             captureSnapshot();
258         } else if(message instanceof SwitchBehavior){
259             switchBehavior(((SwitchBehavior) message));
260         } else if(message instanceof LeaderTransitioning) {
261             onLeaderTransitioning();
262         } else if(message instanceof Shutdown) {
263             onShutDown();
264         } else if(message instanceof Runnable) {
265             ((Runnable)message).run();
266         } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
267             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
268         }
269     }
270
271     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
272         LOG.debug("{}: Initiating leader transfer", persistenceId());
273
274         if(leadershipTransferInProgress == null) {
275             leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender());
276             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
277                 @Override
278                 public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
279                     leadershipTransferInProgress = null;
280                 }
281
282                 @Override
283                 public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
284                     leadershipTransferInProgress = null;
285                 }
286             });
287
288             leadershipTransferInProgress.addOnComplete(onComplete);
289             leadershipTransferInProgress.init();
290         } else {
291             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
292             leadershipTransferInProgress.addOnComplete(onComplete);
293         }
294     }
295
296     private void onShutDown() {
297         LOG.debug("{}: onShutDown", persistenceId());
298
299         if(shuttingDown) {
300             return;
301         }
302
303         shuttingDown = true;
304         if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
305             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
306                 @Override
307                 public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
308                     LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
309                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
310                 }
311
312                 @Override
313                 public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
314                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
315                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
316                 }
317             });
318         } else if(currentBehavior.state() == RaftState.Leader) {
319             pauseLeader(new Runnable() {
320                 @Override
321                 public void run() {
322                     self().tell(PoisonPill.getInstance(), self());
323                 }
324             });
325         } else {
326             self().tell(PoisonPill.getInstance(), self());
327         }
328     }
329
330     private void onLeaderTransitioning() {
331         LOG.debug("{}: onLeaderTransitioning", persistenceId());
332         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
333         if(currentBehavior.state() == RaftState.Follower && roleChangeNotifier.isPresent()) {
334             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
335                     currentBehavior.getLeaderPayloadVersion()), getSelf());
336         }
337     }
338
339     private void switchBehavior(SwitchBehavior message) {
340         if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
341             RaftState newState = message.getNewState();
342             if( newState == RaftState.Leader || newState == RaftState.Follower) {
343                 switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
344                 getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
345             } else {
346                 LOG.warn("Switching to behavior : {} - not supported", newState);
347             }
348         }
349     }
350
351     private void switchBehavior(Supplier<RaftActorBehavior> supplier){
352         reusableBehaviorStateHolder.init(getCurrentBehavior());
353
354         setCurrentBehavior(supplier.get());
355
356         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
357     }
358
359     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
360         return new RaftActorSnapshotMessageSupport(context, currentBehavior,
361                 getRaftActorSnapshotCohort());
362     }
363
364     private void onGetOnDemandRaftStats() {
365         // Debugging message to retrieve raft stats.
366
367         Map<String, String> peerAddresses = new HashMap<>();
368         for(String peerId: context.getPeerIds()) {
369             peerAddresses.put(peerId, context.getPeerAddress(peerId));
370         }
371
372         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
373                 .commitIndex(context.getCommitIndex())
374                 .currentTerm(context.getTermInformation().getCurrentTerm())
375                 .inMemoryJournalDataSize(replicatedLog().dataSize())
376                 .inMemoryJournalLogSize(replicatedLog().size())
377                 .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
378                 .lastApplied(context.getLastApplied())
379                 .lastIndex(replicatedLog().lastIndex())
380                 .lastTerm(replicatedLog().lastTerm())
381                 .leader(getLeaderId())
382                 .raftState(currentBehavior.state().toString())
383                 .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
384                 .snapshotIndex(replicatedLog().getSnapshotIndex())
385                 .snapshotTerm(replicatedLog().getSnapshotTerm())
386                 .votedFor(context.getTermInformation().getVotedFor())
387                 .peerAddresses(peerAddresses)
388                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
389
390         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
391         if (lastLogEntry != null) {
392             builder.lastLogIndex(lastLogEntry.getIndex());
393             builder.lastLogTerm(lastLogEntry.getTerm());
394         }
395
396         if(getCurrentBehavior() instanceof AbstractLeader) {
397             AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
398             Collection<String> followerIds = leader.getFollowerIds();
399             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
400             for(String id: followerIds) {
401                 final FollowerLogInformation info = leader.getFollower(id);
402                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
403                         info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
404             }
405
406             builder.followerInfoList(followerInfoList);
407         }
408
409         sender().tell(builder.build(), self());
410
411     }
412
413     private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
414         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
415
416         if (oldBehavior != currentBehavior){
417             onStateChanged();
418         }
419
420         String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId();
421         String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
422
423         // it can happen that the state has not changed but the leader has changed.
424         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
425         if(!Objects.equal(lastValidLeaderId, currentBehavior.getLeaderId()) ||
426            oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
427             if(roleChangeNotifier.isPresent()) {
428                 roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
429                         currentBehavior.getLeaderPayloadVersion()), getSelf());
430             }
431
432             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
433
434             if(leadershipTransferInProgress != null) {
435                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
436             }
437         }
438
439         if (roleChangeNotifier.isPresent() &&
440                 (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
441             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
442                     currentBehavior.state().name()), getSelf());
443         }
444     }
445
446     protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
447         return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
448     }
449
450     @Override
451     public long snapshotSequenceNr() {
452         // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
453         // so that we can delete the persistent journal based on the saved sequence-number
454         // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot
455         // was saved and not the number we saved.
456         // We would want to override it , by asking akka to use the last-sequence number known to us.
457         return context.getSnapshotManager().getLastSequenceNumber();
458     }
459
460     /**
461      * When a derived RaftActor needs to persist something it must call
462      * persistData.
463      *
464      * @param clientActor
465      * @param identifier
466      * @param data
467      */
468     protected void persistData(final ActorRef clientActor, final String identifier,
469         final Payload data) {
470
471         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
472             context.getReplicatedLog().lastIndex() + 1,
473             context.getTermInformation().getCurrentTerm(), data);
474
475         if(LOG.isDebugEnabled()) {
476             LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
477         }
478
479         final RaftActorContext raftContext = getRaftActorContext();
480
481         replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
482             @Override
483             public void apply(ReplicatedLogEntry replicatedLogEntry) {
484                 if (!hasFollowers()){
485                     // Increment the Commit Index and the Last Applied values
486                     raftContext.setCommitIndex(replicatedLogEntry.getIndex());
487                     raftContext.setLastApplied(replicatedLogEntry.getIndex());
488
489                     // Apply the state immediately.
490                     self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
491
492                     // Send a ApplyJournalEntries message so that we write the fact that we applied
493                     // the state to durable storage
494                     self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
495
496                 } else if (clientActor != null) {
497                     context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
498
499                     // Send message for replication
500                     currentBehavior.handleMessage(getSelf(),
501                             new Replicate(clientActor, identifier, replicatedLogEntry));
502                 }
503             }
504         });
505     }
506
507     private ReplicatedLog replicatedLog() {
508         return context.getReplicatedLog();
509     }
510
511     protected String getId() {
512         return context.getId();
513     }
514
515     @VisibleForTesting
516     void setCurrentBehavior(RaftActorBehavior behavior) {
517         currentBehavior.setDelegate(behavior);
518     }
519
520     protected RaftActorBehavior getCurrentBehavior() {
521         return currentBehavior.getDelegate();
522     }
523
524     /**
525      * Derived actors can call the isLeader method to check if the current
526      * RaftActor is the Leader or not
527      *
528      * @return true it this RaftActor is a Leader false otherwise
529      */
530     protected boolean isLeader() {
531         return context.getId().equals(currentBehavior.getLeaderId());
532     }
533
534     protected boolean isLeaderActive() {
535         return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null;
536     }
537
538     /**
539      * Derived actor can call getLeader if they need a reference to the Leader.
540      * This would be useful for example in forwarding a request to an actor
541      * which is the leader
542      *
543      * @return A reference to the leader if known, null otherwise
544      */
545     protected ActorSelection getLeader(){
546         String leaderAddress = getLeaderAddress();
547
548         if(leaderAddress == null){
549             return null;
550         }
551
552         return context.actorSelection(leaderAddress);
553     }
554
555     /**
556      *
557      * @return the current leader's id
558      */
559     protected String getLeaderId(){
560         return currentBehavior.getLeaderId();
561     }
562
563     protected RaftState getRaftState() {
564         return currentBehavior.state();
565     }
566
567     protected ReplicatedLogEntry getLastLogEntry() {
568         return replicatedLog().last();
569     }
570
571     protected Long getCurrentTerm(){
572         return context.getTermInformation().getCurrentTerm();
573     }
574
575     protected Long getCommitIndex(){
576         return context.getCommitIndex();
577     }
578
579     protected Long getLastApplied(){
580         return context.getLastApplied();
581     }
582
583     protected RaftActorContext getRaftActorContext() {
584         return context;
585     }
586
587     protected void updateConfigParams(ConfigParams configParams) {
588
589         // obtain the RaftPolicy for oldConfigParams and the updated one.
590         String oldRaftPolicy = context.getConfigParams().
591             getCustomRaftPolicyImplementationClass();
592         String newRaftPolicy = configParams.
593             getCustomRaftPolicyImplementationClass();
594
595         LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
596             oldRaftPolicy, newRaftPolicy);
597         context.setConfigParams(configParams);
598         if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
599             // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
600             // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
601             // avoids potential disruption. Otherwise, switch to Follower normally.
602             RaftActorBehavior behavior = currentBehavior.getDelegate();
603             if(behavior instanceof Follower) {
604                 String previousLeaderId = ((Follower)behavior).getLeaderId();
605
606                 LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
607
608                 changeCurrentBehavior(new Follower(context, previousLeaderId));
609             } else {
610                 initializeBehavior();
611             }
612         }
613     }
614
615     public final DataPersistenceProvider persistence() {
616         return delegatingPersistenceProvider.getDelegate();
617     }
618
619     public void setPersistence(DataPersistenceProvider provider) {
620         delegatingPersistenceProvider.setDelegate(provider);
621     }
622
623     protected void setPersistence(boolean persistent) {
624         if(persistent) {
625             setPersistence(new PersistentDataProvider(this));
626         } else {
627             setPersistence(new NonPersistentDataProvider() {
628                 /**
629                  * The way snapshotting works is,
630                  * <ol>
631                  * <li> RaftActor calls createSnapshot on the Shard
632                  * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
633                  * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
634                  * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
635                  * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
636                  * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
637                  * </ol>
638                  */
639                 @Override
640                 public void saveSnapshot(Object o) {
641                     // Make saving Snapshot successful
642                     // Committing the snapshot here would end up calling commit in the creating state which would
643                     // be a state violation. That's why now we send a message to commit the snapshot.
644                     self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
645                 }
646             });
647         }
648     }
649
650     /**
651      * setPeerAddress sets the address of a known peer at a later time.
652      * <p>
653      * This is to account for situations where a we know that a peer
654      * exists but we do not know an address up-front. This may also be used in
655      * situations where a known peer starts off in a different location and we
656      * need to change it's address
657      * <p>
658      * Note that if the peerId does not match the list of peers passed to
659      * this actor during construction an IllegalStateException will be thrown.
660      *
661      * @param peerId
662      * @param peerAddress
663      */
664     protected void setPeerAddress(String peerId, String peerAddress){
665         context.setPeerAddress(peerId, peerAddress);
666     }
667
    /**
     * The applyState method will be called by the RaftActor when some data
     * needs to be applied to the actor's state. It is invoked from
     * handleCommand for each ApplyState message, with the client actor,
     * identifier and log entry data carried by that message.
     *
     * @param clientActor A reference to the client who sent this message. This
     *                    is the same reference that was passed to persistData
     *                    by the derived actor. clientActor may be null when
     *                    the RaftActor is behaving as a follower or during
     *                    recovery.
     * @param identifier  The identifier of the persisted data. This is also
     *                    the same identifier that was passed to persistData by
     *                    the derived actor. identifier may be null when
     *                    the RaftActor is behaving as a follower or during
     *                    recovery.
     * @param data        A piece of data that was persisted by the persistData call.
     *                    This should NEVER be null.
     */
    protected abstract void applyState(ActorRef clientActor, String identifier,
        Object data);
687
688     /**
689      * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
690      */
691     @Nonnull
692     protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
693
694     /**
695      * This method is called when recovery is complete.
696      */
697     protected abstract void onRecoveryComplete();
698
699     /**
700      * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
701      */
702     @Nonnull
703     protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
704
705     /**
706      * This method will be called by the RaftActor when the state of the
707      * RaftActor changes. The derived actor can then use methods like
708      * isLeader or getLeader to do something useful
709      */
710     protected abstract void onStateChanged();
711
712     /**
713      * Notifier Actor for this RaftActor to notify when a role change happens
714      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
715      */
716     protected abstract Optional<ActorRef> getRoleChangeNotifier();
717
718     /**
719      * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
720      * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
721      * work prior to performing the operation. On completion of any work, the run method must be called to
722      * proceed with the given operation.
723      * <p>
724      * The default implementation immediately runs the operation.
725      *
726      * @param operation the operation to run
727      */
728     protected void pauseLeader(Runnable operation) {
729         operation.run();
730     }
731
732     protected void onLeaderChanged(String oldLeader, String newLeader){};
733
734     private String getLeaderAddress(){
735         if(isLeader()){
736             return getSelf().path().toString();
737         }
738         String leaderId = currentBehavior.getLeaderId();
739         if (leaderId == null) {
740             return null;
741         }
742         String peerAddress = context.getPeerAddress(leaderId);
743         if(LOG.isDebugEnabled()) {
744             LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
745                     persistenceId(), leaderId, peerAddress);
746         }
747
748         return peerAddress;
749     }
750
751     protected boolean hasFollowers(){
752         return getRaftActorContext().hasFollowers();
753     }
754
755     private void captureSnapshot() {
756         SnapshotManager snapshotManager = context.getSnapshotManager();
757
758         if(!snapshotManager.isCapturing()) {
759             LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
760                 replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
761
762             snapshotManager.capture(replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
763         }
764     }
765
766     /**
767      * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries}
768      *             whose type for fromIndex is long instead of int. This class was kept for backwards
769      *             compatibility with Helium.
770      */
771     // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
772     @SuppressWarnings("serial")
773     @Deprecated
774     static class DeleteEntries implements Serializable {
775         private final int fromIndex;
776
777         public DeleteEntries(int fromIndex) {
778             this.fromIndex = fromIndex;
779         }
780
781         public int getFromIndex() {
782             return fromIndex;
783         }
784     }
785
786     /**
787      * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm}
788      *             which has serialVersionUID set. This class was kept for backwards compatibility with Helium.
789      */
790     // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
791     @SuppressWarnings("serial")
792     @Deprecated
793     static class UpdateElectionTerm implements Serializable {
794         private final long currentTerm;
795         private final String votedFor;
796
797         public UpdateElectionTerm(long currentTerm, String votedFor) {
798             this.currentTerm = currentTerm;
799             this.votedFor = votedFor;
800         }
801
802         public long getCurrentTerm() {
803             return currentTerm;
804         }
805
806         public String getVotedFor() {
807             return votedFor;
808         }
809     }
810
811     private static class BehaviorStateHolder {
812         private RaftActorBehavior behavior;
813         private String lastValidLeaderId;
814         private short leaderPayloadVersion;
815
816         void init(RaftActorBehavior behavior) {
817             this.behavior = behavior;
818             this.leaderPayloadVersion = behavior != null ? behavior.getLeaderPayloadVersion() : -1;
819
820             String behaviorLeaderId = behavior != null ? behavior.getLeaderId() : null;
821             if(behaviorLeaderId != null) {
822                 this.lastValidLeaderId = behaviorLeaderId;
823             }
824         }
825
826         RaftActorBehavior getBehavior() {
827             return behavior;
828         }
829
830         String getLastValidLeaderId() {
831             return lastValidLeaderId;
832         }
833
834         short getLeaderPayloadVersion() {
835             return leaderPayloadVersion;
836         }
837     }
838
839     private class SwitchBehaviorSupplier implements Supplier<RaftActorBehavior> {
840         private Object message;
841         private ActorRef sender;
842
843         public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){
844             this.sender = sender;
845             this.message = message;
846             return this;
847         }
848
849         @Override
850         public RaftActorBehavior get() {
851             if(this.message instanceof SwitchBehavior){
852                 return ((SwitchBehavior) message).getNewState().createBehavior(getRaftActorContext());
853             }
854             return currentBehavior.handleMessage(sender, message);
855         }
856     }
857 }