import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
import com.google.common.base.Stopwatch;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
import java.util.Collections;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
+
/**
* Support class that handles persistence recovery for a RaftActor.
*
anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
- if(isMigratedSerializable(message)) {
+ if (isMigratedSerializable(message)) {
hasMigratedDataRecovered = true;
}
return recoveryComplete;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void possiblyRestoreFromSnapshot() {
- byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
- if(restoreFromSnapshot == null) {
+ Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+ if (restoreFromSnapshot == null) {
return;
}
- if(anyDataRecovered) {
+ if (anyDataRecovered) {
log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
context.getId());
return;
}
- try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
- Snapshot snapshot = (Snapshot) ois.readObject();
-
- log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
+ log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
- context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
- } catch(Exception e) {
- log.error("{}: Error deserializing snapshot restore", context.getId(), e);
- }
+ context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
}
private ReplicatedLog replicatedLog() {
}
private void initRecoveryTimer() {
- if(recoveryTimer == null) {
+ if (recoveryTimer == null) {
recoveryTimer = Stopwatch.createStarted();
}
}
private void onRecoveredSnapshot(SnapshotOffer offer) {
- if(log.isDebugEnabled()) {
- log.debug("{}: SnapshotOffer called..", context.getId());
- }
+ log.debug("{}: SnapshotOffer called.", context.getId());
initRecoveryTimer();
Snapshot snapshot = (Snapshot) offer.snapshot();
- for(ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
- if(isMigratedPayload(entry)) {
+ for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
+ if (isMigratedPayload(entry)) {
hasMigratedDataRecovered = true;
}
}
- if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (!context.getPersistenceProvider().isRecoveryApplicable()) {
// We may have just transitioned to disabled and have a snapshot containing state data and/or log
// entries - we don't want to preserve these, only the server config and election term info.
- snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+ snapshot = Snapshot.create(
+ EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
}
Stopwatch timer = Stopwatch.createStarted();
// Apply the snapshot to the actors state
- cohort.applyRecoverySnapshot(snapshot.getState());
+ if (!(snapshot.getState() instanceof EmptyState)) {
+ cohort.applyRecoverySnapshot(snapshot.getState());
+ }
if (snapshot.getServerConfiguration() != null) {
context.updatePeerIds(snapshot.getServerConfiguration());
-
- if(isMigratedSerializable(snapshot.getServerConfiguration())) {
- hasMigratedDataRecovered = true;
- }
}
timer.stop();
}
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
logEntry.getIndex(), logEntry.size());
}
- if(isServerConfigurationPayload(logEntry)){
+ if (isServerConfigurationPayload(logEntry)) {
context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
}
- if(isMigratedPayload(logEntry)) {
+ if (isMigratedPayload(logEntry)) {
hasMigratedDataRecovered = true;
}
- if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (context.getPersistenceProvider().isRecoveryApplicable()) {
replicatedLog().append(logEntry);
- } else if(!isPersistentPayload(logEntry)) {
+ } else if (!isPersistentPayload(logEntry)) {
dataRecoveredWithPersistenceDisabled = true;
}
}
private void onRecoveredApplyLogEntries(long toIndex) {
- if(!context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (!context.getPersistenceProvider().isRecoveryApplicable()) {
dataRecoveredWithPersistenceDisabled = true;
return;
}
long lastUnappliedIndex = context.getLastApplied() + 1;
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
// it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal
// but the entry itself has made it to that state and recovered via the snapshot
log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
long lastApplied = lastUnappliedIndex - 1;
for (long i = lastUnappliedIndex; i <= toIndex; i++) {
ReplicatedLogEntry logEntry = replicatedLog().get(i);
- if(logEntry != null) {
+ if (logEntry != null) {
lastApplied++;
batchRecoveredLogEntry(logEntry);
} else {
}
private void onDeleteEntries(DeleteEntries deleteEntries) {
- if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (context.getPersistenceProvider().isRecoveryApplicable()) {
replicatedLog().removeFrom(deleteEntries.getFromIndex());
} else {
dataRecoveredWithPersistenceDisabled = true;
initRecoveryTimer();
int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
- if(!isServerConfigurationPayload(logEntry)){
- if(currentRecoveryBatchCount == 0) {
+ if (!isServerConfigurationPayload(logEntry)) {
+ if (currentRecoveryBatchCount == 0) {
cohort.startLogRecoveryBatch(batchSize);
}
cohort.appendRecoveredLogEntry(logEntry.getData());
- if(++currentRecoveryBatchCount >= batchSize) {
+ if (++currentRecoveryBatchCount >= batchSize) {
endCurrentLogRecoveryBatch();
}
}
}
private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
- if(currentRecoveryBatchCount > 0) {
+ if (currentRecoveryBatchCount > 0) {
endCurrentLogRecoveryBatch();
}
String recoveryTime = "";
- if(recoveryTimer != null) {
+ if (recoveryTimer != null) {
recoveryTimer.stop();
recoveryTime = " in " + recoveryTimer.toString();
recoveryTimer = null;
}
- log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
- "Persistence Id = " + context.getId() +
- " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
- "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+ log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + "Persistence Id = "
+ + context.getId() + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, "
+ + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
replicatedLog().getSnapshotTerm(), replicatedLog().size());
- if(dataRecoveredWithPersistenceDisabled ||
- hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
- if(hasMigratedDataRecovered) {
+ if (dataRecoveredWithPersistenceDisabled
+ || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
+ if (hasMigratedDataRecovered) {
log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
} else {
log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId());
// messages. Either way, we persist a snapshot and delete all the messages from the akka journal
// to clean out unwanted messages.
- Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
+ Snapshot snapshot = Snapshot.create(
+ EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
+ -1, -1, -1, -1,
context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
context.getPeerServerInfo(true));
persistentProvider.saveSnapshot(snapshot);
persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
- } else if(hasMigratedDataRecovered) {
+ } else if (hasMigratedDataRecovered) {
log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId());
context.getSnapshotManager().capture(replicatedLog().last(), -1);
}
}
- private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
+ private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry) {
return repLogEntry.getData() instanceof ServerConfigurationPayload;
}
- private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry){
+ private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry) {
return repLogEntry.getData() instanceof PersistentPayload;
}
- private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry){
+ private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry) {
return isMigratedSerializable(repLogEntry.getData());
}
- private static boolean isMigratedSerializable(Object message){
+ private static boolean isMigratedSerializable(Object message) {
return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
}
}