import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
*/
private final RaftActorContextImpl context;
- private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
+ private final DelegatingPersistentDataProvider delegatingPersistenceProvider;
private final PersistentDataProvider persistentProvider;
Optional<ConfigParams> configParams, short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
+ delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
+
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
super.preStart();
snapshotSupport = newRaftActorSnapshotMessageSupport();
- serverConfigurationSupport = new RaftActorServerConfigurationSupport(getRaftActorContext());
+ serverConfigurationSupport = new RaftActorServerConfigurationSupport(this);
}
@Override
initializeBehavior();
raftRecovery = null;
+
+ if (context.getReplicatedLog().size() > 0) {
+ self().tell(new InitiateCaptureSnapshot(), self());
+ LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
+ } else {
+ LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
+ }
}
}
@Override
public void handleCommand(final Object message) {
- if(serverConfigurationSupport.handleMessage(message, this, getSender())) {
+ if(serverConfigurationSupport.handleMessage(message, getSender())) {
return;
} else if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
captureSnapshot();
} else if(message instanceof SwitchBehavior){
switchBehavior(((SwitchBehavior) message));
- } else if(!snapshotSupport.handleSnapshotMessage(message)) {
+ } else if(message instanceof LeaderTransitioning) {
+ onLeaderTransitioning();
+ } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
}
}
+ private void onLeaderTransitioning() {
+ LOG.debug("{}: onLeaderTransitioning", persistenceId());
+ Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+ if(currentBehavior.state() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+ roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
+ currentBehavior.getLeaderPayloadVersion()), getSelf());
+ }
+ }
+
private void switchBehavior(SwitchBehavior message) {
if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
RaftState newState = message.getNewState();
private void onGetOnDemandRaftStats() {
// Debugging message to retrieve raft stats.
+ Map<String, String> peerAddresses = new HashMap<>();
+ for(String peerId: context.getPeerIds()) {
+ peerAddresses.put(peerId, context.getPeerAddress(peerId));
+ }
+
OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.snapshotIndex(replicatedLog().getSnapshotIndex())
.snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
- .peerAddresses(context.getPeerAddresses());
+ .peerAddresses(peerAddresses)
+ .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if (lastLogEntry != null) {
}
protected void updateConfigParams(ConfigParams configParams) {
+
+ // obtain the RaftPolicy for oldConfigParams and the updated one.
+ String oldRaftPolicy = context.getConfigParams().
+ getCustomRaftPolicyImplementationClass();
+ String newRaftPolicy = configParams.
+ getCustomRaftPolicyImplementationClass();
+
+ LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
+ oldRaftPolicy, newRaftPolicy);
context.setConfigParams(configParams);
+ if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
+ // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
+ // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
+ // avoids potential disruption. Otherwise, switch to Follower normally.
+ RaftActorBehavior behavior = currentBehavior.getDelegate();
+ if(behavior instanceof Follower) {
+ String previousLeaderId = ((Follower)behavior).getLeaderId();
+
+ LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
+
+ changeCurrentBehavior(new Follower(context, previousLeaderId));
+ } else {
+ initializeBehavior();
+ }
+ }
}
public final DataPersistenceProvider persistence() {