We saw an issue where a duplicate log index was added to the journal.
The duplicates were contiguous. It is unclear at this point how it
happened but we should guard against it so I added a check to ensure the
new index > the last index.
Change-Id: I5acc0fa5b4fe7f4352fc7935e7262834894878f3
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Abstract class handling the mapping of
* logical LogEntry Index and the physical list index.
*/
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
+
+ private final String logContext;
// We define this as ArrayList so we can use ensureCapacity.
private ArrayList<ReplicatedLogEntry> journal;
private int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
- long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+ long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
- this.journal = new ArrayList<>(unAppliedEntries);
+ this.logContext = logContext;
- for(ReplicatedLogEntry entry: journal) {
- dataSize += entry.size();
+ this.journal = new ArrayList<>(unAppliedEntries.size());
+ for(ReplicatedLogEntry entry: unAppliedEntries) {
+ append(entry);
}
}
public AbstractReplicatedLogImpl() {
- this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
+ this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
}
protected int adjustedIndex(long logEntryIndex) {
}
@Override
- public void append(ReplicatedLogEntry replicatedLogEntry) {
- journal.add(replicatedLogEntry);
- dataSize += replicatedLogEntry.size();
+ public boolean append(ReplicatedLogEntry replicatedLogEntry) {
+ if(replicatedLogEntry.getIndex() > lastIndex()) {
+ journal.add(replicatedLogEntry);
+ dataSize += replicatedLogEntry.size();
+ return true;
+ } else {
+ LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
+ logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
+ return false;
+ }
}
@Override
* Appends an entry to the log.
*
* @param replicatedLogEntry the entry to append
+ * @return true if the entry was successfully appended, false otherwise. An entry can fail to append if
+ * the index is already included in the log.
*/
- void append(ReplicatedLogEntry replicatedLogEntry);
+ boolean append(ReplicatedLogEntry replicatedLogEntry);
/**
* Optimization method to increase the capacity of the journal log prior to appending entries.
private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List<ReplicatedLogEntry> unAppliedEntries,
final RaftActorContext context) {
- super(snapshotIndex, snapshotTerm, unAppliedEntries);
+ super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId());
this.context = Preconditions.checkNotNull(context);
}
context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
- append(replicatedLogEntry);
+ if(!append(replicatedLogEntry)) {
+ return;
+ }
// When persisting events with persist it is guaranteed that the
// persistent actor will not receive further commands between the
@Override
public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
}
-
-
}
}
public void testAppendAndPersistExpectingNoCapture() throws Exception {
ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
- MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
+ MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
- log.appendAndPersist(logEntry);
+ log.appendAndPersist(logEntry1);
- verifyPersist(logEntry);
+ verifyPersist(logEntry1);
assertEquals("size", 1, log.size());
reset(mockPersistence);
+ MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
+ Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
+ log.appendAndPersist(logEntry2, mockCallback);
+
+ verifyPersist(logEntry2);
+
+ verify(mockCallback).apply(same(logEntry2));
+
+ assertEquals("size", 2, log.size());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAppendAndPersisWithDuplicateEntry() throws Exception {
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
+
Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
+ MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
+
log.appendAndPersist(logEntry, mockCallback);
verifyPersist(logEntry);
- verify(mockCallback).apply(same(logEntry));
+ assertEquals("size", 1, log.size());
- assertEquals("size", 2, log.size());
+ reset(mockPersistence, mockCallback);
+
+ log.appendAndPersist(logEntry, mockCallback);
+
+ verifyNoMoreInteractions(mockPersistence, mockCallback);
+
+ assertEquals("size", 1, log.size());
}
@Test