import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
*/
private final RaftActorContextImpl context;
- private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
+ private final DelegatingPersistentDataProvider delegatingPersistenceProvider;
private final PersistentDataProvider persistentProvider;
Optional<ConfigParams> configParams, short payloadVersion) {
persistentProvider = new PersistentDataProvider(this);
+ delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
+
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
initializeBehavior();
raftRecovery = null;
+
+ if (context.getReplicatedLog().size() > 0) {
+ self().tell(new InitiateCaptureSnapshot(), self());
+ LOG.info("Snapshot capture initiated after recovery");
+ } else {
+ LOG.info("Snapshot capture NOT initiated after recovery, journal empty");
+ }
}
}
captureSnapshot();
} else if(message instanceof SwitchBehavior){
switchBehavior(((SwitchBehavior) message));
- } else if(!snapshotSupport.handleSnapshotMessage(message)) {
+ } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
}
}
private void onGetOnDemandRaftStats() {
// Debugging message to retrieve raft stats.
+ Map<String, String> peerAddresses = new HashMap<>();
+ for(String peerId: context.getPeerIds()) {
+ peerAddresses.put(peerId, context.getPeerAddress(peerId));
+ }
+
OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.snapshotIndex(replicatedLog().getSnapshotIndex())
.snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
- .peerAddresses(context.getPeerAddresses());
+ .peerAddresses(peerAddresses);
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if (lastLogEntry != null) {
}
protected void updateConfigParams(ConfigParams configParams) {
+
+ // obtain the RaftPolicy for oldConfigParams and the updated one.
+ String oldRaftPolicy = context.getConfigParams().
+ getCustomRaftPolicyImplementationClass();
+ String newRaftPolicy = configParams.
+ getCustomRaftPolicyImplementationClass();
+
+ LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
+ oldRaftPolicy, newRaftPolicy);
context.setConfigParams(configParams);
+ if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
+ // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
+ // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
+ // avoids potential disruption. Otherwise, switch to Follower normally.
+ RaftActorBehavior behavior = currentBehavior.getDelegate();
+ if(behavior instanceof Follower) {
+ String previousLeaderId = ((Follower)behavior).getLeaderId();
+
+ LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
+
+ changeCurrentBehavior(new Follower(context, previousLeaderId));
+ } else {
+ initializeBehavior();
+ }
+ }
}
public final DataPersistenceProvider persistence() {