From 294db15b1d9f51d9e8b4a708856ade7e3d5f657f Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 3 Jun 2016 11:59:31 -0400 Subject: [PATCH 1/1] Guard against duplicate log indexes We saw an issue where a duplicate log index was added to the journal. The duplicates were contiguous. It is unclear at this point how it happened but we should guard against it so I added a check to ensure the new index > the last index. Change-Id: I5acc0fa5b4fe7f4352fc7935e7262834894878f3 Signed-off-by: Tom Pantelis --- .../raft/AbstractReplicatedLogImpl.java | 29 +++++++++++----- .../cluster/raft/ReplicatedLog.java | 4 ++- .../cluster/raft/ReplicatedLogImpl.java | 6 ++-- .../raft/AbstractReplicatedLogImplTest.java | 2 -- .../cluster/raft/ReplicatedLogImplTest.java | 34 ++++++++++++++++--- 5 files changed, 57 insertions(+), 18 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index c245206f64..0d125ed6e2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -12,12 +12,17 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Abstract class handling the mapping of * logical LogEntry Index and the physical list index. */ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { + private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class); + + private final String logContext; // We define this as ArrayList so we can use ensureCapacity. private ArrayList journal; @@ -32,18 +37,19 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { private int dataSize = 0; public AbstractReplicatedLogImpl(long snapshotIndex, - long snapshotTerm, List unAppliedEntries) { + long snapshotTerm, List unAppliedEntries, String logContext) { this.snapshotIndex = snapshotIndex; this.snapshotTerm = snapshotTerm; - this.journal = new ArrayList<>(unAppliedEntries); + this.logContext = logContext; - for(ReplicatedLogEntry entry: journal) { - dataSize += entry.size(); + this.journal = new ArrayList<>(unAppliedEntries.size()); + for(ReplicatedLogEntry entry: unAppliedEntries) { + append(entry); } } public AbstractReplicatedLogImpl() { - this(-1L, -1L, Collections.emptyList()); + this(-1L, -1L, Collections.emptyList(), ""); } protected int adjustedIndex(long logEntryIndex) { @@ -112,9 +118,16 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public void append(ReplicatedLogEntry replicatedLogEntry) { - journal.add(replicatedLogEntry); - dataSize += replicatedLogEntry.size(); + public boolean append(ReplicatedLogEntry replicatedLogEntry) { + if(replicatedLogEntry.getIndex() > lastIndex()) { + journal.add(replicatedLogEntry); + dataSize += replicatedLogEntry.size(); + return true; + } else { + LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}", + logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace")); + return false; + } } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index ffa4cb1172..7ff6e86227 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -65,8 +65,10 @@ public interface ReplicatedLog { * Appends an entry to the log. * * @param replicatedLogEntry the entry to append + * @return true if the entry was successfully appended, false otherwise. An entry can fail to append if + * the index is already included in the log. */ - void append(ReplicatedLogEntry replicatedLogEntry); + boolean append(ReplicatedLogEntry replicatedLogEntry); /** * Optimization method to increase the capacity of the journal log prior to appending entries. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index 7e1cb69aff..9eaf6c3505 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -30,7 +30,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List unAppliedEntries, final RaftActorContext context) { - super(snapshotIndex, snapshotTerm, unAppliedEntries); + super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId()); this.context = Preconditions.checkNotNull(context); } @@ -98,7 +98,9 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry); // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs - append(replicatedLogEntry); + if(!append(replicatedLogEntry)) { + return; + } // When persisting events with persist it is guaranteed that the // persistent actor will not receive further commands between the diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index a3e4396f37..8a8d313ed8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -332,7 +332,5 @@ public class AbstractReplicatedLogImplTest { @Override public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) { } - - } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java index c6ee950d14..d2fe7dd4fe 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java @@ -77,24 +77,48 @@ public class ReplicatedLogImplTest { public void testAppendAndPersistExpectingNoCapture() throws Exception { ReplicatedLog log = ReplicatedLogImpl.newInstance(context); - MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1")); + MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 1, new MockPayload("1")); - log.appendAndPersist(logEntry); + log.appendAndPersist(logEntry1); - verifyPersist(logEntry); + verifyPersist(logEntry1); assertEquals("size", 1, log.size()); reset(mockPersistence); + MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 2, new MockPayload("2")); + Procedure mockCallback = Mockito.mock(Procedure.class); + log.appendAndPersist(logEntry2, mockCallback); + + verifyPersist(logEntry2); + + verify(mockCallback).apply(same(logEntry2)); + + assertEquals("size", 2, log.size()); + } + + @SuppressWarnings("unchecked") + @Test + public void testAppendAndPersisWithDuplicateEntry() throws Exception { + ReplicatedLog log = ReplicatedLogImpl.newInstance(context); + Procedure mockCallback = Mockito.mock(Procedure.class); + MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1")); + log.appendAndPersist(logEntry, mockCallback); verifyPersist(logEntry); - verify(mockCallback).apply(same(logEntry)); + assertEquals("size", 1, log.size()); - assertEquals("size", 2, log.size()); + reset(mockPersistence, mockCallback); + + log.appendAndPersist(logEntry, mockCallback); + + verifyNoMoreInteractions(mockPersistence, mockCallback); + + assertEquals("size", 1, log.size()); } @Test -- 2.36.6