import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.time.Duration;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.Test;
-import org.mockito.stubbing.Answer;
+import org.mockito.ArgumentCaptor;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future;
public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class);
@Test(timeout = 10000)
public void testSuccessfulRegistration() {
- final TestKit kit = new TestKit(getSystem());
- ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+ final var kit = new TestKit(getSystem());
+ final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
mock(Configuration.class));
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
- final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorUtils, mockListener, path);
+ final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final var proxy = startProxyAsync(actorUtils, path, false);
- new Thread(() -> proxy.init("shard-1")).start();
-
- Duration timeout = Duration.ofSeconds(5);
- FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
- assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+ final var timeout = Duration.ofSeconds(5);
+ final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
+ assertEquals("shard-1", findLocalShard.getShardName());
kit.reply(new LocalShardFound(kit.getRef()));
- RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
- RegisterDataTreeChangeListener.class);
- assertEquals("getPath", path, registerMsg.getPath());
- assertFalse("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances());
+ final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
+ assertEquals(path, registerMsg.getPath());
+ assertFalse(registerMsg.isRegisterOnAllInstances());
kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
- assertEquals("getListenerRegistrationActor", getSystem().actorSelection(kit.getRef().path()),
- proxy.getListenerRegistrationActor());
+ assertEquals(getSystem().actorSelection(kit.getRef().path()), proxy.getListenerRegistrationActor());
kit.watch(proxy.getDataChangeListenerActor());
@Test(timeout = 10000)
public void testSuccessfulRegistrationForClusteredListener() {
- final TestKit kit = new TestKit(getSystem());
- ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+ final var kit = new TestKit(getSystem());
+ final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
mock(Configuration.class));
- ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
- ClusteredDOMDataTreeChangeListener.class);
-
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
- final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
- new DataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, path);
+ final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final var proxy = startProxyAsync(actorUtils, path, true);
- new Thread(() -> proxy.init("shard-1")).start();
-
- Duration timeout = Duration.ofSeconds(5);
- FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
- assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+ final var timeout = Duration.ofSeconds(5);
+ final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
+ assertEquals("shard-1", findLocalShard.getShardName());
kit.reply(new LocalShardFound(kit.getRef()));
- RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
- RegisterDataTreeChangeListener.class);
- assertEquals("getPath", path, registerMsg.getPath());
- assertTrue("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances());
+ final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
+ assertEquals(path, registerMsg.getPath());
+ assertTrue(registerMsg.isRegisterOnAllInstances());
proxy.close();
}
@Test(timeout = 10000)
public void testLocalShardNotFound() {
- final TestKit kit = new TestKit(getSystem());
- ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+ final var kit = new TestKit(getSystem());
+ final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
mock(Configuration.class));
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
- final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorUtils, mockListener, path);
-
- new Thread(() -> proxy.init("shard-1")).start();
+ final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final var proxy = startProxyAsync(actorUtils, path, true);
- Duration timeout = Duration.ofSeconds(5);
- FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
- assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+ final var timeout = Duration.ofSeconds(5);
+ final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
+ assertEquals("shard-1", findLocalShard.getShardName());
kit.reply(new LocalShardNotFound("shard-1"));
@Test(timeout = 10000)
public void testLocalShardNotInitialized() {
- final TestKit kit = new TestKit(getSystem());
- ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+ final var kit = new TestKit(getSystem());
+ final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
mock(Configuration.class));
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
- final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorUtils, mockListener, path);
+ final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final var proxy = startProxyAsync(actorUtils, path, false);
- new Thread(() -> proxy.init("shard-1")).start();
-
- Duration timeout = Duration.ofSeconds(5);
- FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
- assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+ final var timeout = Duration.ofSeconds(5);
+ final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
+ assertEquals("shard-1", findLocalShard.getShardName());
kit.reply(new NotInitializedException("not initialized"));
@Test
public void testFailedRegistration() {
- final TestKit kit = new TestKit(getSystem());
- ActorSystem mockActorSystem = mock(ActorSystem.class);
+ final var kit = new TestKit(getSystem());
+ final var mockActorSystem = mock(ActorSystem.class);
- ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
+ final var mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
- ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
+ final var executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
- ActorUtils actorUtils = mock(ActorUtils.class);
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final var actorUtils = mock(ActorUtils.class);
+ final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
doReturn(executor).when(actorUtils).getClientDispatcher();
doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
doReturn(mockActorSystem).when(actorUtils).getActorSystem();
- String shardName = "shard-1";
- final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorUtils, mockListener, path);
-
doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
- doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName));
+ doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1");
doReturn(Futures.failed(new RuntimeException("mock"))).when(actorUtils).executeOperationAsync(
any(ActorRef.class), any(Object.class), any(Timeout.class));
- doReturn(mock(DatastoreContext.class)).when(actorUtils).getDatastoreContext();
-
- proxy.init("shard-1");
- assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
+ final var proxy = DataTreeChangeListenerProxy.of(actorUtils, mockListener, path, true, "shard-1");
+ assertNull(proxy.getListenerRegistrationActor());
proxy.close();
}
@Test
public void testCloseBeforeRegistration() {
- final TestKit kit = new TestKit(getSystem());
- ActorUtils actorUtils = mock(ActorUtils.class);
-
- String shardName = "shard-1";
+ final var kit = new TestKit(getSystem());
+ final var actorUtils = mock(ActorUtils.class);
doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorUtils).getClientDispatcher();
doReturn(getSystem().actorSelection(kit.getRef().path())).when(actorUtils).actorSelection(
kit.getRef().path());
doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
- doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName));
+ doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1");
- final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
- actorUtils, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
+ final var proxy = createProxy(actorUtils, YangInstanceIdentifier.of(TestModel.TEST_QNAME), true);
+ final var instance = proxy.getKey();
- Answer<Future<Object>> answer = invocation -> {
- proxy.close();
- return Futures.successful((Object) new RegisterDataTreeNotificationListenerReply(kit.getRef()));
- };
+ doAnswer(invocation -> {
+ instance.close();
+ return Futures.successful(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
+ }).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class));
+ proxy.getValue().run();
- doAnswer(answer).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class),
- any(Timeout.class));
+ kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class);
- proxy.init(shardName);
+ assertNull(instance.getListenerRegistrationActor());
+ }
- kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class);
+ @NonNullByDefault
+ private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path,
+ final boolean clustered) {
+ return startProxyAsync(actorUtils, path, clustered, Runnable::run);
+ }
+
+ @NonNullByDefault
+ private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path,
+ final boolean clustered, final Consumer<Runnable> execute) {
+ final var proxy = createProxy(actorUtils, path, clustered);
+ final var thread = new Thread(proxy.getValue());
+ thread.setDaemon(true);
+ thread.start();
+ return proxy.getKey();
+ }
- assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
+ @NonNullByDefault
+ private Entry<DataTreeChangeListenerProxy, Runnable> createProxy(final ActorUtils actorUtils,
+ final YangInstanceIdentifier path, final boolean clustered) {
+ final var executor = mock(Executor.class);
+ final var captor = ArgumentCaptor.forClass(Runnable.class);
+ doNothing().when(executor).execute(captor.capture());
+ final var proxy = DataTreeChangeListenerProxy.ofTesting(actorUtils, mockListener, path, clustered, "shard-1",
+ executor);
+ return Map.entry(proxy, captor.getValue());
}
}