import akka.actor.Terminated;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
-import akka.testkit.JavaTestKit;
+import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@Test(timeout = 10000)
public void testSuccessfulRegistration() {
- new JavaTestKit(getSystem()) {
+ new TestKit(getSystem()) {
{
ActorContext actorContext = new ActorContext(getSystem(), getRef(),
mock(ClusterWrapper.class), mock(Configuration.class));
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread() {
- @Override
- public void run() {
- proxy.init(path, scope);
- }
-
- }.start();
+ new Thread(() -> proxy.init(path, scope)).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
Assert.assertEquals("getScope", scope, registerMsg.getScope());
Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
- reply(new RegisterChangeListenerReply(getRef()));
+ reply(new RegisterDataTreeNotificationListenerReply(getRef()));
for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
proxy.close();
// The listener registration actor should get a Close message
- expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
+ expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
// The DataChangeListener actor should be terminated
expectMsgClass(timeout, Terminated.class);
@Test(timeout = 10000)
public void testSuccessfulRegistrationForClusteredListener() {
- new JavaTestKit(getSystem()) {
+ new TestKit(getSystem()) {
{
ActorContext actorContext = new ActorContext(getSystem(), getRef(),
mock(ClusterWrapper.class), mock(Configuration.class));
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread() {
- @Override
- public void run() {
- proxy.init(path, scope);
- }
-
- }.start();
+ new Thread(() -> proxy.init(path, scope)).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
Assert.assertEquals("getScope", scope, registerMsg.getScope());
Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
- reply(new RegisterChangeListenerReply(getRef()));
+ reply(new RegisterDataTreeNotificationListenerReply(getRef()));
for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
proxy.close();
// The listener registration actor should get a Close message
- expectMsgClass(timeout, CloseDataChangeListenerRegistration.class);
+ expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
// The DataChangeListener actor should be terminated
expectMsgClass(timeout, Terminated.class);
@Test(timeout = 10000)
public void testLocalShardNotFound() {
- new JavaTestKit(getSystem()) {
+ new TestKit(getSystem()) {
{
ActorContext actorContext = new ActorContext(getSystem(), getRef(),
mock(ClusterWrapper.class), mock(Configuration.class));
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread() {
- @Override
- public void run() {
- proxy.init(path, scope);
- }
-
- }.start();
+ new Thread(() -> proxy.init(path, scope)).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
@Test(timeout = 10000)
public void testLocalShardNotInitialized() {
- new JavaTestKit(getSystem()) {
+ new TestKit(getSystem()) {
{
ActorContext actorContext = new ActorContext(getSystem(), getRef(),
mock(ClusterWrapper.class), mock(Configuration.class));
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread() {
- @Override
- public void run() {
- proxy.init(path, scope);
- }
-
- }.start();
+ new Thread(() -> proxy.init(path, scope)).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
reply(new NotInitializedException("not initialized"));
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
- expectNoMsg();
- }
- };
-
+ expectNoMsg(duration("1 seconds"));
proxy.close();
}
};
@Test
public void testFailedRegistration() {
- new JavaTestKit(getSystem()) {
+ new TestKit(getSystem()) {
{
ActorSystem mockActorSystem = mock(ActorSystem.class);
@Test
public void testCloseBeforeRegistration() {
- new JavaTestKit(getSystem()) {
+ new TestKit(getSystem()) {
{
ActorContext actorContext = mock(ActorContext.class);
Answer<Future<Object>> answer = invocation -> {
proxy.close();
- return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
+ return Futures.successful((Object)new RegisterDataTreeNotificationListenerReply(getRef()));
};
doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
AsyncDataBroker.DataChangeScope.ONE);
- expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.class);
+ expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
proxy.close();