import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.base.Stopwatch;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.slf4j.Logger;
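/**
 * Support class that handles the messages received during persistence recovery of a Raft actor,
 * updating the {@link RaftActorContext} and delegating state restoration to a
 * {@link RaftActorRecoveryCohort}.
 */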
class RaftActorRecoverySupport {
private final RaftActorContext context;
- private final RaftActorBehavior currentBehavior;
private final RaftActorRecoveryCohort cohort;
private int currentRecoveryBatchCount;
private boolean dataRecoveredWithPersistenceDisabled;
+ private boolean anyDataRecovered;
private Stopwatch recoveryTimer;
private final Logger log;
- RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior,
- RaftActorRecoveryCohort cohort) {
+ RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) { // Keeps references to the actor context and the recovery cohort.
this.context = context;
- this.currentBehavior = currentBehavior;
this.cohort = cohort;
this.log = context.getLogger(); // The logger is taken from the context rather than created here.
}
/**
 * Processes a single message delivered during persistence recovery.
 * <p>
 * NOTE(review): only part of this method body is visible in this excerpt (some branches are elided),
 * so the comments below describe the visible statements only.
 *
 * @param message the recovery message to handle
 * @param persistentProvider provider used below to delete the akka snapshots
 * @return the local {@code recoveryComplete} flag; it starts out {@code false} and the visible code sets it
 *         to {@code true} while handling a {@code RecoveryCompleted} message
 */
boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
- log.trace("handleRecoveryMessage: {}", message);
+ log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
+
+ anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted); // Stays true once any message other than RecoveryCompleted has been handled.
boolean recoveryComplete = false;
DataPersistenceProvider persistence = context.getPersistenceProvider();
replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
} else if (message instanceof RecoveryCompleted) {
onRecoveryCompletedMessage();
+ possiblyRestoreFromSnapshot(); // Applies a cohort-provided restore snapshot, but only when no data was recovered.
recoveryComplete = true;
}
} else if (message instanceof RecoveryCompleted) {
// Delete all the akka snapshots as they will not be needed
persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
- scala.Long.MaxValue()));
+ scala.Long.MaxValue(), 0L, 0L));
// Since we cleaned out the journal, we need to re-write the current election info.
context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor());
}
+
+ possiblyRestoreFromSnapshot(); // Same restore attempt as in the RecoveryCompleted branch above.
} else {
- dataRecoveredWithPersistenceDisabled = true;
+ boolean isServerConfigPayload = false;
+ if(message instanceof ReplicatedLogEntry){
+ ReplicatedLogEntry repLogEntry = (ReplicatedLogEntry)message;
+ if(isServerConfigurationPayload(repLogEntry)){
+ isServerConfigPayload = true;
+ context.updatePeerIds((ServerConfigurationPayload)repLogEntry.getData()); // Server configuration entries only refresh the peer ids held by the context.
+ }
+ }
+
+ if(!isServerConfigPayload){ // Any message that is not a server configuration log entry sets the flag.
+ dataRecoveredWithPersistenceDisabled = true;
+ }
}
return recoveryComplete;
}
+ private void possiblyRestoreFromSnapshot() {
+ byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+ if(restoreFromSnapshot == null) {
+ return;
+ }
+
+ if(anyDataRecovered) {
+ log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
+ context.getId());
+ return;
+ }
+
+ try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
+ Snapshot snapshot = (Snapshot) ois.readObject();
+
+ log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
+
+ context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
+ } catch(Exception e) {
+ log.error("{}: Error deserializing snapshot restore", context.getId(), e);
+ }
+ }
+
/**
 * Shorthand for fetching the replicated log currently held by the actor context.
 *
 * @return whatever {@code context.getReplicatedLog()} returns at the time of the call
 */
private ReplicatedLog replicatedLog() {
    final ReplicatedLog currentLog = context.getReplicatedLog();
    return currentLog;
}
// The replicated log can be used later on to retrieve this snapshot
// when we need to install it on a peer
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
context.setLastApplied(snapshot.getLastAppliedIndex());
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
// Apply the snapshot to the actors state
cohort.applyRecoverySnapshot(snapshot.getState());
+ if (snapshot.getServerConfiguration() != null) {
+ context.updatePeerIds(snapshot.getServerConfiguration());
+ }
+
timer.stop();
log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
logEntry.getIndex(), logEntry.size());
}
+ if(isServerConfigurationPayload(logEntry)){
+ context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
+ }
replicatedLog().append(logEntry);
}
batchRecoveredLogEntry(logEntry);
} else {
// Shouldn't happen but cover it anyway.
- log.error("Log entry not found for index {}", i);
+ log.error("{}: Log entry not found for index {}", context.getId(), i);
break;
}
}
initRecoveryTimer();
int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
- if(currentRecoveryBatchCount == 0) {
- cohort.startLogRecoveryBatch(batchSize);
- }
+ if(!isServerConfigurationPayload(logEntry)){
+ if(currentRecoveryBatchCount == 0) {
+ cohort.startLogRecoveryBatch(batchSize);
+ }
- cohort.appendRecoveredLogEntry(logEntry.getData());
+ cohort.appendRecoveredLogEntry(logEntry.getData());
- if(++currentRecoveryBatchCount >= batchSize) {
- endCurrentLogRecoveryBatch();
+ if(++currentRecoveryBatchCount >= batchSize) {
+ endCurrentLogRecoveryBatch();
+ }
}
}
"journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
replicatedLog().getSnapshotTerm(), replicatedLog().size());
}
+
+ private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
+ return (repLogEntry.getData() instanceof ServerConfigurationPayload);
+ }
}