import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
- if (snapshotTracker != null) {
+ if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
// if snapshot install is in progress, follower should just acknowledge append entries with a reply.
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm(), context.getPayloadVersion());
LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
context.getReplicatedLog().appendAndPersist(entry);
+
+ if(entry.getData() instanceof ServerConfigurationPayload) {
+ applyServerConfiguration((ServerConfigurationPayload)entry.getData());
+ }
}
LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
return super.handleMessage(sender, message);
}
- private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+ private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
try {
+ final InstallSnapshotReply reply = new InstallSnapshotReply(
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+
if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
installSnapshot.getLastChunkHashCode())){
Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor());
- actor().tell(new ApplySnapshot(snapshot), actor());
+ ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
+ @Override
+ public void onSuccess() {
+ LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
- snapshotTracker = null;
-
- }
+ sender.tell(reply, actor());
+ }
- InstallSnapshotReply reply = new InstallSnapshotReply(
- currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+ @Override
+ public void onFailure() {
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor());
+ }
+ };
- LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+ actor().tell(new ApplySnapshot(snapshot, applySnapshotCallback), actor());
- sender.tell(reply, actor());
+ snapshotTracker = null;
+ } else {
+ LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+ sender.tell(reply, actor());
+ }
} catch (SnapshotTracker.InvalidChunkException e) {
LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);