import com.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
private static CommonConfig config;
private final ReentrantLock lock = new ReentrantLock();
- @Before
- public void setUp() throws Exception {
+ @BeforeClass
+ public static void setUp() throws Exception {
config = new CommonConfig.Builder<>("testsystem").withConfigReader(new AkkaConfigurationReader() {
@Override
public Config read() {
actorSystem = ActorSystem.create("testsystem", config.get());
}
- @After
- public void tearDown() throws Exception {
- if (actorSystem != null) {
- actorSystem.shutdown();
- }
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (actorSystem != null) {
+ actorSystem.terminate();
+ actorSystem = null;
+ }
}
@Test
ReentrantLock lock;
- private PingPongActor(ReentrantLock lock){
+ private PingPongActor(final ReentrantLock lock){
this.lock = lock;
}
}
@Override
- public void onReceive(Object message) throws Exception {
+ public void onReceive(final Object message) throws Exception {
lock.lock();
try {
if ("ping".equals(message)) {
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
public class ActorSystemProviderImpl implements ActorSystemProvider, AutoCloseable {
@Override
public ListenerRegistration<ActorSystemProviderListener> registerActorSystemProviderListener(
- ActorSystemProviderListener listener) {
+ final ActorSystemProviderListener listener) {
return listeners.register(listener);
}
public void close() {
LOG.info("Shutting down ActorSystem");
- actorSystem.shutdown();
try {
- actorSystem.awaitTermination(Duration.create(10, TimeUnit.SECONDS));
+ Await.result(actorSystem.terminate(), Duration.create(10, TimeUnit.SECONDS));
} catch (Exception e) {
LOG.warn("Error awaiting actor termination", e);
}
private final ActorRef actorRef;
private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
- private MockShardManager(boolean found, ActorRef actorRef){
+ private MockShardManager(final boolean found, final ActorRef actorRef){
this.found = found;
this.actorRef = actorRef;
}
- @Override public void onReceive(Object message) throws Exception {
+ @Override public void onReceive(final Object message) throws Exception {
if(message instanceof FindPrimary) {
FindPrimary fp = (FindPrimary)message;
Object resp = findPrimaryResponses.get(fp.getShardName());
}
}
- void addFindPrimaryResp(String shardName, Object resp) {
+ void addFindPrimaryResp(final String shardName, final Object resp) {
findPrimaryResponses.put(shardName, resp);
}
this.actorRef = null;
}
- MockShardManagerCreator(boolean found, ActorRef actorRef) {
+ MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
this.found = found;
this.actorRef = actorRef;
}
mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
-
}
@Test
assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
- actorSystem.shutdown();
-
+ actorSystem.terminate();
}
@Test
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
}
};
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
}
};
testFindPrimaryExceptions(new NotInitializedException("not initialized"));
}
- private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+ private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
return Futures.successful(expectedException);
}
};
actorContext.broadcast(new Function<Short, Object>() {
@Override
- public Object apply(Short v) {
+ public Object apply(final Short v) {
return new TestMessage();
}
}, TestMessage.class);