import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
this.log = context.getLogger();
}
- boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
+ boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
}
}
- private void onRecoveredSnapshot(SnapshotOffer offer) {
+ private void onRecoveredSnapshot(final SnapshotOffer offer) {
log.debug("{}: SnapshotOffer called.", context.getId());
initRecoveryTimer();
- Object snapshotObj = offer.snapshot();
- Snapshot snapshot;
- if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) {
- org.opendaylight.controller.cluster.raft.Snapshot legacy =
- (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj;
- snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()),
- legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(),
- legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
- legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
- hasMigratedDataRecovered = true;
- } else {
- snapshot = (Snapshot) offer.snapshot();
- }
+ Snapshot snapshot = (Snapshot) offer.snapshot();
for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
if (isMigratedPayload(entry)) {
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
- Stopwatch timer = Stopwatch.createStarted();
+ final Stopwatch timer = Stopwatch.createStarted();
// Apply the snapshot to the actors state
- if (!(snapshot.getState() instanceof EmptyState)) {
- cohort.applyRecoverySnapshot(snapshot.getState());
+ final State snapshotState = snapshot.getState();
+ if (snapshotState.needsMigration()) {
+ hasMigratedDataRecovered = true;
+ }
+ if (!(snapshotState instanceof EmptyState)) {
+ cohort.applyRecoverySnapshot(snapshotState);
}
if (snapshot.getServerConfiguration() != null) {
context.updatePeerIds(snapshot.getServerConfiguration());
-
- if (isMigratedSerializable(snapshot.getServerConfiguration())) {
- hasMigratedDataRecovered = true;
- }
}
timer.stop();
log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
- context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
+ context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
+ replicatedLog().size());
}
- private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+ private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
if (log.isDebugEnabled()) {
log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
logEntry.getIndex(), logEntry.size());
}
}
- private void onRecoveredApplyLogEntries(long toIndex) {
+ private void onRecoveredApplyLogEntries(final long toIndex) {
if (!context.getPersistenceProvider().isRecoveryApplicable()) {
dataRecoveredWithPersistenceDisabled = true;
return;
context.setCommitIndex(lastApplied);
}
- private void onDeleteEntries(DeleteEntries deleteEntries) {
+ private void onDeleteEntries(final DeleteEntries deleteEntries) {
if (context.getPersistenceProvider().isRecoveryApplicable()) {
replicatedLog().removeFrom(deleteEntries.getFromIndex());
} else {
}
}
- private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+ private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
initRecoveryTimer();
int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
currentRecoveryBatchCount = 0;
}
- private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
+ private void onRecoveryCompletedMessage(final PersistentDataProvider persistentProvider) {
if (currentRecoveryBatchCount > 0) {
endCurrentLogRecoveryBatch();
}
recoveryTimer = null;
}
- log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + "Persistence Id = "
- + context.getId() + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, "
- + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
+ log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
+ + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
+ replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
+ replicatedLog().getSnapshotTerm(), replicatedLog().size());
if (dataRecoveredWithPersistenceDisabled
|| hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
}
}
- private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry) {
+ private static boolean isServerConfigurationPayload(final ReplicatedLogEntry repLogEntry) {
return repLogEntry.getData() instanceof ServerConfigurationPayload;
}
- private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry) {
+ private static boolean isPersistentPayload(final ReplicatedLogEntry repLogEntry) {
return repLogEntry.getData() instanceof PersistentPayload;
}
- private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry) {
+ private static boolean isMigratedPayload(final ReplicatedLogEntry repLogEntry) {
return isMigratedSerializable(repLogEntry.getData());
}
- private static boolean isMigratedSerializable(Object message) {
+ private static boolean isMigratedSerializable(final Object message) {
return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
}
}