import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AddressFromURIString;
-import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
- private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
- String name = new ShardIdentifier(shardName, memberName,"config").toString();
- return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
- }
-
private final Collection<ActorSystem> actorSystems = new ArrayList<>();
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
for(ActorSystem system: actorSystems) {
JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
}
+
+ actorFactory.close();
}
private ActorSystem newActorSystem(String config) {
return system;
}
+ private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+ String name = new ShardIdentifier(shardName, memberName,"config").toString();
+ if(system == getSystem()) {
+ return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
+ }
+
+ return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
+ }
+
private Props newShardMgrProps() {
return newShardMgrProps(new MockConfiguration());
}
}
private TestShardManager newTestShardManager(Props props) {
- TestActorRef<TestShardManager> shardManagerActor = TestActorRef.create(getSystem(), props);
+ TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
TestShardManager shardManager = shardManagerActor.underlyingActor();
shardManager.waitForRecoveryComplete();
return shardManager;
}
};
- final TestActorRef<MessageCollectorActor> defaultShardActor = TestActorRef.create(getSystem(),
- Props.create(MessageCollectorActor.class), "default");
- final TestActorRef<MessageCollectorActor> topologyShardActor = TestActorRef.create(getSystem(),
- Props.create(MessageCollectorActor.class), "topology");
+ final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
+ Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
+ final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
+ Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
new HashMap<String, Entry<ActorRef, DatastoreContext>>());
JavaTestKit kit = new JavaTestKit(getSystem());
- final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)).
- withDispatcher(Dispatchers.DefaultDispatcherId()));
+ final ActorRef shardManager = actorFactory.createActor(Props.create(
+ new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
-
- defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
@Test
public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
new JavaTestKit(getSystem()) {{
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
@Test
public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@Test
public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@Test
public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@Test
public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
@Test
public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@Test
public void testOnReceiveSwitchShardBehavior() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
new JavaTestKit(getSystem()) {{
datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
SchemaContext schemaContext = TestModel.createTestContext();
new JavaTestKit(getSystem()) {{
datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
@Test
public void testOnCreateShardWithNoInitialSchemaContext() {
new JavaTestKit(getSystem()) {{
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
Shard.Builder shardBuilder = Shard.builder();
put("shard1", Arrays.asList("member-1")).
put("shard2", Arrays.asList("member-1")).build());
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher(
+ ActorRef shardManager = actorFactory.createActor(newShardMgrProps(mockConfig).withDispatcher(
Dispatchers.DefaultDispatcherId()));
shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
-
- shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
@Test
public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
new JavaTestKit(getSystem()) {{
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
shardManager.tell(new AddShardReplica("model-inventory"), getRef());
@Test
public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
new JavaTestKit(getSystem()) {{
- TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
newPropsShardMgrWithMockShardActor(), shardMgrID);
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
- leaderShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
new JavaTestKit(getSystem()) {{
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+ ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
put("astronauts", Arrays.asList("member-2")).build());
ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
- final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
new MockConfiguration(ImmutableMap.<String, List<String>>builder().
put("astronauts", Arrays.asList("member-2")).build());
- final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID);
shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
put("astronauts", Arrays.asList("member-2")).build());
- final ActorRef newReplicaShardManager = getSystem().actorOf(newTestShardMgrBuilder(mockConfig).
+ final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
shardActor(mockShardActor).props(), shardMgrID);
newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@Test
public void testRemoveShardReplicaForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
shardManager.tell(new RemoveShardReplica("model-inventory"), getRef());
put("astronauts", Arrays.asList("member-2")).
put("people", Arrays.asList("member-1", "member-2")).build());
- TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
- newShardMgrProps(mockConfig));
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig));
shardManager.underlyingActor().waitForRecoveryComplete();
put("astronauts", Arrays.asList("member-2")).
put("people", Arrays.asList("member-1", "member-2")).build());
- TestActorRef<MessageCollectorActor> shard = TestActorRef.create(getSystem(), MessageCollectorActor.props());
+ TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(MessageCollectorActor.props());
- TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
watch(shard);
InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
//create shardManager to come up with restored data
- TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
+ TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
newShardMgrProps(mockConfig));
newRestoredShardManager.underlyingActor().waitForRecoveryComplete();