From: Moiz Raja Date: Sun, 22 Feb 2015 18:17:44 +0000 (-0800) Subject: BUG 2741 : Fix issue with real snapshotting when replicatedToAllIndex=-1 X-Git-Tag: release/lithium~497^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=6a91d8954fe481a97801107d9a0c8dca8581481e BUG 2741 : Fix issue with real snapshotting when replicatedToAllIndex=-1 Change-Id: I59b236fd88a329021365f204077170ca260e0b32 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index e2aa16918e..1aecc89eea 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -187,9 +188,14 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) { + Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex, + "snapshotCapturedIndex must be greater than or equal to snapshotIndex"); + snapshottedJournal = new ArrayList<>(journal.size()); - snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex))); + List snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex)); + + snapshottedJournal.addAll(snapshotJournalEntries); clear(0, (int) (snapshotCapturedIndex - snapshotIndex)); previousSnapshotIndex = snapshotIndex; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 3ec8cc5c58..285be39c0b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -677,12 +677,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - } else { + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ // clear the log based on replicatedToAllIndex context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), captureSnapshot.getReplicatedToAllTerm()); + + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } else { + // The replicatedToAllIndex was not found in the log + // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot. + // In this scenario we may need to save the snapshot to the akka persistence + // snapshot for recovery but we do not need to do the replicated log trimming. + context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(), + replicatedLog.getSnapshotTerm()); } - getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 24581d6d2a..297d781251 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -134,6 +134,16 @@ public class MockRaftActorContext implements RaftActorContext { } @Override + // FIXME : A lot of tests try to manipulate the replicated log by setting it using this method + // This is OK to do if the underlyingActor is not RafActor or a derived class. If not then you should not + // used this way to manipulate the log because the RaftActor actually has a field replicatedLog + // which it creates internally and sets on the RaftActorContext + // The only right way to manipulate the replicated log therefore is to get it from either the RaftActor + // or the RaftActorContext and modify the entries in there instead of trying to replace it by using this setter + // Simple assertion that will fail if you do so + // ReplicatedLog log = new ReplicatedLogImpl(); + // raftActor.underlyingActor().getRaftActorContext().setReplicatedLog(log); + // assertEquals(log, raftActor.underlyingActor().getReplicatedLog()) public void setReplicatedLog(ReplicatedLog replicatedLog) { this.replicatedLog = replicatedLog; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 83868b6a2a..56bfc21f23 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1227,6 +1227,128 @@ public class RaftActorTest extends AbstractActorTest { }; } + + private static class NonPersistentProvider implements DataPersistenceProvider { + @Override + public boolean isRecoveryApplicable() { + return false; + } + + @Override + public void persist(T o, Procedure procedure) { + try { + procedure.apply(o); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void saveSnapshot(Object o) { + + } + + @Override + public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + + } + + @Override + public void deleteMessages(long sequenceNumber) { + + } + } + + @Test + public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + for(int i=0;i< 4;i++) { + leaderActor.getReplicatedLog() + .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i, + new MockRaftActorContext.MockPayload("A"))); + } + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertEquals(-1, leader.getReplicatedToAllIndex()); + + }}; + } + + @Test + public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception { + new JavaTestKit(getSystem()) {{ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider(); + + Map peerAddresses = new HashMap<>(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(3); + leaderActor.getRaftActorContext().setLastApplied(3); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + leaderActor.getReplicatedLog().setSnapshotIndex(3); + + leaderActor.waitForInitializeBehaviorComplete(); + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + leader.setReplicatedToAllIndex(3); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + // Persist another entry (this will cause a CaptureSnapshot to be triggered + leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + + // Now send a CaptureSnapshotReply + mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); + + // Trimming log in this scenario is a no-op + assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + assertEquals(3, leader.getReplicatedToAllIndex()); + + }}; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null;