initializeBehavior();
raftRecovery = null;
+
+ if (context.getReplicatedLog().size() > 0) {
+ self().tell(new InitiateCaptureSnapshot(), self());
+ LOG.info("Snapshot capture initiated after recovery");
+ } else {
+ LOG.info("Snapshot capture NOT initiated after recovery, journal empty");
+ }
}
}
@Override
public void handleCommand(final Object message) {
- if (message instanceof ApplyState){
+ if(serverConfigurationSupport.handleMessage(message, this, getSender())) {
+ return;
+ } else if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
- boolean result = serverConfigurationSupport.handleMessage(message, this, getSender());
- if(result){
- return;
- }
-
long elapsedTime = (System.nanoTime() - applyState.getStartTime());
if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
captureSnapshot();
} else if(message instanceof SwitchBehavior){
switchBehavior(((SwitchBehavior) message));
- } else if(!snapshotSupport.handleSnapshotMessage(message) &&
- !serverConfigurationSupport.handleMessage(message, this, getSender())) {
+ } else if(!snapshotSupport.handleSnapshotMessage(message)) {
switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
}
}
private void onGetOnDemandRaftStats() {
// Debugging message to retrieve raft stats.
+ Map<String, String> peerAddresses = new HashMap<>();
+ for(String peerId: context.getPeerIds()) {
+ peerAddresses.put(peerId, context.getPeerAddress(peerId));
+ }
+
OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.snapshotIndex(replicatedLog().getSnapshotIndex())
.snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
- .peerAddresses(new HashMap<>(context.getPeerAddresses()));
+ .peerAddresses(peerAddresses);
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if (lastLogEntry != null) {
replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
@Override
- public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
+ public void apply(ReplicatedLogEntry replicatedLogEntry) {
if (!hasFollowers()){
// Increment the Commit Index and the Last Applied values
raftContext.setCommitIndex(replicatedLogEntry.getIndex());
}
protected void updateConfigParams(ConfigParams configParams) {
+
+ // obtain the RaftPolicy for oldConfigParams and the updated one.
+ String oldRaftPolicy = context.getConfigParams().
+ getCustomRaftPolicyImplementationClass();
+ String newRaftPolicy = configParams.
+ getCustomRaftPolicyImplementationClass();
+
+ LOG.debug ("RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}",
+ oldRaftPolicy, newRaftPolicy);
context.setConfigParams(configParams);
+ if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
+ //RaftPolicy is modifed for the Actor. Re-initialize its current behaviour
+ initializeBehavior();
+ }
}
public final DataPersistenceProvider persistence() {