Guard against duplicate log indexes 75/39975/4
authorTom Pantelis <tpanteli@brocade.com>
Fri, 3 Jun 2016 15:59:31 +0000 (11:59 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 14 Jun 2016 02:28:33 +0000 (02:28 +0000)
We saw an issue where a duplicate log index was added to the journal.
The duplicates were contiguous. It is unclear at this point how it
happened but we should guard against it so I added a check to ensure the
new index > the last index.

Change-Id: I5acc0fa5b4fe7f4352fc7935e7262834894878f3
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java

index c245206..0d125ed 100644 (file)
@@ -12,12 +12,17 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract class handling the mapping of
  * logical LogEntry Index and the physical list index.
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
+
+    private final String logContext;
 
     // We define this as ArrayList so we can use ensureCapacity.
     private ArrayList<ReplicatedLogEntry> journal;
@@ -32,18 +37,19 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     private int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
-        long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+        long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
         this.snapshotIndex = snapshotIndex;
         this.snapshotTerm = snapshotTerm;
-        this.journal = new ArrayList<>(unAppliedEntries);
+        this.logContext = logContext;
 
-        for(ReplicatedLogEntry entry: journal) {
-            dataSize += entry.size();
+        this.journal = new ArrayList<>(unAppliedEntries.size());
+        for(ReplicatedLogEntry entry: unAppliedEntries) {
+            append(entry);
         }
     }
 
     public AbstractReplicatedLogImpl() {
-        this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
+        this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
     }
 
     protected int adjustedIndex(long logEntryIndex) {
@@ -112,9 +118,16 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     @Override
-    public void append(ReplicatedLogEntry replicatedLogEntry) {
-        journal.add(replicatedLogEntry);
-        dataSize += replicatedLogEntry.size();
+    public boolean append(ReplicatedLogEntry replicatedLogEntry) {
+        if(replicatedLogEntry.getIndex() > lastIndex()) {
+            journal.add(replicatedLogEntry);
+            dataSize += replicatedLogEntry.size();
+            return true;
+        } else {
+            LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
+                    logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
+            return false;
+        }
     }
 
     @Override
index ffa4cb1..7ff6e86 100644 (file)
@@ -65,8 +65,10 @@ public interface ReplicatedLog {
      * Appends an entry to the log.
      *
      * @param replicatedLogEntry the entry to append
+     * @return true if the entry was successfully appended, false otherwise. An entry can fail to append if
+     *         the index is already included in the log.
      */
-    void append(ReplicatedLogEntry replicatedLogEntry);
+    boolean append(ReplicatedLogEntry replicatedLogEntry);
 
     /**
      * Optimization method to increase the capacity of the journal log prior to appending entries.
index 7e1cb69..9eaf6c3 100644 (file)
@@ -30,7 +30,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List<ReplicatedLogEntry> unAppliedEntries,
             final RaftActorContext context) {
-        super(snapshotIndex, snapshotTerm, unAppliedEntries);
+        super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId());
         this.context = Preconditions.checkNotNull(context);
     }
 
@@ -98,7 +98,9 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
 
         // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-        append(replicatedLogEntry);
+        if(!append(replicatedLogEntry)) {
+            return;
+        }
 
         // When persisting events with persist it is guaranteed that the
         // persistent actor will not receive further commands between the
index c6ee950..d2fe7dd 100644 (file)
@@ -77,24 +77,48 @@ public class ReplicatedLogImplTest {
     public void testAppendAndPersistExpectingNoCapture() throws Exception {
         ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
 
-        MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
+        MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
 
-        log.appendAndPersist(logEntry);
+        log.appendAndPersist(logEntry1);
 
-        verifyPersist(logEntry);
+        verifyPersist(logEntry1);
 
         assertEquals("size", 1, log.size());
 
         reset(mockPersistence);
 
+        MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
+        Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
+        log.appendAndPersist(logEntry2, mockCallback);
+
+        verifyPersist(logEntry2);
+
+        verify(mockCallback).apply(same(logEntry2));
+
+        assertEquals("size", 2, log.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAppendAndPersisWithDuplicateEntry() throws Exception {
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
+
         Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
+        MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
+
         log.appendAndPersist(logEntry, mockCallback);
 
         verifyPersist(logEntry);
 
-        verify(mockCallback).apply(same(logEntry));
+        assertEquals("size", 1, log.size());
 
-        assertEquals("size", 2, log.size());
+        reset(mockPersistence, mockCallback);
+
+        log.appendAndPersist(logEntry, mockCallback);
+
+        verifyNoMoreInteractions(mockPersistence, mockCallback);
+
+        assertEquals("size", 1, log.size());
     }
 
     @Test