* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import akka.actor.Status.Failure;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.actor.Status.Failure;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import scala.concurrent.duration.FiniteDuration;
public class RaftActorTest extends AbstractActorTest {
import scala.concurrent.duration.FiniteDuration;
public class RaftActorTest extends AbstractActorTest {
String persistenceId = factory.generateActorId("follower-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("follower-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("follower-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("follower-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(),
TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(),
- public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
- final JavaTestKit kit = new JavaTestKit(getSystem());
+ public void testUpdateElectionTermPersistedWithPersistenceDisabled() {
+ final TestKit kit = new TestKit(getSystem());
String persistenceId = factory.generateActorId("follower-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
String persistenceId = factory.generateActorId("follower-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(),
TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(),
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
ref = factory.createTestActor(MockRaftActor.props(persistenceId,
ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
String persistenceId = factory.generateActorId("leader-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("leader-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
MessageCollectorActor.waitUntilReady(notifierActor);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
MessageCollectorActor.waitUntilReady(notifierActor);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
.id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
final TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder()
.id(persistenceId).config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
persistenceId);
List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
persistenceId);
List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
MockRaftActor raftActor = raftActorRef.underlyingActor();
final String newLeaderId = "new-leader";
final short newLeaderVersion = 6;
Follower follower = new Follower(raftActor.getRaftActorContext()) {
@Override
MockRaftActor raftActor = raftActorRef.underlyingActor();
final String newLeaderId = "new-leader";
final short newLeaderVersion = 6;
Follower follower = new Follower(raftActor.getRaftActorContext()) {
@Override
assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
assertEquals(newLeaderId, leaderStateChange.getLeaderId());
assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
assertEquals(newLeaderId, leaderStateChange.getLeaderId());
assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
MessageCollectorActor.waitUntilReady(notifierActor);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
MessageCollectorActor.waitUntilReady(notifierActor);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
final String persistenceId = factory.generateActorId("leader-");
final String follower1Id = factory.generateActorId("follower-");
final String persistenceId = factory.generateActorId("leader-");
final String follower1Id = factory.generateActorId("follower-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
final String persistenceId = factory.generateActorId("follower-");
final String leaderId = factory.generateActorId("leader-");
final String persistenceId = factory.generateActorId("follower-");
final String leaderId = factory.generateActorId("leader-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
final String follower1Id = factory.generateActorId("follower-");
final String follower2Id = factory.generateActorId("follower-");
final String follower1Id = factory.generateActorId("follower-");
final String follower2Id = factory.generateActorId("follower-");
- final ActorRef followerActor1 = factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
- final ActorRef followerActor2 = factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
+ final ActorRef followerActor1 = factory.createActor(MessageCollectorActor.props(), follower1Id);
+ final ActorRef followerActor2 = factory.createActor(MessageCollectorActor.props(), follower2Id);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setSnapshotBatchCount(5);
DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("follower-");
ImmutableMap<String, String> peerAddresses =
DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("follower-");
ImmutableMap<String, String> peerAddresses =
String persistenceId = factory.generateActorId("test-actor-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("test-actor-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
assertEquals("getId", persistenceId, reply.getId());
GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
assertEquals("getId", persistenceId, reply.getId());
assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
reset(mockRaftActor.snapshotCohortDelegate);
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
reset(mockRaftActor.snapshotCohortDelegate);
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
- mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
+ mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(
+ FiniteDuration.create(30, TimeUnit.SECONDS));
verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(anyObject(), anyObject());
assertEquals("getId", persistenceId, reply.getId());
verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(anyObject(), anyObject());
assertEquals("getId", persistenceId, reply.getId());
assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
TEST_LOG.info("testRestoreFromSnapshot starting");
String persistenceId = factory.generateActorId("test-actor-");
TEST_LOG.info("testRestoreFromSnapshot starting");
String persistenceId = factory.generateActorId("test-actor-");
InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
persistenceId = factory.generateActorId("test-actor-");
raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
persistenceId = factory.generateActorId("test-actor-");
raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
.persistent(Optional.of(Boolean.FALSE)).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
mockRaftActor = raftActorRef.underlyingActor();
.persistent(Optional.of(Boolean.FALSE)).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
mockRaftActor = raftActorRef.underlyingActor();
new MockRaftActorContext.MockPayload("B")));
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
new MockRaftActorContext.MockPayload("B")));
TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
.withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
TEST_LOG.info("testNonVotingOnRecovery starting");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
TEST_LOG.info("testNonVotingOnRecovery starting");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
final String leaderId = factory.generateActorId("leader-");
final String followerId = factory.generateActorId("follower-");
final String leaderId = factory.generateActorId("leader-");
final String followerId = factory.generateActorId("follower-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
final String leaderId = factory.generateActorId("leader-");
final String followerId = factory.generateActorId("follower-");
final String leaderId = factory.generateActorId("leader-");
final String followerId = factory.generateActorId("follower-");
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));