import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
private final ActorRef actorRef;
private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
- private MockShardManager(boolean found, ActorRef actorRef){
+ private MockShardManager(final boolean found, final ActorRef actorRef){
this.found = found;
this.actorRef = actorRef;
}
- @Override public void onReceive(Object message) throws Exception {
+ @Override public void onReceive(final Object message) throws Exception {
if(message instanceof FindPrimary) {
FindPrimary fp = (FindPrimary)message;
Object resp = findPrimaryResponses.get(fp.getShardName());
}
}
- void addFindPrimaryResp(String shardName, Object resp) {
+ void addFindPrimaryResp(final String shardName, final Object resp) {
findPrimaryResponses.put(shardName, resp);
}
this.actorRef = null;
}
- MockShardManagerCreator(boolean found, ActorRef actorRef) {
+ MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
this.found = found;
this.actorRef = actorRef;
}
assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
}
- @Test
- public void testResolvePathForRemoteActor() {
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(
- ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testResolvePathForLocalActor() {
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka://system/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka://system/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testResolvePathForRemoteActorWithProperRemoteAddress() {
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
- "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
-
- String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
- }
-
-
@Test
public void testClientDispatcherIsGlobalDispatcher(){
ActorContext actorContext =
mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
-
}
@Test
assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
- actorSystem.shutdown();
-
+ actorSystem.terminate();
}
@Test
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
}
};
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
}
};
testFindPrimaryExceptions(new NotInitializedException("not initialized"));
}
- private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+ private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
return Futures.successful(expectedException);
}
};
mock(ClusterWrapper.class), mockConfig,
DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
- actorContext.broadcast(new TestMessage());
+ actorContext.broadcast(new Function<Short, Object>() {
+ @Override
+ public Object apply(final Short v) {
+ return new TestMessage();
+ }
+ }, TestMessage.class);
MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);