From 8360177d8f021df9078ac54919a816a73fbee0a0 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Sat, 28 Mar 2015 10:23:40 -0400 Subject: [PATCH] Calculate replicated log data size on recovery We maintain the replicated log data size at runtme but we should also calculate the data size for recovered entries on startup. I changed the AbstractReplicatedLogImpl#append method to also add to the dataSize. Previously dataSize was adjusted in ReplicatedLogImpl#appendAndPersist after it was persisted. I'm not sure why it was done this way but if persistence failed then the entry would've been added to the in-memory log without increasing the dataSize. This seems inconsistent - if we add to the log we should always increase the dataSize. The same with removing entries from the log - ReplicatedLogImpl re-calculated the dataSize after it was persisted. So I changed removeFrom to adjust dataSize and changed ReplicatedLogImpl#removeFromAndPersist to call AbstractReplicatedLogImpl#removeFrom (code was duplicated). To avoid out-of-band changes to dataSize I made it private. Same with journal. I think this is safer - these should be owned by AbstractReplicatedLogImpl and derived classes shouldn't modify these directly. Change-Id: I114cbac1d6a450bc0a1c8c6ee60042ad28a89bf4 Signed-off-by: Tom Pantelis --- .../raft/AbstractReplicatedLogImpl.java | 26 ++++++-- .../raft/RaftActorRecoverySupport.java | 9 +-- .../cluster/raft/ReplicatedLog.java | 3 +- .../cluster/raft/ReplicatedLogImpl.java | 22 ++----- .../raft/AbstractReplicatedLogImplTest.java | 62 +++++++++++-------- .../cluster/raft/RaftActorTest.java | 7 ++- 6 files changed, 74 insertions(+), 55 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index 1aecc89eea..c27ff37386 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; @@ -19,7 +20,7 @@ import java.util.List; public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { // We define this as ArrayList so we can use ensureCapacity. - protected ArrayList journal; + private ArrayList journal; private long snapshotIndex = -1; private long snapshotTerm = -1; @@ -28,13 +29,17 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { private ArrayList snapshottedJournal; private long previousSnapshotIndex = -1; private long previousSnapshotTerm = -1; - protected int dataSize = 0; + private int dataSize = 0; public AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List unAppliedEntries) { this.snapshotIndex = snapshotIndex; this.snapshotTerm = snapshotTerm; this.journal = new ArrayList<>(unAppliedEntries); + + for(ReplicatedLogEntry entry: journal) { + dataSize += entry.size(); + } } public AbstractReplicatedLogImpl() { @@ -90,18 +95,26 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public void removeFrom(long logEntryIndex) { + public long removeFrom(long logEntryIndex) { int adjustedIndex = adjustedIndex(logEntryIndex); if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { // physical index should be less than list size and >= 0 - return; + return -1; + } + + for(int i = adjustedIndex; i < journal.size(); i++) { + dataSize -= journal.get(i).size(); } + journal.subList(adjustedIndex , journal.size()).clear(); + + return adjustedIndex; } @Override public void append(ReplicatedLogEntry replicatedLogEntry) { journal.add(replicatedLogEntry); + dataSize += replicatedLogEntry.size(); } @Override @@ -230,4 +243,9 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { snapshotTerm = previousSnapshotTerm; previousSnapshotTerm = -1; } + + @VisibleForTesting + ReplicatedLogEntry getAtPhysicalIndex(int index) { + return journal.get(index); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 5f33c738e1..d2c14de73d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -104,14 +104,15 @@ class RaftActorRecoverySupport { cohort.applyRecoverySnapshot(snapshot.getState()); timer.stop(); - log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" + - replicatedLog().size(), context.getId(), timer.toString(), - replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm()); + log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}", + context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(), + replicatedLog().getSnapshotTerm(), replicatedLog().size()); } private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { if(log.isDebugEnabled()) { - log.debug("{}: Received ReplicatedLogEntry for recovery: {}", context.getId(), logEntry.getIndex()); + log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(), + logEntry.getIndex(), logEntry.size()); } replicatedLog().append(logEntry); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 3e4d727c71..8388eaf743 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -51,8 +51,9 @@ public interface ReplicatedLog { * information * * @param index the index of the log entry + * @return the adjusted index of the first log entry removed or -1 if log entry not found. */ - void removeFrom(long index); + long removeFrom(long index); /** diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index fdb6305381..5a77b9aea3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -28,10 +28,6 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private final Procedure deleteProcedure = new Procedure() { @Override public void apply(DeleteEntries param) { - dataSize = 0; - for (ReplicatedLogEntry entry : journal) { - dataSize += entry.size(); - } } }; @@ -57,16 +53,11 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { @Override public void removeFromAndPersist(long logEntryIndex) { - int adjustedIndex = adjustedIndex(logEntryIndex); - - if (adjustedIndex < 0) { - return; - } - // FIXME: Maybe this should be done after the command is saved - journal.subList(adjustedIndex , journal.size()).clear(); - - persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure); + long adjustedIndex = removeFrom(logEntryIndex); + if(adjustedIndex >= 0) { + persistence.persist(new DeleteEntries((int)adjustedIndex), deleteProcedure); + } } @Override @@ -83,7 +74,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs - journal.add(replicatedLogEntry); + append(replicatedLogEntry); // When persisting events with persist it is guaranteed that the // persistent actor will not receive further commands between the @@ -96,8 +87,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public void apply(ReplicatedLogEntry evt) throws Exception { int logEntrySize = replicatedLogEntry.size(); - dataSize += logEntrySize; - long dataSizeForCheck = dataSize; + long dataSizeForCheck = dataSize(); dataSizeSinceLastSnapshot += logEntrySize; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index 8fdb7ea226..c99f253657 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -15,7 +15,6 @@ import akka.japi.Procedure; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -40,14 +39,6 @@ public class AbstractReplicatedLogImplTest { } - @After - public void tearDown() { - replicatedLogImpl.journal.clear(); - replicatedLogImpl.setSnapshotIndex(-1); - replicatedLogImpl.setSnapshotTerm(-1); - replicatedLogImpl = null; - } - @Test public void testIndexOperations() { @@ -65,7 +56,7 @@ public class AbstractReplicatedLogImplTest { // now create a snapshot of 3 entries, with 1 unapplied entry left in the log // It removes the entries which have made it to snapshot // and updates the snapshot index and term - Map state = takeSnapshot(3); + takeSnapshot(3); // check the values after the snapshot. // each index value passed in the test is the logical index (log entry index) @@ -101,7 +92,7 @@ public class AbstractReplicatedLogImplTest { assertEquals(2, replicatedLogImpl.getFrom(6).size()); // take a second snapshot with 5 entries with 0 unapplied entries left in the log - state = takeSnapshot(5); + takeSnapshot(5); assertEquals(0, replicatedLogImpl.size()); assertNull(replicatedLogImpl.last()); @@ -187,19 +178,45 @@ public class AbstractReplicatedLogImplTest { assertTrue(replicatedLogImpl.isPresent(5)); } + @Test + public void testRemoveFrom() { + + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E", 2))); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F", 3))); + + assertEquals("dataSize", 9, replicatedLogImpl.dataSize()); + + long adjusted = replicatedLogImpl.removeFrom(4); + assertEquals("removeFrom - adjusted", 4, adjusted); + assertEquals("size", 4, replicatedLogImpl.size()); + assertEquals("dataSize", 4, replicatedLogImpl.dataSize()); + + takeSnapshot(1); + + adjusted = replicatedLogImpl.removeFrom(2); + assertEquals("removeFrom - adjusted", 1, adjusted); + assertEquals("size", 1, replicatedLogImpl.size()); + assertEquals("dataSize", 1, replicatedLogImpl.dataSize()); + + assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(0)); + assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(100)); + } + // create a snapshot for test public Map takeSnapshot(final int numEntries) { Map map = new HashMap<>(numEntries); - List entries = replicatedLogImpl.getEntriesTill(numEntries); - for (ReplicatedLogEntry entry : entries) { + + long lastIndex = 0; + long lastTerm = 0; + for(int i = 0; i < numEntries; i++) { + ReplicatedLogEntry entry = replicatedLogImpl.getAtPhysicalIndex(i); map.put(entry.getIndex(), entry.getData().toString()); + lastIndex = entry.getIndex(); + lastTerm = entry.getTerm(); } - int term = (int) replicatedLogImpl.lastTerm(); - int lastIndex = (int) entries.get(entries.size() - 1).getIndex(); - entries.clear(); - replicatedLogImpl.setSnapshotTerm(term); - replicatedLogImpl.setSnapshotIndex(lastIndex); + replicatedLogImpl.snapshotPreCommit(lastIndex, lastTerm); + replicatedLogImpl.snapshotCommit(); return map; @@ -213,15 +230,6 @@ public class AbstractReplicatedLogImplTest { public void removeFromAndPersist(final long index) { } - @Override - public int dataSize() { - return -1; - } - - public List getEntriesTill(final int index) { - return journal.subList(0, index); - } - @Override public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback) { } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 9b8ca5c5ca..14bfd1d348 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -417,11 +417,11 @@ public class RaftActorTest extends AbstractActorTest { // add more entries after snapshot is taken List entries = new ArrayList<>(); ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, - new MockRaftActorContext.MockPayload("F")); + new MockRaftActorContext.MockPayload("F", 2)); ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, - new MockRaftActorContext.MockPayload("G")); + new MockRaftActorContext.MockPayload("G", 3)); ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, - new MockRaftActorContext.MockPayload("H")); + new MockRaftActorContext.MockPayload("H", 4)); entries.add(entry2); entries.add(entry3); entries.add(entry4); @@ -451,6 +451,7 @@ public class RaftActorTest extends AbstractActorTest { RaftActorContext context = ref.underlyingActor().getRaftActorContext(); assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size()); + assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize()); assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex()); assertEquals("Last applied", lastAppliedToState, context.getLastApplied()); assertEquals("Commit index", lastAppliedToState, context.getCommitIndex()); -- 2.36.6