import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
/**
* A sample actor showing how the RaftActor is to be extended
*/
-public class ExampleActor extends RaftActor {
+public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
private final Map<String, String> state = new HashMap();
}
@Override
- protected void startLogRecoveryBatch(int maxBatchSize) {
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return this;
}
@Override
- protected void appendRecoveredLogEntry(Payload data) {
+ public void startLogRecoveryBatch(int maxBatchSize) {
}
@Override
- protected void applyCurrentLogRecoveryBatch() {
+ public void appendRecoveredLogEntry(Payload data) {
}
@Override
- protected void onRecoveryComplete() {
+ public void applyCurrentLogRecoveryBatch() {
}
@Override
- protected void applyRecoverySnapshot(byte[] snapshot) {
+ public void onRecoveryComplete() {
+ }
+
+ @Override
+ public void applyRecoverySnapshot(byte[] snapshot) {
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
- private Stopwatch recoveryTimer;
-
- private int currentRecoveryBatchCount;
+ private RaftActorRecoverySupport raftRecovery;
private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
}
- private void initRecoveryTimer() {
- if(recoveryTimer == null) {
- recoveryTimer = Stopwatch.createStarted();
- }
- }
-
@Override
public void preStart() throws Exception {
LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
@Override
public void handleRecover(Object message) {
- if(persistence().isRecoveryApplicable()) {
- if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer) message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyLogEntries) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
- } else if (message instanceof ApplyJournalEntries) {
- onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
- } else if (message instanceof DeleteEntries) {
- replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof RecoveryCompleted) {
- onRecoveryCompletedMessage();
- }
- } else {
- if (message instanceof RecoveryCompleted) {
+ if(raftRecovery == null) {
+ raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
+ getRaftActorRecoveryCohort());
+ }
+
+ boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
+ if(recoveryComplete) {
+ if(!persistence().isRecoveryApplicable()) {
// Delete all the messages from the akka journal so that we do not end up with consistency issues
// Note I am not using the dataPersistenceProvider and directly using the akka api here
deleteMessages(lastSequenceNr());
// Delete all the akka snapshots as they will not be needed
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
-
- onRecoveryComplete();
-
- initializeBehavior();
}
- }
- }
-
- private void onRecoveredSnapshot(SnapshotOffer offer) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: SnapshotOffer called..", persistenceId());
- }
- initRecoveryTimer();
+ onRecoveryComplete();
- Snapshot snapshot = (Snapshot) offer.snapshot();
+ initializeBehavior();
- // Create a replicated log with the snapshot information
- // The replicated log can be used later on to retrieve this snapshot
- // when we need to install it on a peer
-
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
- currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
- context.setCommitIndex(snapshot.getLastAppliedIndex());
-
- Stopwatch timer = Stopwatch.createStarted();
-
- // Apply the snapshot to the actors state
- applyRecoverySnapshot(snapshot.getState());
-
- timer.stop();
- LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
- replicatedLog().size(), persistenceId(), timer.toString(),
- replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
- }
-
- private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
+ raftRecovery = null;
}
-
- replicatedLog().append(logEntry);
- }
-
- private void onRecoveredApplyLogEntries(long toIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- persistenceId(), context.getLastApplied() + 1, toIndex);
- }
-
- for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
- batchRecoveredLogEntry(replicatedLog().get(i));
- }
-
- context.setLastApplied(toIndex);
- context.setCommitIndex(toIndex);
- }
-
- private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
- initRecoveryTimer();
-
- int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
- if(currentRecoveryBatchCount == 0) {
- startLogRecoveryBatch(batchSize);
- }
-
- appendRecoveredLogEntry(logEntry.getData());
-
- if(++currentRecoveryBatchCount >= batchSize) {
- endCurrentLogRecoveryBatch();
- }
- }
-
- private void endCurrentLogRecoveryBatch() {
- applyCurrentLogRecoveryBatch();
- currentRecoveryBatchCount = 0;
- }
-
- private void onRecoveryCompletedMessage() {
- if(currentRecoveryBatchCount > 0) {
- endCurrentLogRecoveryBatch();
- }
-
- onRecoveryComplete();
-
- String recoveryTime = "";
- if(recoveryTimer != null) {
- recoveryTimer.stop();
- recoveryTime = " in " + recoveryTimer.toString();
- recoveryTimer = null;
- }
-
- LOG.info(
- "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
- "Persistence Id = " + persistenceId() +
- " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
-
- initializeBehavior();
}
protected void initializeBehavior(){
Object data);
/**
- * This method is called during recovery at the start of a batch of state entries. Derived
- * classes should perform any initialization needed to start a batch.
- */
- protected abstract void startLogRecoveryBatch(int maxBatchSize);
-
- /**
- * This method is called during recovery to append state data to the current batch. This method
- * is called 1 or more times after {@link #startLogRecoveryBatch}.
- *
- * @param data the state data
- */
- protected abstract void appendRecoveredLogEntry(Payload data);
-
- /**
- * This method is called during recovery to reconstruct the state of the actor.
- *
- * @param snapshotBytes A snapshot of the state of the actor
- */
- protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
-
- /**
- * This method is called during recovery at the end of a batch to apply the current batched
- * log entries. This method is called after {@link #appendRecoveredLogEntry}.
+ * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
*/
- protected abstract void applyCurrentLogRecoveryBatch();
+ @Nonnull
+ protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
* This method is called when recovery is complete.
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * Interface for a class that participates in raft actor persistence recovery.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorRecoveryCohort {
+
+ /**
+ * This method is called during recovery at the start of a batch of state entries. Derived
+ * classes should perform any initialization needed to start a batch.
+ */
+ void startLogRecoveryBatch(int maxBatchSize);
+
+ /**
+ * This method is called during recovery to append state data to the current batch. This method
+ * is called 1 or more times after {@link #startLogRecoveryBatch}.
+ *
+ * @param data the state data
+ */
+ void appendRecoveredLogEntry(Payload data);
+
+ /**
+ * This method is called during recovery to reconstruct the state of the actor.
+ *
+ * @param snapshotBytes A snapshot of the state of the actor
+ */
+ void applyRecoverySnapshot(byte[] snapshotBytes);
+
+ /**
+ * This method is called during recovery at the end of a batch to apply the current batched
+ * log entries. This method is called after {@link #appendRecoveredLogEntry}.
+ */
+ void applyCurrentLogRecoveryBatch();
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
+import com.google.common.base.Stopwatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Support class that handles persistence recovery for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorRecoverySupport {
+ private final DataPersistenceProvider persistence;
+ private final RaftActorContext context;
+ private final RaftActorBehavior currentBehavior;
+ private final RaftActorRecoveryCohort cohort;
+
+ private int currentRecoveryBatchCount;
+
+ private Stopwatch recoveryTimer;
+ private final Logger log;
+
+ RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context,
+ RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) {
+ this.persistence = persistence;
+ this.context = context;
+ this.currentBehavior = currentBehavior;
+ this.cohort = cohort;
+ this.log = context.getLogger();
+ }
+
+ boolean handleRecoveryMessage(Object message) {
+ boolean recoveryComplete = false;
+ if(persistence.isRecoveryApplicable()) {
+ if (message instanceof SnapshotOffer) {
+ onRecoveredSnapshot((SnapshotOffer) message);
+ } else if (message instanceof ReplicatedLogEntry) {
+ onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+ } else if (message instanceof ApplyLogEntries) {
+ // Handle this message for backwards compatibility with pre-Lithium versions.
+ onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+ } else if (message instanceof ApplyJournalEntries) {
+ onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
+ } else if (message instanceof DeleteEntries) {
+ replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
+ } else if (message instanceof UpdateElectionTerm) {
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+ } else if (message instanceof RecoveryCompleted) {
+ onRecoveryCompletedMessage();
+ recoveryComplete = true;
+ }
+ } else if (message instanceof RecoveryCompleted) {
+ recoveryComplete = true;
+ }
+
+ return recoveryComplete;
+ }
+
+ private ReplicatedLog replicatedLog() {
+ return context.getReplicatedLog();
+ }
+
+ private void initRecoveryTimer() {
+ if(recoveryTimer == null) {
+ recoveryTimer = Stopwatch.createStarted();
+ }
+ }
+
+ private void onRecoveredSnapshot(SnapshotOffer offer) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: SnapshotOffer called..", context.getId());
+ }
+
+ initRecoveryTimer();
+
+ Snapshot snapshot = (Snapshot) offer.snapshot();
+
+ // Create a replicated log with the snapshot information
+ // The replicated log can be used later on to retrieve this snapshot
+ // when we need to install it on a peer
+
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior));
+ context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+ Stopwatch timer = Stopwatch.createStarted();
+
+ // Apply the snapshot to the actors state
+ cohort.applyRecoverySnapshot(snapshot.getState());
+
+ timer.stop();
+ log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+ replicatedLog().size(), context.getId(), timer.toString(),
+ replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
+ }
+
+ private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Received ReplicatedLogEntry for recovery: {}", context.getId(), logEntry.getIndex());
+ }
+
+ replicatedLog().append(logEntry);
+ }
+
+ private void onRecoveredApplyLogEntries(long toIndex) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ context.getId(), context.getLastApplied() + 1, toIndex);
+ }
+
+ for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
+ batchRecoveredLogEntry(replicatedLog().get(i));
+ }
+
+ context.setLastApplied(toIndex);
+ context.setCommitIndex(toIndex);
+ }
+
+ private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+ initRecoveryTimer();
+
+ int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+ if(currentRecoveryBatchCount == 0) {
+ cohort.startLogRecoveryBatch(batchSize);
+ }
+
+ cohort.appendRecoveredLogEntry(logEntry.getData());
+
+ if(++currentRecoveryBatchCount >= batchSize) {
+ endCurrentLogRecoveryBatch();
+ }
+ }
+
+ private void endCurrentLogRecoveryBatch() {
+ cohort.applyCurrentLogRecoveryBatch();
+ currentRecoveryBatchCount = 0;
+ }
+
+ private void onRecoveryCompletedMessage() {
+ if(currentRecoveryBatchCount > 0) {
+ endCurrentLogRecoveryBatch();
+ }
+
+ String recoveryTime = "";
+ if(recoveryTimer != null) {
+ recoveryTimer.stop();
+ recoveryTime = " in " + recoveryTimer.toString();
+ recoveryTimer = null;
+ }
+
+ log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+ "Persistence Id = " + context.getId() +
+ " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
+ "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+ replicatedLog().getSnapshotTerm(), replicatedLog().size());
+ }
+}
}
@Override
- protected void applyRecoverySnapshot(byte[] bytes) {
+ public void applyRecoverySnapshot(byte[] bytes) {
}
void setSnapshot(byte[] snapshot) {
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
InMemorySnapshotStore.clear();
}
- public static class MockRaftActor extends RaftActor {
+ public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort {
- private final RaftActor delegate;
+ private final RaftActor actorDelegate;
+ private final RaftActorRecoveryCohort cohortDelegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
DataPersistenceProvider dataPersistenceProvider) {
super(id, peerAddresses, config);
state = new ArrayList<>();
- this.delegate = mock(RaftActor.class);
+ this.actorDelegate = mock(RaftActor.class);
+ this.cohortDelegate = mock(RaftActorRecoveryCohort.class);
if(dataPersistenceProvider == null){
setPersistence(true);
} else {
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
- delegate.applyState(clientActor, identifier, data);
+ actorDelegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called", persistenceId());
}
@Override
- protected void startLogRecoveryBatch(int maxBatchSize) {
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return this;
}
@Override
- protected void appendRecoveredLogEntry(Payload data) {
+ public void startLogRecoveryBatch(int maxBatchSize) {
+ }
+
+ @Override
+ public void appendRecoveredLogEntry(Payload data) {
state.add(data);
}
@Override
- protected void applyCurrentLogRecoveryBatch() {
+ public void applyCurrentLogRecoveryBatch() {
}
@Override
protected void onRecoveryComplete() {
- delegate.onRecoveryComplete();
+ actorDelegate.onRecoveryComplete();
recoveryComplete.countDown();
}
}
@Override
- protected void applyRecoverySnapshot(byte[] bytes) {
- delegate.applyRecoverySnapshot(bytes);
+ public void applyRecoverySnapshot(byte[] bytes) {
+ cohortDelegate.applyRecoverySnapshot(bytes);
try {
Object data = toObject(bytes);
if (data instanceof List) {
@Override protected void createSnapshot() {
LOG.info("{}: createSnapshot called", persistenceId());
- delegate.createSnapshot();
+ actorDelegate.createSnapshot();
}
@Override protected void applySnapshot(byte [] snapshot) {
LOG.info("{}: applySnapshot called", persistenceId());
- delegate.applySnapshot(snapshot);
+ actorDelegate.applySnapshot(snapshot);
}
@Override protected void onStateChanged() {
- delegate.onStateChanged();
+ actorDelegate.onStateChanged();
}
@Override
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+ verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
- verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
+ verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
- verify(mockRaftActor.delegate).createSnapshot();
+ verify(mockRaftActor.actorDelegate).createSnapshot();
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
- verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+ verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
}
};
mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
- verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
+ verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState()));
assertTrue("The replicatedLog should have changed",
oldReplicatedLog != mockRaftActor.getReplicatedLog());
.capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
new MockRaftActorContext.MockPayload("x")), 4);
- verify(leaderActor.delegate).createSnapshot();
+ verify(leaderActor.actorDelegate).createSnapshot();
assertEquals(8, leaderActor.getReplicatedLog().size());
new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("D")), 4);
- verify(followerActor.delegate).createSnapshot();
+ verify(followerActor.actorDelegate).createSnapshot();
assertEquals(6, followerActor.getReplicatedLog().size());
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
Serialization.serializedActorPath(getSelf()));
-
- /**
- * Coordinates persistence recovery on startup.
- */
- private ShardRecoveryCoordinator recoveryCoordinator;
-
private final DOMTransactionFactory transactionFactory;
private final String txnDispatcherPath;
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
-
- recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
private void setTransactionCommitTimeout() {
}
@Override
- protected
- void startLogRecoveryBatch(final int maxBatchSize) {
- recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
- }
-
- @Override
- protected void appendRecoveredLogEntry(final Payload data) {
- recoveryCoordinator.appendRecoveredLogPayload(data);
- }
-
- @Override
- protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
- }
-
- @Override
- protected void applyCurrentLogRecoveryBatch() {
- recoveryCoordinator.applyCurrentLogRecoveryBatch();
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
@Override
protected void onRecoveryComplete() {
- recoveryCoordinator = null;
-
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
*
* @author Thomas Panetelis
*/
-class ShardRecoveryCoordinator {
+class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
private final InMemoryDOMDataStore store;
private List<ModificationPayload> currentLogRecoveryBatch;
this.log = log;
}
- void startLogRecoveryBatch(int maxBatchSize) {
+ @Override
+ public void startLogRecoveryBatch(int maxBatchSize) {
currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
}
- void appendRecoveredLogPayload(Payload payload) {
+ @Override
+ public void appendRecoveredLogEntry(Payload payload) {
try {
if(payload instanceof ModificationPayload) {
currentLogRecoveryBatch.add((ModificationPayload) payload);
/**
* Applies the current batched log entries to the data store.
*/
- void applyCurrentLogRecoveryBatch() {
+ @Override
+ public void applyCurrentLogRecoveryBatch() {
log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
*
* @param snapshotBytes the serialized snapshot
*/
- void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+ @Override
+ public void applyRecoverySnapshot(final byte[] snapshotBytes) {
log.debug("{}: Applyng recovered sbapshot", shardName);
DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();