List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
snapshottedJournal.addAll(snapshotJournalEntries);
- clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+ snapshotJournalEntries.clear();
previousSnapshotIndex = snapshotIndex;
setSnapshotIndex(snapshotCapturedIndex);
/*
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
}
+ @Test
+ public void testEmptyLog() {
+ replicatedLogImpl = new MockAbstractReplicatedLogImpl();
+
+ assertEquals("size", 0, replicatedLogImpl.size());
+ assertEquals("dataSize", 0, replicatedLogImpl.dataSize());
+ assertEquals("getSnapshotIndex", -1, replicatedLogImpl.getSnapshotIndex());
+ assertEquals("getSnapshotTerm", -1, replicatedLogImpl.getSnapshotTerm());
+ assertEquals("lastIndex", -1, replicatedLogImpl.lastIndex());
+ assertEquals("lastTerm", -1, replicatedLogImpl.lastTerm());
+ assertEquals("isPresent", false, replicatedLogImpl.isPresent(0));
+ assertEquals("isInSnapshot", false, replicatedLogImpl.isInSnapshot(0));
+ Assert.assertNull("get(0)", replicatedLogImpl.get(0));
+ Assert.assertNull("last", replicatedLogImpl.last());
+
+ List<ReplicatedLogEntry> list = replicatedLogImpl.getFrom(0, 1);
+ assertEquals("getFrom size", 0, list.size());
+
+ assertEquals("removeFrom", -1, replicatedLogImpl.removeFrom(1));
+
+ replicatedLogImpl.setSnapshotIndex(2);
+ replicatedLogImpl.setSnapshotTerm(1);
+
+ assertEquals("getSnapshotIndex", 2, replicatedLogImpl.getSnapshotIndex());
+ assertEquals("getSnapshotTerm", 1, replicatedLogImpl.getSnapshotTerm());
+ assertEquals("lastIndex", 2, replicatedLogImpl.lastIndex());
+ assertEquals("lastTerm", 1, replicatedLogImpl.lastTerm());
+ }
+
@Test
public void testIndexOperations() {
replicatedLogImpl.snapshotPreCommit(-1, -1);
assertEquals(8, replicatedLogImpl.size());
assertEquals(-1, replicatedLogImpl.getSnapshotIndex());
+ assertEquals(-1, replicatedLogImpl.getSnapshotTerm());
- replicatedLogImpl.snapshotPreCommit(4, 3);
+ replicatedLogImpl.snapshotPreCommit(4, 2);
assertEquals(3, replicatedLogImpl.size());
assertEquals(4, replicatedLogImpl.getSnapshotIndex());
+ assertEquals(2, replicatedLogImpl.getSnapshotTerm());
replicatedLogImpl.snapshotPreCommit(6, 3);
assertEquals(1, replicatedLogImpl.size());
assertEquals(6, replicatedLogImpl.getSnapshotIndex());
+ assertEquals(3, replicatedLogImpl.getSnapshotTerm());
replicatedLogImpl.snapshotPreCommit(7, 3);
assertEquals(0, replicatedLogImpl.size());
assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+ assertEquals(3, replicatedLogImpl.getSnapshotTerm());
//running it again on an empty list should not throw exception
replicatedLogImpl.snapshotPreCommit(7, 3);
assertEquals(0, replicatedLogImpl.size());
assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+ assertEquals(3, replicatedLogImpl.getSnapshotTerm());
+ }
+
+ @Test
+ public void testSnapshotCommit() {
+
+ replicatedLogImpl.snapshotPreCommit(1, 1);
+
+ replicatedLogImpl.snapshotCommit();
+
+ assertEquals("size", 2, replicatedLogImpl.size());
+ assertEquals("dataSize", 2, replicatedLogImpl.dataSize());
+ assertEquals("getSnapshotIndex", 1, replicatedLogImpl.getSnapshotIndex());
+ assertEquals("getSnapshotTerm", 1, replicatedLogImpl.getSnapshotTerm());
+ assertEquals("lastIndex", 3, replicatedLogImpl.lastIndex());
+ assertEquals("lastTerm", 2, replicatedLogImpl.lastTerm());
+
+ Assert.assertNull("get(0)", replicatedLogImpl.get(0));
+ Assert.assertNull("get(1)", replicatedLogImpl.get(1));
+ Assert.assertNotNull("get(2)", replicatedLogImpl.get(2));
+ Assert.assertNotNull("get(3)", replicatedLogImpl.get(3));
+ }
+
+ @Test
+ public void testSnapshotRollback() {
+ replicatedLogImpl.snapshotPreCommit(1, 1);
+
+ assertEquals("size", 2, replicatedLogImpl.size());
+ assertEquals("getSnapshotIndex", 1, replicatedLogImpl.getSnapshotIndex());
+ assertEquals("getSnapshotTerm", 1, replicatedLogImpl.getSnapshotTerm());
+
+ replicatedLogImpl.snapshotRollback();
+
+ assertEquals("size", 4, replicatedLogImpl.size());
+ assertEquals("dataSize", 4, replicatedLogImpl.dataSize());
+ assertEquals("getSnapshotIndex", -1, replicatedLogImpl.getSnapshotIndex());
+ assertEquals("getSnapshotTerm", -1, replicatedLogImpl.getSnapshotTerm());
+ Assert.assertNotNull("get(0)", replicatedLogImpl.get(0));
+ Assert.assertNotNull("get(3)", replicatedLogImpl.get(3));
}
@Test
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import akka.japi.Procedure;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ElectionTermImpl.
+ *
+ * @author Thomas Pantelis
+ */
+public class ElectionTermImplTest {
+ private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+ @Mock
+ private DataPersistenceProvider mockPersistence;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testUpdateAndPersist() throws Exception {
+ ElectionTermImpl impl = new ElectionTermImpl(mockPersistence, "test", LOG);
+
+ impl.updateAndPersist(10, "member-1");
+
+ assertEquals("getCurrentTerm", 10, impl.getCurrentTerm());
+ assertEquals("getVotedFor", "member-1", impl.getVotedFor());
+
+ ArgumentCaptor<Object> message = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
+ verify(mockPersistence).persist(message.capture(), procedure.capture());
+
+ assertEquals("Message type", UpdateElectionTerm.class, message.getValue().getClass());
+ UpdateElectionTerm update = (UpdateElectionTerm)message.getValue();
+ assertEquals("getCurrentTerm", 10, update.getCurrentTerm());
+ assertEquals("getVotedFor", "member-1", update.getVotedFor());
+
+ procedure.getValue().apply(null);
+ }
+}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
}
- @Test
- public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- CountDownLatch persistLatch = new CountDownLatch(1);
- DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
- dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
-
- MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
- mockRaftActor.waitForInitializeBehaviorComplete();
-
- mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
-
- assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
- }
- };
- }
-
- @Test
- public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
- MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
- mockRaftActor.waitForInitializeBehaviorComplete();
-
- MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
-
- mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
-
- verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
- }
- };
- }
-
- @Test
- public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
-
- MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
- mockRaftActor.waitForInitializeBehaviorComplete();
-
- mockRaftActor.waitUntilLeader();
-
- mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
-
- mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
-
- verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
- }
- };
- }
-
@Test
public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
new JavaTestKit(getSystem()) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.japi.Procedure;
+import com.google.common.base.Supplier;
+import java.util.Collections;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.internal.matchers.Same;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ReplicatedLogImpl.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReplicatedLogImplTest {
+ private static final Logger LOG = LoggerFactory.getLogger(RaftActorRecoverySupportTest.class);
+
+ @Mock
+ private DataPersistenceProvider mockPersistence;
+
+ @Mock
+ private RaftActorBehavior mockBehavior;
+
+ @Mock
+ private SnapshotManager mockSnapshotManager;
+
+ private RaftActorContext context;
+ private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+
+ context = new RaftActorContextImpl(null, null, "test",
+ new ElectionTermImpl(mockPersistence, "test", LOG),
+ -1, -1, Collections.<String,String>emptyMap(), configParams, LOG) {
+ @Override
+ public SnapshotManager getSnapshotManager() {
+ return mockSnapshotManager;
+ }
+ };
+ }
+
+ private void verifyPersist(Object message) throws Exception {
+ verifyPersist(message, new Same(message));
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void verifyPersist(Object message, Matcher<?> matcher) throws Exception {
+ ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
+ verify(mockPersistence).persist(Matchers.argThat(matcher), procedure.capture());
+
+ procedure.getValue().apply(message);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAppendAndPersistExpectingNoCapture() throws Exception {
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+ MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
+
+ log.appendAndPersist(logEntry);
+
+ verifyPersist(logEntry);
+
+ assertEquals("size", 1, log.size());
+
+ reset(mockPersistence);
+
+ Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
+ log.appendAndPersist(logEntry, mockCallback);
+
+ verifyPersist(logEntry);
+
+ verify(mockCallback).apply(same(logEntry));
+ verifyNoMoreInteractions(mockSnapshotManager);
+
+ assertEquals("size", 2, log.size());
+ }
+
+ @Test
+ public void testAppendAndPersistExpectingCaptureDueToJournalCount() throws Exception {
+ configParams.setSnapshotBatchCount(2);
+
+ doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
+
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+ MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
+ MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
+
+ log.appendAndPersist(logEntry1);
+ verifyPersist(logEntry1);
+
+ verifyNoMoreInteractions(mockSnapshotManager);
+ reset(mockPersistence);
+
+ log.appendAndPersist(logEntry2);
+ verifyPersist(logEntry2);
+
+ verify(mockSnapshotManager).capture(same(logEntry2), eq(1L));
+
+ assertEquals("size", 2, log.size());
+ }
+
+ @Test
+ public void testAppendAndPersistExpectingCaptureDueToDataSize() throws Exception {
+ doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
+
+ context.setTotalMemoryRetriever(new Supplier<Long>() {
+ @Override
+ public Long get() {
+ return 100L;
+ }
+ });
+
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+ int dataSize = 600;
+ MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
+
+ doReturn(true).when(mockSnapshotManager).capture(same(logEntry), eq(1L));
+
+ log.appendAndPersist(logEntry);
+ verifyPersist(logEntry);
+
+ verify(mockSnapshotManager).capture(same(logEntry), eq(1L));
+
+ reset(mockPersistence, mockSnapshotManager);
+
+ logEntry = new MockReplicatedLogEntry(1, 3, new MockPayload("3", 5));
+
+ log.appendAndPersist(logEntry);
+ verifyPersist(logEntry);
+
+ verifyNoMoreInteractions(mockSnapshotManager);
+
+ assertEquals("size", 2, log.size());
+ }
+
+ @Test
+ public void testRemoveFromAndPersist() throws Exception {
+
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockPersistence, mockBehavior);
+
+ log.append(new MockReplicatedLogEntry(1, 0, new MockPayload("0")));
+ log.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
+ log.append(new MockReplicatedLogEntry(1, 2, new MockPayload("2")));
+
+ log.removeFromAndPersist(1);
+
+ DeleteEntries deleteEntries = new DeleteEntries(1);
+ verifyPersist(deleteEntries, match(deleteEntries));
+
+ assertEquals("size", 1, log.size());
+
+ reset(mockPersistence);
+
+ log.removeFromAndPersist(1);
+
+ verifyNoMoreInteractions(mockPersistence);
+ }
+
+ public Matcher<DeleteEntries> match(final DeleteEntries actual){
+ return new BaseMatcher<DeleteEntries>() {
+ @Override
+ public boolean matches(Object o) {
+ DeleteEntries other = (DeleteEntries) o;
+ return actual.getFromIndex() == other.getFromIndex();
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("DeleteEntries: fromIndex: " + actual.getFromIndex());
+ }
+ };
+ }
+}
*/
void deleteMessages(long sequenceNumber);
+ /**
+ * Returns the last sequence number contained in the journal.
+ */
+ long getLastSequenceNumber();
}
public void deleteMessages(long sequenceNumber) {
delegate.deleteMessages(sequenceNumber);
}
+
+ @Override
+ public long getLastSequenceNumber() {
+ return delegate.getLastSequenceNumber();
+ }
}
@Override
public void deleteMessages(long sequenceNumber) {
}
+
+ @Override
+ public long getLastSequenceNumber() {
+ return -1;
+ }
}
\ No newline at end of file
public void deleteMessages(long sequenceNumber) {
persistentActor.deleteMessages(sequenceNumber);
}
+
+ @Override
+ public long getLastSequenceNumber() {
+ return persistentActor.lastSequenceNr();
+ }
}
\ No newline at end of file
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
-
import java.util.concurrent.CountDownLatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
/**
* This class is intended for testing purposes. It just triggers CountDownLatch's in each method.
public void setDeleteMessagesLatch(CountDownLatch deleteMessagesLatch) {
this.deleteMessagesLatch = deleteMessagesLatch;
}
+
+ @Override
+ public long getLastSequenceNumber() {
+ return -1;
+ }
}