From fc54ab8853d36fb1d7aebf2a09ef10567e66aa0d Mon Sep 17 00:00:00 2001 From: Kamal Rameshan Date: Thu, 29 Jan 2015 11:44:30 -0800 Subject: [PATCH] Bug-2590: Clustering : Minimize usage of in-memory journal In order to minimize the memory usage of the in-memory journal, we can remove the entries from the Leader's journal once it has been successfully replicated to ALL its followers. This does not intefere with snapshots, as we capture snapshots on demand. The followers follow the leader in cleaning the in-memory journal, there by ensuring that all the journals have more or less same entries. This is done by the leader passing its replicatedToAllIndex as part of the AppendEntries. Change-Id: I579a1f90d3c4e5d6be4ce699072688788b07bd48 Signed-off-by: Kamal Rameshan --- .../controller/cluster/raft/RaftActor.java | 12 +- .../cluster/raft/RaftActorContext.java | 11 +- .../cluster/raft/RaftActorContextImpl.java | 13 +- .../raft/behaviors/AbstractLeader.java | 21 +- .../behaviors/AbstractRaftActorBehavior.java | 14 + .../cluster/raft/behaviors/Follower.java | 4 + .../cluster/raft/messages/AppendEntries.java | 13 +- .../raft/AbstractReplicatedLogImplTest.java | 27 ++ .../cluster/raft/MockRaftActorContext.java | 11 + .../cluster/raft/RaftActorTest.java | 244 ++++++++++++++++-- .../AbstractRaftActorBehaviorTest.java | 39 ++- .../cluster/raft/behaviors/CandidateTest.java | 9 +- .../cluster/raft/behaviors/FollowerTest.java | 12 +- .../raft/messages/AppendEntriesTest.java | 12 +- .../common/actor/AbstractUntypedActor.java | 4 +- ...siteModificationByteStringPayloadTest.java | 7 +- .../CompositeModificationPayloadTest.java | 2 +- .../programs/appendentries/Client.java | 4 +- 18 files changed, 401 insertions(+), 58 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index aa7b4533b7..766b80e73d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -107,14 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private CaptureSnapshot captureSnapshot = null; - private volatile boolean hasSnapshotCaptureInitiated = false; - private Stopwatch recoveryTimer; private int currentRecoveryBatchCount; - - public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); } @@ -436,7 +432,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self()); // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot - if(!hasSnapshotCaptureInitiated){ + if(!context.isSnapshotCaptureInitiated()){ raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(), raftContext.getTermInformation().getCurrentTerm()); raftContext.getReplicatedLog().snapshotCommit(); @@ -693,7 +689,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } captureSnapshot = null; - hasSnapshotCaptureInitiated = false; + context.setSnapshotCaptureInitiated(false); } protected boolean hasFollowers(){ @@ -794,7 +790,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; // when a snaphsot is being taken, captureSnapshot != null - if (hasSnapshotCaptureInitiated == false && + if (!context.isSnapshotCaptureInitiated() && ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 || dataSizeForCheck > dataThreshold)) { @@ -827,7 +823,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { getSelf().tell(new CaptureSnapshot( lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), null); - hasSnapshotCaptureInitiated = true; + context.setSnapshotCaptureInitiated(true); } if(callback != null){ callback.apply(replicatedLogEntry); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 0eb4b73779..0e1f20b246 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -89,7 +89,7 @@ public interface RaftActorContext { * * @param replicatedLog */ - public void setReplicatedLog(ReplicatedLog replicatedLog); + void setReplicatedLog(ReplicatedLog replicatedLog); /** * @return A representation of the log @@ -137,7 +137,7 @@ public interface RaftActorContext { * * @param name */ - public void removePeer(String name); + void removePeer(String name); /** * Given a peerId return the corresponding actor @@ -165,5 +165,10 @@ public interface RaftActorContext { /** * @return ConfigParams */ - public ConfigParams getConfigParams(); + ConfigParams getConfigParams(); + + void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated); + + boolean isSnapshotCaptureInitiated(); + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index e4aef0a844..5438fe7c48 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -14,7 +14,6 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActorContext; import akka.event.LoggingAdapter; - import java.util.Map; import static com.google.common.base.Preconditions.checkState; @@ -41,6 +40,8 @@ public class RaftActorContextImpl implements RaftActorContext { private final ConfigParams configParams; + private boolean snapshotCaptureInitiated; + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, ElectionTerm termInformation, long commitIndex, @@ -130,6 +131,16 @@ public class RaftActorContextImpl implements RaftActorContext { return configParams; } + @Override + public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { + this.snapshotCaptureInitiated = snapshotCaptureInitiated; + } + + @Override + public boolean isSnapshotCaptureInitiated() { + return snapshotCaptureInitiated; + } + @Override public void addToPeers(String name, String address) { peerAddresses.put(name, address); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index da1627b98e..e28e4b066d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -93,6 +93,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; + private long replicatedToAllIndex = -1; + public AbstractLeader(RaftActorContext context) { super(context); @@ -226,9 +228,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } + if (!context.isSnapshotCaptureInitiated()) { + purgeInMemoryLog(); + } + return this; } + private void purgeInMemoryLog() { + //find the lowest index across followers which has been replicated to all. -1 if there are no followers. + // we would delete the in-mem log from that index on, in-order to minimize mem usage + // we would also share this info thru AE with the followers so that they can delete their log entries as well. + long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE; + for (FollowerLogInformation info : followerToLog.values()) { + minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex()); + } + + replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex); + } + @Override protected ClientRequestTracker removeClientRequestTracker(long logIndex) { final Iterator it = trackerList.iterator(); @@ -460,7 +478,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { new AppendEntries(currentTerm(), context.getId(), prevLogIndex(followerNextIndex), prevLogTerm(followerNextIndex), entries, - context.getCommitIndex()).toSerializable(), + context.getCommitIndex(), + replicatedToAllIndex).toSerializable(), actor() ); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index dbeafe9eb8..99824b0bb4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -422,4 +422,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return numMajority; } + + protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) { + + // we would want to keep the lastApplied as its used while capturing snapshots + long tempMin = Math.min(minReplicatedToAllIndex, + (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1)); + + if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { + context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm()); + context.getReplicatedLog().snapshotCommit(); + return tempMin; + } + return currentReplicatedIndex; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 31b5efbe38..410b3c266c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -254,6 +254,10 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true, lastIndex(), lastTerm()), actor()); + if (!context.isSnapshotCaptureInitiated()) { + fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex()); + } + return this; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 8198106217..97bcd6a708 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -50,14 +50,18 @@ public class AppendEntries extends AbstractRaftRPC { // leader's commitIndex private final long leaderCommit; + // index which has been replicated successfully to all followers, -1 if none + private final long replicatedToAllIndex; + public AppendEntries(long term, String leaderId, long prevLogIndex, - long prevLogTerm, List entries, long leaderCommit) { + long prevLogTerm, List entries, long leaderCommit, long replicatedToAllIndex) { super(term); this.leaderId = leaderId; this.prevLogIndex = prevLogIndex; this.prevLogTerm = prevLogTerm; this.entries = entries; this.leaderCommit = leaderCommit; + this.replicatedToAllIndex = replicatedToAllIndex; } private void writeObject(ObjectOutputStream out) throws IOException { @@ -102,6 +106,10 @@ public class AppendEntries extends AbstractRaftRPC { return leaderCommit; } + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; + } + @Override public String toString() { final StringBuilder sb = @@ -112,6 +120,7 @@ public class AppendEntries extends AbstractRaftRPC { sb.append(", prevLogTerm=").append(prevLogTerm); sb.append(", entries=").append(entries); sb.append(", leaderCommit=").append(leaderCommit); + sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex); sb.append('}'); return sb.toString(); } @@ -203,7 +212,7 @@ public class AppendEntries extends AbstractRaftRPC { from.getPrevLogIndex(), from.getPrevLogTerm(), logEntryList, - from.getLeaderCommit()); + from.getLeaderCommit(), -1); return to; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index d53ccf2500..ffd8edfbe1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -128,6 +128,33 @@ public class AbstractReplicatedLogImplTest { } + @Test + public void testSnapshotPreCommit() { + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H"))); + + replicatedLogImpl.snapshotPreCommit(4, 3); + assertEquals(3, replicatedLogImpl.size()); + assertEquals(4, replicatedLogImpl.getSnapshotIndex()); + + replicatedLogImpl.snapshotPreCommit(6, 3); + assertEquals(1, replicatedLogImpl.size()); + assertEquals(6, replicatedLogImpl.getSnapshotIndex()); + + replicatedLogImpl.snapshotPreCommit(7, 3); + assertEquals(0, replicatedLogImpl.size()); + assertEquals(7, replicatedLogImpl.getSnapshotIndex()); + + //running it again on an empty list should not throw exception + replicatedLogImpl.snapshotPreCommit(7, 3); + assertEquals(0, replicatedLogImpl.size()); + assertEquals(7, replicatedLogImpl.getSnapshotIndex()); + + + } + // create a snapshot for test public Map takeSnapshot(final int numEntries) { Map map = new HashMap<>(numEntries); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index cd852eaae2..9d3e5dcb12 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -34,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext { private ReplicatedLog replicatedLog; private Map peerAddresses = new HashMap<>(); private ConfigParams configParams; + private boolean snapshotCaptureInitiated; public MockRaftActorContext(){ electionTerm = null; @@ -185,6 +186,16 @@ public class MockRaftActorContext implements RaftActorContext { return configParams; } + @Override + public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) { + this.snapshotCaptureInitiated = snapshotCaptureInitiated; + } + + @Override + public boolean isSnapshotCaptureInitiated() { + return snapshotCaptureInitiated; + } + public void setConfigParams(ConfigParams configParams) { this.configParams = configParams; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 6b266d710e..30893810f5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1,17 +1,5 @@ package org.opendaylight.controller.cluster.raft; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; @@ -41,6 +29,7 @@ import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -61,6 +50,8 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; @@ -70,6 +61,20 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class RaftActorTest extends AbstractActorTest { @@ -86,6 +91,7 @@ public class RaftActorTest extends AbstractActorTest { private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; + private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); public static final class MockRaftActorCreator implements Creator { private static final long serialVersionUID = 1L; @@ -114,7 +120,8 @@ public class RaftActorTest extends AbstractActorTest { } } - public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { + public MockRaftActor(String id, Map peerAddresses, Optional config, + DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); this.delegate = mock(RaftActor.class); @@ -133,6 +140,14 @@ public class RaftActorTest extends AbstractActorTest { } } + public void waitForInitializeBehaviorComplete() { + try { + assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + public List getState() { return state; } @@ -176,6 +191,12 @@ public class RaftActorTest extends AbstractActorTest { recoveryComplete.countDown(); } + @Override + protected void initializeBehavior() { + super.initializeBehavior(); + initializeBehaviorComplete.countDown(); + } + @Override protected void applyRecoverySnapshot(byte[] bytes) { delegate.applyRecoverySnapshot(bytes); @@ -339,10 +360,10 @@ public class RaftActorTest extends AbstractActorTest { // 4 messages as part of snapshot, which are applied to state ByteString snapshotBytes = fromObject(Arrays.asList( - new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , @@ -909,6 +930,195 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "leader1"; + + ActorRef followerActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + leaderActor.getRaftActorContext().setCommitIndex(4); + leaderActor.getRaftActorContext().setLastApplied(4); + leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + leaderActor.waitForInitializeBehaviorComplete(); + + // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1)); + leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(leaderActor.delegate).createSnapshot(); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + //fake snapshot on index 5 + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1)); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1)); + assertEquals(8, leaderActor.getReplicatedLog().size()); + + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + assertEquals(8, leaderActor.getReplicatedLog().size()); + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only + assertEquals(3, leaderActor.getReplicatedLog().size()); + assertEquals(7, leaderActor.getReplicatedLog().lastIndex()); + + // add another non-replicated entry + leaderActor.getReplicatedLog().append( + new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); + + //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied + leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1)); + assertEquals(2, leaderActor.getReplicatedLog().size()); + assertEquals(8, leaderActor.getReplicatedLog().lastIndex()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + + @Test + public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception { + new JavaTestKit(getSystem()) { + { + String persistenceId = "follower1"; + + ActorRef leaderActor1 = + getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("leader", leaderActor1.path().toString()); + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor followerActor = mockActorRef.underlyingActor(); + followerActor.getRaftActorContext().setCommitIndex(4); + followerActor.getRaftActorContext().setLastApplied(4); + followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId); + + followerActor.waitForInitializeBehaviorComplete(); + + // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot + Follower follower = new Follower(followerActor.getRaftActorContext()); + followerActor.setCurrentBehavior(follower); + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder(); + followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build()); + + // log as indices 0-5 + assertEquals(6, followerActor.getReplicatedLog().size()); + + //snapshot on 4 + followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1)); + followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true); + verify(followerActor.delegate).createSnapshot(); + + assertEquals(6, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 6 + List entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, + new MockRaftActorContext.MockPayload("foo-6")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5)); + assertEquals(7, followerActor.getReplicatedLog().size()); + + //fake snapshot on index 7 + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, + new MockRaftActorContext.MockPayload("foo-7")) + ); + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6)); + assertEquals(8, followerActor.getReplicatedLog().size()); + + assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); + + + ByteString snapshotBytes = fromObject(Arrays.asList( + new MockRaftActorContext.MockPayload("foo-0"), + new MockRaftActorContext.MockPayload("foo-1"), + new MockRaftActorContext.MockPayload("foo-2"), + new MockRaftActorContext.MockPayload("foo-3"), + new MockRaftActorContext.MockPayload("foo-4"))); + followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); + assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated()); + + // capture snapshot reply should remove the snapshotted entries only + assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log + assertEquals(7, followerActor.getReplicatedLog().lastIndex()); + + entries = + Arrays.asList( + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8, + new MockRaftActorContext.MockPayload("foo-7")) + ); + // send an additional entry 8 with leaderCommit = 7 + followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7)); + + // 7 and 8, as lastapplied is 7 + assertEquals(2, followerActor.getReplicatedLog().size()); + + mockActorRef.tell(PoisonPill.getInstance(), getRef()); + + } + }; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 3893018008..42a7911be3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -74,7 +74,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { context.getTermInformation().update(1000, "test"); AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101); + new AppendEntries(100, "leader-1", 0, 0, null, 101, -1); RaftActorBehavior behavior = createBehavior(context); @@ -131,7 +131,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); AppendEntries appendEntries = - new AppendEntries(2, "leader-1", -1, 1, entries, 0); + new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1); RaftActorBehavior behavior = createBehavior(context); @@ -301,6 +301,39 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { }}; } + @Test + public void testFakeSnapshots() { + MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor); + AbstractRaftActorBehavior behavior = new Leader(context); + context.getTermInformation().update(1, "leader"); + + //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + context.setLastApplied(0); + assertEquals(-1, behavior.fakeSnapshot(0, -1)); + assertEquals(1, context.getReplicatedLog().size()); + + //2 entries, lastApplied still 0, no purging. + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build()); + context.setLastApplied(0); + assertEquals(-1, behavior.fakeSnapshot(0, -1)); + assertEquals(2, context.getReplicatedLog().size()); + + //2 entries, lastApplied still 0, no purging. + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build()); + context.setLastApplied(1); + assertEquals(0, behavior.fakeSnapshot(0, -1)); + assertEquals(1, context.getReplicatedLog().size()); + + //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged + context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build()); + context.setLastApplied(2); + assertEquals(1, behavior.fakeSnapshot(3, 1)); + assertEquals(3, context.getReplicatedLog().size()); + + + } + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm( ActorRef actorRef, RaftRPC rpc) { @@ -347,7 +380,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { } protected AppendEntries createAppendEntriesWithNewerTerm() { - return new AppendEntries(100, "leader-1", 0, 0, null, 1); + return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1); } protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 485ee4b316..0dc68c2461 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -3,6 +3,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -16,9 +19,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; + import static org.junit.Assert.assertEquals; public class CandidateTest extends AbstractRaftActorBehaviorTest { @@ -167,7 +168,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { Candidate candidate = new Candidate(createActorContext(getTestActor())); - candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.emptyList(), 0)); + candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.emptyList(), 0, -1)); final Boolean out = new ExpectMsg(duration("1 seconds"), "AppendEntriesResponse") { // do not put code outside this method, will run afterwards diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index a04d6aeb55..719a8256a0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -181,7 +181,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // The new commitIndex is 101 AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 100, 1, entries, 101); + new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); RaftActorBehavior raftBehavior = createBehavior(context).handleMessage(getRef(), appendEntries); @@ -217,7 +217,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // AppendEntries is now sent with a bigger term // this will set the receivers term to be the same as the sender's term AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101); + new AppendEntries(100, "leader-1", 0, 0, null, 101, -1); RaftActorBehavior behavior = createBehavior(context); @@ -293,7 +293,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // This will not work for a Candidate because as soon as a Candidate // is created it increments the term AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 2, 1, entries, 4); + new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1); RaftActorBehavior behavior = createBehavior(context); @@ -373,7 +373,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // This will not work for a Candidate because as soon as a Candidate // is created it increments the term AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 1, 1, entries, 3); + new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1); RaftActorBehavior behavior = createBehavior(context); @@ -446,7 +446,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1"))); AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 3, 1, entries, 4); + new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1); RaftActorBehavior behavior = createBehavior(context); @@ -502,7 +502,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1"))); AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 3, 1, entries, 4); + new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3); RaftActorBehavior behavior = createBehavior(context); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java index abde51bde5..5f5d73dbe6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java @@ -7,8 +7,6 @@ */ package org.opendaylight.controller.cluster.raft.messages; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -21,6 +19,9 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + /** * Unit tests for AppendEntries. * @@ -34,7 +35,7 @@ public class AppendEntriesTest { ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2")); - AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L); + AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1); AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected); @@ -44,7 +45,7 @@ public class AppendEntriesTest { @Test public void testToAndFromSerializable() { AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, - Collections.emptyList(), 10L); + Collections.emptyList(), 10L, -1); assertSame("toSerializable", entries, entries.toSerializable()); assertSame("fromSerializable", entries, @@ -54,7 +55,7 @@ public class AppendEntriesTest { @Test public void testToAndFromLegacySerializable() { ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload")); - AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L); + AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1); Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION); Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries); @@ -71,6 +72,7 @@ public class AppendEntriesTest { assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit()); assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex()); assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm()); + assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex()); assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size()); Iterator iter = expected.getEntries().iterator(); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java index cf37cbdd00..21a0cb6a88 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java @@ -30,11 +30,11 @@ public abstract class AbstractUntypedActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { final String messageType = message.getClass().getSimpleName(); if(LOG.isDebugEnabled()) { - LOG.debug("Received message {}", messageType); +// LOG.debug("Received message {}", messageType); } handleReceive(message); if(LOG.isDebugEnabled()) { - LOG.debug("Done handling message {}", messageType); +// LOG.debug("Done handling message {}", messageType); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java index 5b7002eda2..ce7d6303ad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.datastore; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertTrue; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.SerializationUtils; @@ -24,6 +22,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; + @Deprecated public class CompositeModificationByteStringPayloadTest { @@ -69,6 +70,6 @@ public class CompositeModificationByteStringPayloadTest { entries.add(new ReplicatedLogImplEntry(0, 1, payload)); - assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable()); + assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java index a55f6b865d..90b978821f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java @@ -55,7 +55,7 @@ public class CompositeModificationPayloadTest { }); AppendEntries appendEntries = - new AppendEntries(1, "member-1", 0, 100, entries, 1); + new AppendEntries(1, "member-1", 0, 100, entries, 1, -1); AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable(RaftVersions.HELIUM_VERSION); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java index 79c1bb4720..28fc6b0f57 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java @@ -98,7 +98,7 @@ public class Client { } }); - return new AppendEntries(1, "member-1", 0, 100, modification, 1); + return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1); } public static AppendEntries keyValueAppendEntries() { @@ -123,6 +123,6 @@ public class Client { } }); - return new AppendEntries(1, "member-1", 0, 100, modification, 1); + return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1); } } -- 2.36.6