620b2eb422bfb5576c7f8dc73fd74e631c45aa74
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.cluster.raft;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.japi.Procedure;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.base.Optional;
18 import com.google.common.collect.Lists;
19 import java.io.Serializable;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.concurrent.TimeUnit;
26 import java.util.function.Supplier;
27 import javax.annotation.Nonnull;
28 import org.apache.commons.lang3.time.DurationFormatUtils;
29 import org.opendaylight.controller.cluster.DataPersistenceProvider;
30 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
31 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
32 import org.opendaylight.controller.cluster.PersistentDataProvider;
33 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
34 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
35 import org.opendaylight.controller.cluster.notifications.RoleChanged;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
38 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
39 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
40 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
41 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
42 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
43 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
44 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
45 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
46 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
47 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
48 import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
49 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
50 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
51 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
52 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * RaftActor encapsulates a state machine that needs to be kept synchronized
58  * in a cluster. It implements the RAFT algorithm as described in the paper
59  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
60  * In Search of an Understandable Consensus Algorithm</a>
61  * <p/>
62  * RaftActor has 3 states and each state has a certain behavior associated
63  * with it. A Raft actor can behave as,
64  * <ul>
65  * <li> A Leader </li>
66  * <li> A Follower (or) </li>
67  * <li> A Candidate </li>
68  * </ul>
69  * <p/>
70  * <p/>
71  * A RaftActor MUST be a Leader in order to accept requests from clients to
72  * change the state of its encapsulated state machine. Once a RaftActor becomes
73  * a Leader it is also responsible for ensuring that all followers ultimately
74  * have the same log and therefore the same state machine as itself.
75  * <p/>
76  * <p/>
77  * The current behavior of a RaftActor determines how election for leadership
78  * is initiated and how peer RaftActors react to request for votes.
79  * <p/>
80  * <p/>
81  * Each RaftActor also needs to know the current election term. It uses this
82  * information for a couple of things. One is to simply figure out who it
83  * voted for in the last election. Another is to figure out if the message
84  * it received to update its state is stale.
85  * <p/>
86  * <p/>
87  * The RaftActor uses akka-persistence to store its replicated log.
88  * Furthermore through its behaviors a Raft Actor determines
89  * <p/>
90  * <ul>
91  * <li> when a log entry should be persisted </li>
92  * <li> when a log entry should be applied to the state machine (and) </li>
93  * <li> when a snapshot should be saved </li>
94  * </ul>
95  */
96 public abstract class RaftActor extends AbstractUntypedPersistentActor {
97
98     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
99
100     protected final Logger LOG = LoggerFactory.getLogger(getClass());
101
102     /**
103      * This context should NOT be passed directly to any other actor it is
104      * only to be consumed by the RaftActorBehaviors
105      */
106     private final RaftActorContextImpl context;
107
108     private final DelegatingPersistentDataProvider delegatingPersistenceProvider;
109
110     private final PersistentDataProvider persistentProvider;
111
112     private RaftActorRecoverySupport raftRecovery;
113
114     private RaftActorSnapshotMessageSupport snapshotSupport;
115
116     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
117
118     private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier();
119
120     private RaftActorServerConfigurationSupport serverConfigurationSupport;
121
122     private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
123
124     private boolean shuttingDown;
125
126     public RaftActor(String id, Map<String, String> peerAddresses,
127          Optional<ConfigParams> configParams, short payloadVersion) {
128
129         persistentProvider = new PersistentDataProvider(this);
130         delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
131
132         context = new RaftActorContextImpl(this.getSelf(),
133             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
134             -1, -1, peerAddresses,
135             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
136             delegatingPersistenceProvider, LOG);
137
138         context.setPayloadVersion(payloadVersion);
139         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
140     }
141
142     @Override
143     public void preStart() throws Exception {
144         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
145                 context.getConfigParams().getJournalRecoveryLogBatchSize());
146
147         super.preStart();
148
149         snapshotSupport = newRaftActorSnapshotMessageSupport();
150         serverConfigurationSupport = new RaftActorServerConfigurationSupport(this);
151     }
152
153     @Override
154     public void postStop() {
155         context.close();
156         super.postStop();
157     }
158
159     @Override
160     protected void handleRecover(Object message) {
161         if(raftRecovery == null) {
162             raftRecovery = newRaftActorRecoverySupport();
163         }
164
165         boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
166         if(recoveryComplete) {
167             onRecoveryComplete();
168
169             initializeBehavior();
170
171             raftRecovery = null;
172
173             if (context.getReplicatedLog().size() > 0) {
174                 self().tell(new InitiateCaptureSnapshot(), self());
175                 LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
176             } else {
177                 LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
178             }
179         }
180     }
181
182     protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
183         return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort());
184     }
185
186     @VisibleForTesting
187     void initializeBehavior(){
188         changeCurrentBehavior(new Follower(context));
189     }
190
191     @VisibleForTesting
192     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
193         if(getCurrentBehavior() != null) {
194             try {
195                 getCurrentBehavior().close();
196             } catch(Exception e) {
197                 LOG.warn("{}: Error closing behavior {}", persistence(), getCurrentBehavior(), e);
198             }
199         }
200
201         reusableBehaviorStateHolder.init(getCurrentBehavior());
202         setCurrentBehavior(newBehavior);
203         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
204     }
205
206     @Override
207     protected void handleCommand(final Object message) {
208         if (serverConfigurationSupport.handleMessage(message, getSender())) {
209             return;
210         }
211         if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
212             return;
213         }
214
215         if (message instanceof ApplyState) {
216             ApplyState applyState = (ApplyState) message;
217
218             long elapsedTime = (System.nanoTime() - applyState.getStartTime());
219             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
220                 LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
221                         TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
222             }
223
224             if(LOG.isDebugEnabled()) {
225                 LOG.debug("{}: Applying state for log index {} data {}",
226                     persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
227                     applyState.getReplicatedLogEntry().getData());
228             }
229
230             applyState(applyState.getClientActor(), applyState.getIdentifier(),
231                 applyState.getReplicatedLogEntry().getData());
232
233             if (!hasFollowers()) {
234                 // for single node, the capture should happen after the apply state
235                 // as we delete messages from the persistent journal which have made it to the snapshot
236                 // capturing the snapshot before applying makes the persistent journal and snapshot out of sync
237                 // and recovery shows data missing
238                 context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
239
240                 context.getSnapshotManager().trimLog(context.getLastApplied());
241             }
242
243         } else if (message instanceof ApplyJournalEntries) {
244             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
245             if(LOG.isDebugEnabled()) {
246                 LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
247             }
248
249             persistence().persist(applyEntries, NoopProcedure.instance());
250
251         } else if (message instanceof FindLeader) {
252             getSender().tell(
253                 new FindLeaderReply(getLeaderAddress()),
254                 getSelf()
255             );
256         } else if(message instanceof GetOnDemandRaftState) {
257             onGetOnDemandRaftStats();
258         } else if(message instanceof InitiateCaptureSnapshot) {
259             captureSnapshot();
260         } else if(message instanceof SwitchBehavior) {
261             switchBehavior(((SwitchBehavior) message));
262         } else if(message instanceof LeaderTransitioning) {
263             onLeaderTransitioning();
264         } else if(message instanceof Shutdown) {
265             onShutDown();
266         } else if(message instanceof Runnable) {
267             ((Runnable)message).run();
268         } else {
269             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
270         }
271     }
272
273     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
274         LOG.debug("{}: Initiating leader transfer", persistenceId());
275
276         if(leadershipTransferInProgress == null) {
277             leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
278             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
279                 @Override
280                 public void onSuccess(ActorRef raftActorRef) {
281                     leadershipTransferInProgress = null;
282                 }
283
284                 @Override
285                 public void onFailure(ActorRef raftActorRef) {
286                     leadershipTransferInProgress = null;
287                 }
288             });
289
290             leadershipTransferInProgress.addOnComplete(onComplete);
291             leadershipTransferInProgress.init();
292         } else {
293             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
294             leadershipTransferInProgress.addOnComplete(onComplete);
295         }
296     }
297
298     private void onShutDown() {
299         LOG.debug("{}: onShutDown", persistenceId());
300
301         if(shuttingDown) {
302             return;
303         }
304
305         shuttingDown = true;
306
307         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
308         if (currentBehavior.state() != RaftState.Leader) {
309             // For non-leaders shutdown is a no-op
310             self().tell(PoisonPill.getInstance(), self());
311             return;
312         }
313
314         if (context.hasFollowers()) {
315             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
316                 @Override
317                 public void onSuccess(ActorRef raftActorRef) {
318                     LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
319                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
320                 }
321
322                 @Override
323                 public void onFailure(ActorRef raftActorRef) {
324                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
325                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
326                 }
327             });
328         } else {
329             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
330                 @Override
331                 protected void doRun() {
332                     self().tell(PoisonPill.getInstance(), self());
333                 }
334
335                 @Override
336                 protected void doCancel() {
337                     self().tell(PoisonPill.getInstance(), self());
338                 }
339             });
340         }
341     }
342
343     private void onLeaderTransitioning() {
344         LOG.debug("{}: onLeaderTransitioning", persistenceId());
345         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
346         if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
347             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
348                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
349         }
350     }
351
352     private void switchBehavior(SwitchBehavior message) {
353         if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
354             RaftState newState = message.getNewState();
355             if( newState == RaftState.Leader || newState == RaftState.Follower) {
356                 switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
357                 getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
358             } else {
359                 LOG.warn("Switching to behavior : {} - not supported", newState);
360             }
361         }
362     }
363
364     private void switchBehavior(Supplier<RaftActorBehavior> supplier){
365         reusableBehaviorStateHolder.init(getCurrentBehavior());
366
367         setCurrentBehavior(supplier.get());
368
369         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
370     }
371
372     @VisibleForTesting
373     RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
374         return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort());
375     }
376
377     private void onGetOnDemandRaftStats() {
378         // Debugging message to retrieve raft stats.
379
380         Map<String, String> peerAddresses = new HashMap<>();
381         for(String peerId: context.getPeerIds()) {
382             peerAddresses.put(peerId, context.getPeerAddress(peerId));
383         }
384
385         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
386         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
387                 .commitIndex(context.getCommitIndex())
388                 .currentTerm(context.getTermInformation().getCurrentTerm())
389                 .inMemoryJournalDataSize(replicatedLog().dataSize())
390                 .inMemoryJournalLogSize(replicatedLog().size())
391                 .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
392                 .lastApplied(context.getLastApplied())
393                 .lastIndex(replicatedLog().lastIndex())
394                 .lastTerm(replicatedLog().lastTerm())
395                 .leader(getLeaderId())
396                 .raftState(currentBehavior.state().toString())
397                 .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
398                 .snapshotIndex(replicatedLog().getSnapshotIndex())
399                 .snapshotTerm(replicatedLog().getSnapshotTerm())
400                 .votedFor(context.getTermInformation().getVotedFor())
401                 .peerAddresses(peerAddresses)
402                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
403
404         ReplicatedLogEntry lastLogEntry = replicatedLog().last();
405         if (lastLogEntry != null) {
406             builder.lastLogIndex(lastLogEntry.getIndex());
407             builder.lastLogTerm(lastLogEntry.getTerm());
408         }
409
410         if(getCurrentBehavior() instanceof AbstractLeader) {
411             AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
412             Collection<String> followerIds = leader.getFollowerIds();
413             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
414             for(String id: followerIds) {
415                 final FollowerLogInformation info = leader.getFollower(id);
416                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
417                         info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
418             }
419
420             builder.followerInfoList(followerInfoList);
421         }
422
423         sender().tell(builder.build(), self());
424
425     }
426
427     private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
428         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
429
430         if (oldBehavior != currentBehavior){
431             onStateChanged();
432         }
433
434         String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId();
435         String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
436
437         // it can happen that the state has not changed but the leader has changed.
438         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
439         if(!Objects.equals(lastValidLeaderId, currentBehavior.getLeaderId()) ||
440            oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
441             if(roleChangeNotifier.isPresent()) {
442                 roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
443                         currentBehavior.getLeaderPayloadVersion()), getSelf());
444             }
445
446             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
447
448             if(leadershipTransferInProgress != null) {
449                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
450             }
451
452             serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
453         }
454
455         if (roleChangeNotifier.isPresent() &&
456                 (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
457             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
458                     currentBehavior.state().name()), getSelf());
459         }
460     }
461
462     protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
463         return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
464     }
465
466     @Override
467     public long snapshotSequenceNr() {
468         // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
469         // so that we can delete the persistent journal based on the saved sequence-number
470         // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot
471         // was saved and not the number we saved.
472         // We would want to override it , by asking akka to use the last-sequence number known to us.
473         return context.getSnapshotManager().getLastSequenceNumber();
474     }
475
476     /**
477      * When a derived RaftActor needs to persist something it must call
478      * persistData.
479      *
480      * @param clientActor
481      * @param identifier
482      * @param data
483      */
484     protected void persistData(final ActorRef clientActor, final String identifier,
485         final Payload data) {
486
487         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
488             context.getReplicatedLog().lastIndex() + 1,
489             context.getTermInformation().getCurrentTerm(), data);
490
491         if(LOG.isDebugEnabled()) {
492             LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
493         }
494
495         final RaftActorContext raftContext = getRaftActorContext();
496
497         replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
498             @Override
499             public void apply(ReplicatedLogEntry replicatedLogEntry) {
500                 if (!hasFollowers()){
501                     // Increment the Commit Index and the Last Applied values
502                     raftContext.setCommitIndex(replicatedLogEntry.getIndex());
503                     raftContext.setLastApplied(replicatedLogEntry.getIndex());
504
505                     // Apply the state immediately.
506                     self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
507
508                     // Send a ApplyJournalEntries message so that we write the fact that we applied
509                     // the state to durable storage
510                     self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
511
512                 } else if (clientActor != null) {
513                     context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
514
515                     // Send message for replication
516                     getCurrentBehavior().handleMessage(getSelf(),
517                             new Replicate(clientActor, identifier, replicatedLogEntry));
518                 }
519             }
520         });
521     }
522
523     private ReplicatedLog replicatedLog() {
524         return context.getReplicatedLog();
525     }
526
527     protected String getId() {
528         return context.getId();
529     }
530
531     @VisibleForTesting
532     void setCurrentBehavior(RaftActorBehavior behavior) {
533         context.setCurrentBehavior(behavior);
534     }
535
536     protected RaftActorBehavior getCurrentBehavior() {
537         return context.getCurrentBehavior();
538     }
539
540     /**
541      * Derived actors can call the isLeader method to check if the current
542      * RaftActor is the Leader or not
543      *
544      * @return true it this RaftActor is a Leader false otherwise
545      */
546     protected boolean isLeader() {
547         return context.getId().equals(getCurrentBehavior().getLeaderId());
548     }
549
550     protected final boolean isLeaderActive() {
551         return getRaftState() != RaftState.IsolatedLeader && !shuttingDown &&
552                 !isLeadershipTransferInProgress();
553     }
554
555     private boolean isLeadershipTransferInProgress() {
556         return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
557     }
558
559     /**
560      * Derived actor can call getLeader if they need a reference to the Leader.
561      * This would be useful for example in forwarding a request to an actor
562      * which is the leader
563      *
564      * @return A reference to the leader if known, null otherwise
565      */
566     protected ActorSelection getLeader(){
567         String leaderAddress = getLeaderAddress();
568
569         if(leaderAddress == null){
570             return null;
571         }
572
573         return context.actorSelection(leaderAddress);
574     }
575
576     /**
577      *
578      * @return the current leader's id
579      */
580     protected final String getLeaderId(){
581         return getCurrentBehavior().getLeaderId();
582     }
583
584     @VisibleForTesting
585     protected final RaftState getRaftState() {
586         return getCurrentBehavior().state();
587     }
588
589     protected Long getCurrentTerm(){
590         return context.getTermInformation().getCurrentTerm();
591     }
592
593     protected RaftActorContext getRaftActorContext() {
594         return context;
595     }
596
597     protected void updateConfigParams(ConfigParams configParams) {
598
599         // obtain the RaftPolicy for oldConfigParams and the updated one.
600         String oldRaftPolicy = context.getConfigParams().
601             getCustomRaftPolicyImplementationClass();
602         String newRaftPolicy = configParams.
603             getCustomRaftPolicyImplementationClass();
604
605         LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
606             oldRaftPolicy, newRaftPolicy);
607         context.setConfigParams(configParams);
608         if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) {
609             // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
610             // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
611             // avoids potential disruption. Otherwise, switch to Follower normally.
612             RaftActorBehavior behavior = getCurrentBehavior();
613             if(behavior instanceof Follower) {
614                 String previousLeaderId = ((Follower)behavior).getLeaderId();
615                 short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
616
617                 LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
618
619                 changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
620             } else {
621                 initializeBehavior();
622             }
623         }
624     }
625
626     public final DataPersistenceProvider persistence() {
627         return delegatingPersistenceProvider.getDelegate();
628     }
629
630     public void setPersistence(DataPersistenceProvider provider) {
631         delegatingPersistenceProvider.setDelegate(provider);
632     }
633
634     protected void setPersistence(boolean persistent) {
635         if(persistent) {
636             setPersistence(new PersistentDataProvider(this));
637         } else {
638             setPersistence(new NonPersistentDataProvider() {
639                 /**
640                  * The way snapshotting works is,
641                  * <ol>
642                  * <li> RaftActor calls createSnapshot on the Shard
643                  * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
644                  * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
645                  * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
646                  * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
647                  * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
648                  * </ol>
649                  */
650                 @Override
651                 public void saveSnapshot(Object o) {
652                     // Make saving Snapshot successful
653                     // Committing the snapshot here would end up calling commit in the creating state which would
654                     // be a state violation. That's why now we send a message to commit the snapshot.
655                     self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
656                 }
657             });
658         }
659     }
660
661     /**
662      * setPeerAddress sets the address of a known peer at a later time.
663      * <p>
664      * This is to account for situations where a we know that a peer
665      * exists but we do not know an address up-front. This may also be used in
666      * situations where a known peer starts off in a different location and we
667      * need to change it's address
668      * <p>
669      * Note that if the peerId does not match the list of peers passed to
670      * this actor during construction an IllegalStateException will be thrown.
671      *
672      * @param peerId
673      * @param peerAddress
674      */
675     protected void setPeerAddress(String peerId, String peerAddress){
676         context.setPeerAddress(peerId, peerAddress);
677     }
678
679     /**
680      * The applyState method will be called by the RaftActor when some data
681      * needs to be applied to the actor's state
682      *
683      * @param clientActor A reference to the client who sent this message. This
684      *                    is the same reference that was passed to persistData
685      *                    by the derived actor. clientActor may be null when
686      *                    the RaftActor is behaving as a follower or during
687      *                    recovery.
688      * @param identifier  The identifier of the persisted data. This is also
689      *                    the same identifier that was passed to persistData by
690      *                    the derived actor. identifier may be null when
691      *                    the RaftActor is behaving as a follower or during
692      *                    recovery
693      * @param data        A piece of data that was persisted by the persistData call.
694      *                    This should NEVER be null.
695      */
696     protected abstract void applyState(ActorRef clientActor, String identifier,
697         Object data);
698
699     /**
700      * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
701      */
702     @Nonnull
703     protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
704
705     /**
706      * This method is called when recovery is complete.
707      */
708     protected abstract void onRecoveryComplete();
709
710     /**
711      * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
712      */
713     @Nonnull
714     protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
715
716     /**
717      * This method will be called by the RaftActor when the state of the
718      * RaftActor changes. The derived actor can then use methods like
719      * isLeader or getLeader to do something useful
720      */
721     protected abstract void onStateChanged();
722
723     /**
724      * Notifier Actor for this RaftActor to notify when a role change happens
725      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
726      */
727     protected abstract Optional<ActorRef> getRoleChangeNotifier();
728
729     /**
730      * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
731      * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
732      * work prior to performing the operation. On completion of any work, the run method must be called on the
733      * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
734      * this actor's thread dispatcher as as it modifies internal state.
735      * <p>
736      * The default implementation immediately runs the operation.
737      *
738      * @param operation the operation to run
739      */
740     protected void pauseLeader(Runnable operation) {
741         operation.run();
742     }
743
744     protected void onLeaderChanged(String oldLeader, String newLeader) {
745
746     };
747
748     private String getLeaderAddress(){
749         if(isLeader()){
750             return getSelf().path().toString();
751         }
752         String leaderId = getLeaderId();
753         if (leaderId == null) {
754             return null;
755         }
756         String peerAddress = context.getPeerAddress(leaderId);
757         if(LOG.isDebugEnabled()) {
758             LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
759                     persistenceId(), leaderId, peerAddress);
760         }
761
762         return peerAddress;
763     }
764
765     protected boolean hasFollowers(){
766         return getRaftActorContext().hasFollowers();
767     }
768
769     private void captureSnapshot() {
770         SnapshotManager snapshotManager = context.getSnapshotManager();
771
772         if (!snapshotManager.isCapturing()) {
773             final long idx = getCurrentBehavior().getReplicatedToAllIndex();
774             LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
775                 replicatedLog().last(), idx);
776
777             snapshotManager.capture(replicatedLog().last(), idx);
778         }
779     }
780
781     /**
782      * Switch this member to non-voting status. This is a no-op for all behaviors except when we are the leader,
783      * in which case we need to step down.
784      */
785     void becomeNonVoting() {
786         if (isLeader()) {
787             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
788                 @Override
789                 public void onSuccess(ActorRef raftActorRef) {
790                     LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
791                     ensureFollowerState();
792                 }
793
794                 @Override
795                 public void onFailure(ActorRef raftActorRef) {
796                     LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
797                     ensureFollowerState();
798                 }
799
800                 private void ensureFollowerState() {
801                     // Whether or not leadership transfer succeeded, we have to step down as leader and
802                     // switch to Follower so ensure that.
803                     if (getRaftState() != RaftState.Follower) {
804                         initializeBehavior();
805                     }
806                 }
807             });
808         }
809     }
810
811     /**
812      * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries}
813      *             whose type for fromIndex is long instead of int. This class was kept for backwards
814      *             compatibility with Helium.
815      */
816     // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
817     @SuppressWarnings("serial")
818     @Deprecated
819     static class DeleteEntries implements Serializable {
820         private final int fromIndex;
821
822         public DeleteEntries(int fromIndex) {
823             this.fromIndex = fromIndex;
824         }
825
826         public int getFromIndex() {
827             return fromIndex;
828         }
829     }
830
831     /**
832      * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm}
833      *             which has serialVersionUID set. This class was kept for backwards compatibility with Helium.
834      */
835     // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
836     @SuppressWarnings("serial")
837     @Deprecated
838     static class UpdateElectionTerm implements Serializable {
839         private final long currentTerm;
840         private final String votedFor;
841
842         public UpdateElectionTerm(long currentTerm, String votedFor) {
843             this.currentTerm = currentTerm;
844             this.votedFor = votedFor;
845         }
846
847         public long getCurrentTerm() {
848             return currentTerm;
849         }
850
851         public String getVotedFor() {
852             return votedFor;
853         }
854     }
855
856     private static class BehaviorStateHolder {
857         private RaftActorBehavior behavior;
858         private String lastValidLeaderId;
859         private short leaderPayloadVersion;
860
861         void init(RaftActorBehavior behavior) {
862             this.behavior = behavior;
863             this.leaderPayloadVersion = behavior != null ? behavior.getLeaderPayloadVersion() : -1;
864
865             String behaviorLeaderId = behavior != null ? behavior.getLeaderId() : null;
866             if(behaviorLeaderId != null) {
867                 this.lastValidLeaderId = behaviorLeaderId;
868             }
869         }
870
871         RaftActorBehavior getBehavior() {
872             return behavior;
873         }
874
875         String getLastValidLeaderId() {
876             return lastValidLeaderId;
877         }
878
879         short getLeaderPayloadVersion() {
880             return leaderPayloadVersion;
881         }
882     }
883
884     private class SwitchBehaviorSupplier implements Supplier<RaftActorBehavior> {
885         private Object message;
886         private ActorRef sender;
887
888         public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){
889             this.sender = sender;
890             this.message = message;
891             return this;
892         }
893
894         @Override
895         public RaftActorBehavior get() {
896             if(this.message instanceof SwitchBehavior){
897                 return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState());
898             }
899             return getCurrentBehavior().handleMessage(sender, message);
900         }
901     }
902 }