applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
+ if (!hasFollowers()) {
+ // for single node, the capture should happen after the apply state
+ // as we delete messages from the persistent journal which have made it to the snapshot
+ // capturing the snapshot before applying makes the persistent journal and snapshot out of sync
+ // and recovery shows data missing
+ context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
+
+ context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+ }
+
} else if (message instanceof ApplyJournalEntries){
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
if(LOG.isDebugEnabled()) {
return new LeaderStateChanged(memberId, leaderId);
}
+ @Override
+ public long snapshotSequenceNr() {
+ // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
+ // so that we can delete the persistent journal based on the saved sequence-number
+ // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot
+ // was saved and not the number we saved.
+ // We would want to override it , by asking akka to use the last-sequence number known to us.
+ return context.getSnapshotManager().getLastSequenceNumber();
+ }
+
/**
* When a derived RaftActor needs to persist something it must call
* persistData.
replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
@Override
public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
- if(!hasFollowers()){
+ if (!hasFollowers()){
// Increment the Commit Index and the Last Applied values
raftContext.setCommitIndex(replicatedLogEntry.getIndex());
raftContext.setLastApplied(replicatedLogEntry.getIndex());
- // Apply the state immediately
+ // Apply the state immediately.
self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
// Send a ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
- context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
-
} else if (clientActor != null) {
+ context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+
// Send message for replication
currentBehavior.handleMessage(getSelf(),
new Replicate(clientActor, identifier, replicatedLogEntry));
long lastUnappliedIndex = context.getLastApplied() + 1;
if(log.isDebugEnabled()) {
+ // it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal
+ // but the entry itself has made it to that state and recovered via the snapshot
log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
context.getId(), lastUnappliedIndex, toIndex);
}
*/
int dataSize();
+ /**
+ * We decide if snapshot need to be captured based on the count/memory consumed.
+ * @param replicatedLogEntry
+ */
+ void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry);
+
}
appendAndPersist(replicatedLogEntry, null);
}
+ @Override
+ public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+ long journalSize = replicatedLogEntry.getIndex() + 1;
+ long dataThreshold = context.getTotalMemory() *
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
+ if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+ || getDataSizeForSnapshotCheck() > dataThreshold)) {
+
+ boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+ currentBehavior.getReplicatedToAllIndex());
+ if (started) {
+ if (!context.hasFollowers()) {
+ dataSizeSinceLastSnapshot = 0;
+ }
+ }
+ }
+ }
+
+ private long getDataSizeForSnapshotCheck() {
+ long dataSizeForCheck = dataSize();
+ if (!context.hasFollowers()) {
+ // When we do not have followers we do not maintain an in-memory log
+ // due to this the journalSize will never become anything close to the
+ // snapshot batch count. In fact will mostly be 1.
+ // Similarly since the journal's dataSize depends on the entries in the
+ // journal the journal's dataSize will never reach a value close to the
+ // memory threshold.
+ // By maintaining the dataSize outside the journal we are tracking essentially
+ // what we have written to the disk however since we no longer are in
+ // need of doing a snapshot just for the sake of freeing up memory we adjust
+ // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
+ // as if we were maintaining a real snapshot
+ dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+ }
+ return dataSizeForCheck;
+ }
+
@Override
public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
final Procedure<ReplicatedLogEntry> callback) {
context.getLogger().debug("{}: persist complete {}", context.getId(), replicatedLogEntry);
int logEntrySize = replicatedLogEntry.size();
-
- long dataSizeForCheck = dataSize();
-
dataSizeSinceLastSnapshot += logEntrySize;
- if (!context.hasFollowers()) {
- // When we do not have followers we do not maintain an in-memory log
- // due to this the journalSize will never become anything close to the
- // snapshot batch count. In fact will mostly be 1.
- // Similarly since the journal's dataSize depends on the entries in the
- // journal the journal's dataSize will never reach a value close to the
- // memory threshold.
- // By maintaining the dataSize outside the journal we are tracking essentially
- // what we have written to the disk however since we no longer are in
- // need of doing a snapshot just for the sake of freeing up memory we adjust
- // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
- // as if we were maintaining a real snapshot
- dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
- }
- long journalSize = replicatedLogEntry.getIndex() + 1;
- long dataThreshold = context.getTotalMemory() *
- context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
- if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
- || dataSizeForCheck > dataThreshold)) {
-
- boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
- currentBehavior.getReplicatedToAllIndex());
-
- if(started){
- dataSizeSinceLastSnapshot = 0;
- }
- }
-
if (callback != null){
callback.apply(replicatedLogEntry);
}
this.createSnapshotProcedure = createSnapshotProcedure;
}
+ public long getLastSequenceNumber() {
+ return lastSequenceNumber;
+ }
+
@VisibleForTesting
public CaptureSnapshot getCaptureSnapshot() {
return captureSnapshot;
@Override
public void commit(long sequenceNumber) {
+ LOG.debug("Snapshot success sequence number:", sequenceNumber);
context.getReplicatedLog().snapshotCommit();
context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
this.term = -1L;
if (!hasFollowers) {
if(lastLogEntry != null) {
+ // since we have persisted the last-log-entry to persistent journal before the capture,
+ // we would want to snapshot from this entry.
index = lastLogEntry.getIndex();
term = lastLogEntry.getTerm();
}
@Override
public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
}
+
+ @Override
+ public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+ }
+
+
}
}
}
public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
- @Override public void appendAndPersist(
+ @Override
+ public void appendAndPersist(
ReplicatedLogEntry replicatedLogEntry) {
append(replicatedLogEntry);
}
return -1;
}
+ @Override
+ public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+ }
+
@Override public void removeFromAndPersist(long index) {
removeFrom(index);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recovery Integration Test for single node
+ */
+public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrationTest {
+
+ static final Logger LOG = LoggerFactory.getLogger(RecoveryIntegrationSingleNodeTest.class);
+
+ @Before
+ public void setup() {
+ leaderConfigParams = newLeaderConfigParams();
+ }
+
+
+ @Test
+ public void testJournalReplayAfterSnapshotWithSingleNode() throws Exception {
+
+ String persistenceId = factory.generateActorId("singleNode");
+ TestActorRef<AbstractRaftActorIntegrationTest.TestRaftActor> singleNodeActorRef = newTestRaftActor(persistenceId,
+ ImmutableMap.<String, String>builder().build(), leaderConfigParams);
+
+ waitUntilLeader(singleNodeActorRef);
+
+ ActorRef singleNodeCollectorActor = singleNodeActorRef.underlyingActor().collectorActor();
+ RaftActorContext singleNodeContext = singleNodeActorRef.underlyingActor().getRaftActorContext();
+
+
+ MockRaftActorContext.MockPayload payload0 = sendPayloadData(singleNodeActorRef, "zero");
+ MockRaftActorContext.MockPayload payload1 = sendPayloadData(singleNodeActorRef, "one");
+ MockRaftActorContext.MockPayload payload2 = sendPayloadData(singleNodeActorRef, "two");
+
+ MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 3);
+
+ // this should trigger a snapshot
+ MockRaftActorContext.MockPayload payload3 = sendPayloadData(singleNodeActorRef, "three");
+
+ MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 4);
+
+ //add 2 more
+ MockRaftActorContext.MockPayload payload4 = sendPayloadData(singleNodeActorRef, "four");
+ MockRaftActorContext.MockPayload payload5 = sendPayloadData(singleNodeActorRef, "five");
+
+
+ // Wait for snapshot complete.
+ MessageCollectorActor.expectFirstMatching(singleNodeCollectorActor, SaveSnapshotSuccess.class);
+
+ MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 6);
+
+ assertEquals("Last applied", 5, singleNodeContext.getLastApplied());
+
+ assertEquals("Incorrect State after snapshot success is received ",
+ Lists.newArrayList(payload0, payload1, payload2, payload3, payload4, payload5), singleNodeActorRef.underlyingActor().getState());
+
+ // we get 2 log entries (4 and 5 indexes) and 3 ApplyJournalEntries (for 3, 4, and 5 indexes)
+ assertEquals(5, InMemoryJournal.get(persistenceId).size());
+
+ List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
+ assertEquals(1, persistedSnapshots.size());
+
+ List<Object> snapshottedState = (List<Object>)MockRaftActor.toObject(persistedSnapshots.get(0).getState());
+ assertEquals("Incorrect Snapshot", Lists.newArrayList(payload0, payload1, payload2, payload3), snapshottedState);
+
+ //recovery logic starts
+ killActor(singleNodeActorRef);
+
+ singleNodeActorRef = newTestRaftActor(persistenceId,
+ ImmutableMap.<String, String>builder().build(), leaderConfigParams);
+
+ singleNodeActorRef.underlyingActor().waitForRecoveryComplete();
+
+ assertEquals("Incorrect State after Recovery ",
+ Lists.newArrayList(payload0, payload1, payload2, payload3, payload4, payload5), singleNodeActorRef.underlyingActor().getState());
+
+ }
+}
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.reset;
@Mock
private RaftActorBehavior mockBehavior;
- @Mock
- private SnapshotManager mockSnapshotManager;
-
private RaftActorContext context;
private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
context = new RaftActorContextImpl(null, null, "test",
new ElectionTermImpl(mockPersistence, "test", LOG),
- -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG) {
- @Override
- public SnapshotManager getSnapshotManager() {
- return mockSnapshotManager;
- }
- };
+ -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
}
private void verifyPersist(Object message) throws Exception {
verifyPersist(logEntry);
verify(mockCallback).apply(same(logEntry));
- verifyNoMoreInteractions(mockSnapshotManager);
assertEquals("size", 2, log.size());
}
log.appendAndPersist(logEntry1);
verifyPersist(logEntry1);
- verifyNoMoreInteractions(mockSnapshotManager);
reset(mockPersistence);
log.appendAndPersist(logEntry2);
verifyPersist(logEntry2);
- verify(mockSnapshotManager).capture(same(logEntry2), eq(1L));
assertEquals("size", 2, log.size());
}
int dataSize = 600;
MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
- doReturn(true).when(mockSnapshotManager).capture(same(logEntry), eq(1L));
-
log.appendAndPersist(logEntry);
verifyPersist(logEntry);
- verify(mockSnapshotManager).capture(same(logEntry), eq(1L));
-
- reset(mockPersistence, mockSnapshotManager);
+ reset(mockPersistence);
logEntry = new MockReplicatedLogEntry(1, 3, new MockPayload("3", 5));
log.appendAndPersist(logEntry);
verifyPersist(logEntry);
- verifyNoMoreInteractions(mockSnapshotManager);
-
assertEquals("size", 2, log.size());
}
}
@Override
- public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
- long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
+ public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
+ final long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
return Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
}
Map<Long, Object> journal = journals.get(persistenceId);
- if(journal == null) {
+ if (journal == null) {
return null;
}
synchronized (journal) {
for (Map.Entry<Long,Object> entry : journal.entrySet()) {
- PersistentRepr persistentMessage =
- new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
- false, null, null);
- replayCallback.apply(persistentMessage);
+ if (entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) {
+ PersistentRepr persistentMessage =
+ new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+ false, null, null);
+ replayCallback.apply(persistentMessage);
+ }
}
}
public class ShardTest extends AbstractShardTest {
private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
+ private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
+
+ final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+ protected Props newShardPropsWithRecoveryComplete() {
+
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT) {
+ @Override
+ protected void onRecoveryComplete() {
+ try {
+ super.onRecoveryComplete();
+ } finally {
+ recoveryComplete.countDown();
+ }
+ }
+ };
+ }
+ };
+ return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
+ }
+
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
@Test
public void testApplyStateWithCandidatePayload() throws Exception {
- TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+
+ recoveryComplete.await(5, TimeUnit.SECONDS);
NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
final DataTreeModification writeMod = source.takeSnapshot().newModification();
writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+ InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
+
// Set up the InMemoryJournal.
- InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
+ InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
int nListEntries = 16;
Set<Integer> listEntryKeys = new HashSet<>();
final DataTreeModification mod = source.takeSnapshot().newModification();
mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
- InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
payloadForModification(source, mod)));
}
- InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
new ApplyJournalEntries(nListEntries));
testRecovery(listEntryKeys);
// Set up the InMemoryJournal.
- InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
+ InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
+
+ InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
new WriteModification(TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
- InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
newModificationPayload(mod)));
}
- InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
new ApplyJournalEntries(nListEntries));
testRecovery(listEntryKeys);
getNormalizedNode().toByteString().toByteArray(),
Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+ InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " +
+ "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"));
+
// Set up the InMemoryJournal.
- InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+ InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
new WriteModification(TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
- InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
newLegacyPayload(mod)));
}
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
- InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
newLegacyByteStringPayload(mod)));
}
- InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries));
testRecovery(listEntryKeys);
}