*/
package org.opendaylight.controller.cluster.raft;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
// We define this as ArrayList so we can use ensureCapacity.
- protected ArrayList<ReplicatedLogEntry> journal;
+ private ArrayList<ReplicatedLogEntry> journal;
private long snapshotIndex = -1;
private long snapshotTerm = -1;
private ArrayList<ReplicatedLogEntry> snapshottedJournal;
private long previousSnapshotIndex = -1;
private long previousSnapshotTerm = -1;
- protected int dataSize = 0;
+ private int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
this.journal = new ArrayList<>(unAppliedEntries);
+
+ for(ReplicatedLogEntry entry: journal) {
+ dataSize += entry.size();
+ }
}
public AbstractReplicatedLogImpl() {
}
@Override
- public void removeFrom(long logEntryIndex) {
+ public long removeFrom(long logEntryIndex) {
int adjustedIndex = adjustedIndex(logEntryIndex);
if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
// physical index should be less than list size and >= 0
- return;
+ return -1;
+ }
+
+ for(int i = adjustedIndex; i < journal.size(); i++) {
+ dataSize -= journal.get(i).size();
}
+
journal.subList(adjustedIndex , journal.size()).clear();
+
+ return adjustedIndex;
}
@Override
public void append(ReplicatedLogEntry replicatedLogEntry) {
journal.add(replicatedLogEntry);
+ dataSize += replicatedLogEntry.size();
}
@Override
snapshotTerm = previousSnapshotTerm;
previousSnapshotTerm = -1;
}
+
+ @VisibleForTesting
+ ReplicatedLogEntry getAtPhysicalIndex(int index) {
+ return journal.get(index);
+ }
}
cohort.applyRecoverySnapshot(snapshot.getState());
timer.stop();
- log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
- replicatedLog().size(), context.getId(), timer.toString(),
- replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
+ log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
+ context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
+ replicatedLog().getSnapshotTerm(), replicatedLog().size());
}
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
if(log.isDebugEnabled()) {
- log.debug("{}: Received ReplicatedLogEntry for recovery: {}", context.getId(), logEntry.getIndex());
+ log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
+ logEntry.getIndex(), logEntry.size());
}
replicatedLog().append(logEntry);
* information
*
* @param index the index of the log entry
+ * @return the adjusted index of the first log entry removed or -1 if log entry not found.
*/
- void removeFrom(long index);
+ long removeFrom(long index);
/**
private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
@Override
public void apply(DeleteEntries param) {
- dataSize = 0;
- for (ReplicatedLogEntry entry : journal) {
- dataSize += entry.size();
- }
}
};
@Override
public void removeFromAndPersist(long logEntryIndex) {
- int adjustedIndex = adjustedIndex(logEntryIndex);
-
- if (adjustedIndex < 0) {
- return;
- }
-
// FIXME: Maybe this should be done after the command is saved
- journal.subList(adjustedIndex , journal.size()).clear();
-
- persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+ long adjustedIndex = removeFrom(logEntryIndex);
+ if(adjustedIndex >= 0) {
+ persistence.persist(new DeleteEntries((int)adjustedIndex), deleteProcedure);
+ }
}
@Override
}
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
- journal.add(replicatedLogEntry);
+ append(replicatedLogEntry);
// When persisting events with persist it is guaranteed that the
// persistent actor will not receive further commands between the
public void apply(ReplicatedLogEntry evt) throws Exception {
int logEntrySize = replicatedLogEntry.size();
- dataSize += logEntrySize;
- long dataSizeForCheck = dataSize;
+ long dataSizeForCheck = dataSize();
dataSizeSinceLastSnapshot += logEntrySize;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
}
- @After
- public void tearDown() {
- replicatedLogImpl.journal.clear();
- replicatedLogImpl.setSnapshotIndex(-1);
- replicatedLogImpl.setSnapshotTerm(-1);
- replicatedLogImpl = null;
- }
-
@Test
public void testIndexOperations() {
// now create a snapshot of 3 entries, with 1 unapplied entry left in the log
// It removes the entries which have made it to snapshot
// and updates the snapshot index and term
- Map<Long, String> state = takeSnapshot(3);
+ takeSnapshot(3);
// check the values after the snapshot.
// each index value passed in the test is the logical index (log entry index)
assertEquals(2, replicatedLogImpl.getFrom(6).size());
// take a second snapshot with 5 entries with 0 unapplied entries left in the log
- state = takeSnapshot(5);
+ takeSnapshot(5);
assertEquals(0, replicatedLogImpl.size());
assertNull(replicatedLogImpl.last());
assertTrue(replicatedLogImpl.isPresent(5));
}
+ @Test
+ public void testRemoveFrom() {
+
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E", 2)));
+ replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F", 3)));
+
+ assertEquals("dataSize", 9, replicatedLogImpl.dataSize());
+
+ long adjusted = replicatedLogImpl.removeFrom(4);
+ assertEquals("removeFrom - adjusted", 4, adjusted);
+ assertEquals("size", 4, replicatedLogImpl.size());
+ assertEquals("dataSize", 4, replicatedLogImpl.dataSize());
+
+ takeSnapshot(1);
+
+ adjusted = replicatedLogImpl.removeFrom(2);
+ assertEquals("removeFrom - adjusted", 1, adjusted);
+ assertEquals("size", 1, replicatedLogImpl.size());
+ assertEquals("dataSize", 1, replicatedLogImpl.dataSize());
+
+ assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(0));
+ assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(100));
+ }
+
// create a snapshot for test
public Map<Long, String> takeSnapshot(final int numEntries) {
Map<Long, String> map = new HashMap<>(numEntries);
- List<ReplicatedLogEntry> entries = replicatedLogImpl.getEntriesTill(numEntries);
- for (ReplicatedLogEntry entry : entries) {
+
+ long lastIndex = 0;
+ long lastTerm = 0;
+ for(int i = 0; i < numEntries; i++) {
+ ReplicatedLogEntry entry = replicatedLogImpl.getAtPhysicalIndex(i);
map.put(entry.getIndex(), entry.getData().toString());
+ lastIndex = entry.getIndex();
+ lastTerm = entry.getTerm();
}
- int term = (int) replicatedLogImpl.lastTerm();
- int lastIndex = (int) entries.get(entries.size() - 1).getIndex();
- entries.clear();
- replicatedLogImpl.setSnapshotTerm(term);
- replicatedLogImpl.setSnapshotIndex(lastIndex);
+ replicatedLogImpl.snapshotPreCommit(lastIndex, lastTerm);
+ replicatedLogImpl.snapshotCommit();
return map;
public void removeFromAndPersist(final long index) {
}
- @Override
- public int dataSize() {
- return -1;
- }
-
- public List<ReplicatedLogEntry> getEntriesTill(final int index) {
- return journal.subList(0, index);
- }
-
@Override
public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
}
// add more entries after snapshot is taken
List<ReplicatedLogEntry> entries = new ArrayList<>();
ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
- new MockRaftActorContext.MockPayload("F"));
+ new MockRaftActorContext.MockPayload("F", 2));
ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
- new MockRaftActorContext.MockPayload("G"));
+ new MockRaftActorContext.MockPayload("G", 3));
ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
- new MockRaftActorContext.MockPayload("H"));
+ new MockRaftActorContext.MockPayload("H", 4));
entries.add(entry2);
entries.add(entry3);
entries.add(entry4);
RaftActorContext context = ref.underlyingActor().getRaftActorContext();
assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
context.getReplicatedLog().size());
+ assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());