- new JavaTestKit(getSystem()) {{
- ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
- ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
- TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
- MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
- shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
- shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
- shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
-
- Configuration mockConfig = mock(Configuration.class);
- doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
- when(mockConfig).getAllShardNames();
-
- ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
- mock(ClusterWrapper.class), mockConfig,
- DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
-
- actorContext.broadcast(new TestMessage());
-
- expectFirstMatching(shardActorRef1, TestMessage.class);
- expectFirstMatching(shardActorRef2, TestMessage.class);
- }};
- }
-
- private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
- int count = 5000 / 50;
- for(int i = 0; i < count; i++) {
- try {
- T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
- if(message != null) {
- return message;
- }
- } catch (Exception e) {}
-
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- }
-
- Assert.fail("Did not receive message of type " + clazz);
- return null;
+ new TestKit(getSystem()) {
+ {
+ ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
+ ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
+
+ TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
+ MockShardManager.props());
+ MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+ shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
+ shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
+ shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+ Configuration mockConfig = mock(Configuration.class);
+ doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
+ .getAllShardNames();
+
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mockConfig,
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
+ new PrimaryShardInfoFutureCache());
+
+ actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
+
+ MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
+ MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
+ }
+ };