import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
factory = new TestActorFactory(getSystem());
}
- @SuppressWarnings("unchecked")
- private static DataPersistenceProvider mockPersistenceProvider() {
- final DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
- doReturn(false).when(dataPersistenceProvider).isRecoveryApplicable();
- doReturn(0L).when(dataPersistenceProvider).getLastSequenceNumber();
- doNothing().when(dataPersistenceProvider).saveSnapshot(any(Object.class));
- doNothing().when(dataPersistenceProvider).persist(any(Object.class), any(Procedure.class));
- doNothing().when(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
- doNothing().when(dataPersistenceProvider).deleteMessages(0L);
-
- return dataPersistenceProvider;
- }
-
@After
public void tearDown() throws Exception {
factory.close();
mockRaftActor.waitForRecoveryComplete();
RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
- doReturn(false).when(mockSupport).handleRecoveryMessage(any(Object.class), any(PersistentDataProvider.class));
- mockRaftActor.setRaftActorRecoverySupport(mockSupport);
+ mockRaftActor.setRaftActorRecoverySupport(mockSupport );
Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
mockRaftActor.handleCommand(captureSnapshotReply);
- SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
+ SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
mockRaftActor.handleCommand(saveSnapshotSuccess);
- SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
+ SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L), new Throwable());
doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
mockRaftActor.handleCommand(saveSnapshotFailure);
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("F"));
- mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
+ final Identifier id = new StringIdentifier("apply-state");
+ mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
- verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+ verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
}
};
Follower follower = new Follower(raftActor.getRaftActorContext()) {
@Override
public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
- leaderId = newLeaderId;
+ setLeaderId(newLeaderId);
setLeaderPayloadVersion(newLeaderVersion);
return this;
}
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(follower1Id, followerActor1.path().toString());
new MockRaftActorContext.MockPayload("foo-4")));
leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
- leader, Runtime.getRuntime().totalMemory());
+ Runtime.getRuntime().totalMemory());
assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(leaderId, leaderActor1.path().toString());
assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(follower1Id, followerActor1.path().toString());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// simulate a real snapshot
- leaderActor.onReceiveCommand(new SendHeartBeat());
+ leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE);
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Persist another entry (this will cause a CaptureSnapshot to be triggered
- leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+ leaderActor.persistData(mockActorRef, new StringIdentifier("x"),
+ new MockRaftActorContext.MockPayload("duh"));
// Now send a CaptureSnapshotReply
mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
String persistenceId = factory.generateActorId("follower-");
ImmutableMap<String, String> peerAddresses =
ImmutableMap.<String, String>builder().put("member1", "address").build();
- DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
- doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
reset(mockRaftActor.snapshotCohortDelegate);
- doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
mockRaftActor.setPersistence(false);
reset(mockRaftActor.snapshotCohortDelegate);
- doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
reply = kit.expectMsgClass(GetSnapshotReply.class);
mockRaftActor.waitForRecoveryComplete();
- verify(mockRaftActor.snapshotCohortDelegate, timeout(500).never()).applySnapshot(any(byte[].class));
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class));
RaftActorContext context = mockRaftActor.getRaftActorContext();
assertEquals("Journal log size", 1, context.getReplicatedLog().size());
MessageCollectorActor.clearMessages(notifierActor);
- raftActorRef.tell(new LeaderTransitioning(), ActorRef.noSender());
+ raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender());
leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());