import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.Lists;
-import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
- (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+ configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, LOG);
context.setPayloadVersion(payloadVersion);
context.getSnapshotManager().trimLog(context.getLastApplied());
}
+ // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState.
+ possiblyHandleBehaviorMessage(message);
+
} else if (message instanceof ApplyJournalEntries) {
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
if(LOG.isDebugEnabled()) {
} else if(message instanceof InitiateCaptureSnapshot) {
captureSnapshot();
} else if(message instanceof SwitchBehavior) {
- switchBehavior(((SwitchBehavior) message));
+ switchBehavior((SwitchBehavior) message);
} else if(message instanceof LeaderTransitioning) {
onLeaderTransitioning();
} else if(message instanceof Shutdown) {
((Runnable)message).run();
} else if(message instanceof NoopPayload) {
persistData(null, null, (NoopPayload)message);
- } else {
- // Processing the message may affect the state, hence we need to capture it
- final RaftActorBehavior currentBehavior = getCurrentBehavior();
- final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
-
- // A behavior indicates that it processed the change by returning a reference to the next behavior
- // to be used. A null return indicates it has not processed the message and we should be passing it to
- // the subclass for handling.
- final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
- if (nextBehavior != null) {
- switchBehavior(state, nextBehavior);
- } else {
- handleNonRaftCommand(message);
- }
+ } else if (!possiblyHandleBehaviorMessage(message)) {
+ handleNonRaftCommand(message);
}
}
+ private boolean possiblyHandleBehaviorMessage(final Object message) {
+ final RaftActorBehavior currentBehavior = getCurrentBehavior();
+ final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+
+ // A behavior indicates that it processed the change by returning a reference to the next behavior
+ // to be used. A null return indicates it has not processed the message and we should be passing it to
+ // the subclass for handling.
+ final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
+ if (nextBehavior != null) {
+ switchBehavior(state, nextBehavior);
+ return true;
+ }
+
+ return false;
+ }
+
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
}
if (roleChangeNotifier.isPresent() &&
- (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+ (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
currentBehavior.state().name()), getSelf());
}
*
* @return A reference to the leader if known, null otherwise
*/
- protected ActorSelection getLeader(){
+ public ActorSelection getLeader(){
String leaderAddress = getLeaderAddress();
if(leaderAddress == null){
}
protected void setPersistence(boolean persistent) {
- if(persistent) {
+ DataPersistenceProvider currentPersistence = persistence();
+ if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
setPersistence(new PersistentDataProvider(this));
- } else {
+
+ if(getCurrentBehavior() != null) {
+ LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+ captureSnapshot();
+ }
+ } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
setPersistence(new NonPersistentDataProvider() {
/**
* The way snapshotting works is,
}
}
- /**
- * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.persisted.DeleteEntries}
- * whose type for fromIndex is long instead of int. This class was kept for backwards
- * compatibility with Helium.
- */
- // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
- @SuppressWarnings("serial")
- @Deprecated
- static class DeleteEntries implements Serializable {
- private final int fromIndex;
-
- public DeleteEntries(int fromIndex) {
- this.fromIndex = fromIndex;
- }
-
- public int getFromIndex() {
- return fromIndex;
- }
-
- private Object readResolve() {
- return org.opendaylight.controller.cluster.raft.persisted.DeleteEntries.createMigrated(fromIndex);
- }
- }
-
- /**
- * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm}
- * which has serialVersionUID set. This class was kept for backwards compatibility with Helium.
- */
- // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
- @SuppressWarnings("serial")
- @Deprecated
- static class UpdateElectionTerm implements Serializable {
- private final long currentTerm;
- private final String votedFor;
-
- public UpdateElectionTerm(long currentTerm, String votedFor) {
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
- }
-
- public long getCurrentTerm() {
- return currentTerm;
- }
-
- public String getVotedFor() {
- return votedFor;
- }
-
- private Object readResolve() {
- return org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm.createMigrated(
- currentTerm, votedFor);
- }
- }
-
/**
* A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
*/