2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.Props;
23 import akka.actor.Terminated;
24 import akka.dispatch.ExecutionContexts;
25 import akka.dispatch.Futures;
26 import akka.testkit.javadsl.TestKit;
27 import akka.util.Timeout;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.time.Duration;
32 import java.util.Map.Entry;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Consumer;
36 import org.eclipse.jdt.annotation.NonNullByDefault;
37 import org.junit.Test;
38 import org.mockito.ArgumentCaptor;
39 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
40 import org.opendaylight.controller.cluster.datastore.config.Configuration;
41 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
42 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
47 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
48 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
49 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
50 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
// Mock listener passed to DataTreeChangeListenerProxy.of(...) and DataTreeChangeListenerProxy.ofTesting(...)
// by the tests and helpers in this class.
55 private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class);
// Exercises the registration handshake for a non-clustered listener (clustered == false).
// A single TestKit probe stands in for every remote party: it is expected to receive FindLocalShard,
// then RegisterDataTreeChangeListener, and finally CloseDataTreeNotificationListenerRegistration.
57 @Test(timeout = 10000)
58 public void testSuccessfulRegistration() {
59 final var kit = new TestKit(getSystem());
// The probe's ref is handed to ActorUtils, which is why the FindLocalShard request below is expected on it.
60 final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
61 mock(Configuration.class));
63 final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
64 final var proxy = startProxyAsync(actorUtils, path, false);
66 final var timeout = Duration.ofSeconds(5);
// Step 1: the lookup for the shard name the helper registers with ("shard-1") must arrive within the timeout.
67 final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
68 assertEquals("shard-1", findLocalShard.getShardName());
// Answer the lookup with the probe itself, so the registration request comes back to the probe.
70 kit.reply(new LocalShardFound(kit.getRef()));
// Step 2: the registration request must carry the requested path and must not ask for all instances.
72 final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
73 assertEquals(path, registerMsg.getPath());
74 assertFalse(registerMsg.isRegisterOnAllInstances());
// Acknowledge the registration, naming the probe as the listener registration actor.
76 kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
// Poll up to 20 * 5 times, 50 ms apart (roughly 5 seconds), until the proxy reports a registration actor.
78 for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
79 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
// The recorded registration actor must be the selection for the probe's path.
82 assertEquals(getSystem().actorSelection(kit.getRef().path()), proxy.getListenerRegistrationActor());
// Watch the proxy's DataChangeListener actor so the probe is sent Terminated when that actor stops.
84 kit.watch(proxy.getDataChangeListenerActor());
88 // The listener registration actor should get a Close message
89 kit.expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
91 // The DataChangeListener actor should be terminated
92 kit.expectMsgClass(timeout, Terminated.class);
// Nothing further may reach the probe.
96 kit.expectNoMessage();
// Same handshake as the non-clustered case, but the proxy is started with clustered == true and the
// registration request is therefore expected to ask for registration on all instances.
99 @Test(timeout = 10000)
100 public void testSuccessfulRegistrationForClusteredListener() {
101 final var kit = new TestKit(getSystem());
// The probe's ref is handed to ActorUtils, which is why FindLocalShard is expected on it.
102 final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
103 mock(Configuration.class));
105 final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
106 final var proxy = startProxyAsync(actorUtils, path, true);
108 final var timeout = Duration.ofSeconds(5);
// The shard lookup must name "shard-1".
109 final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
110 assertEquals("shard-1", findLocalShard.getShardName());
// Answer with the probe as the local shard so the registration request is delivered to the probe.
112 kit.reply(new LocalShardFound(kit.getRef()));
// The registration request must carry the path and have the register-on-all-instances flag set.
114 final var registerMsg = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
115 assertEquals(path, registerMsg.getPath());
116 assertTrue(registerMsg.isRegisterOnAllInstances());
// Checks that no further message reaches the probe when the shard lookup is answered with
// LocalShardNotFound: after the reply, the probe must stay silent for one second.
121 @Test(timeout = 10000)
122 public void testLocalShardNotFound() {
123 final var kit = new TestKit(getSystem());
// The probe's ref is handed to ActorUtils, which is why FindLocalShard is expected on it.
124 final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
125 mock(Configuration.class));
127 final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
128 final var proxy = startProxyAsync(actorUtils, path, true);
130 final var timeout = Duration.ofSeconds(5);
// The shard lookup must name "shard-1".
131 final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
132 assertEquals("shard-1", findLocalShard.getShardName());
// Report that the shard does not exist locally.
134 kit.reply(new LocalShardNotFound("shard-1"));
// No registration request (or any other message) may arrive during the next second.
136 kit.expectNoMessage(Duration.ofSeconds(1));
// Checks that no further message reaches the probe when the shard lookup is answered with a
// NotInitializedException instead of a LocalShardFound.
141 @Test(timeout = 10000)
142 public void testLocalShardNotInitialized() {
143 final var kit = new TestKit(getSystem());
// The probe's ref is handed to ActorUtils, which is why FindLocalShard is expected on it.
144 final var actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
145 mock(Configuration.class));
147 final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
148 final var proxy = startProxyAsync(actorUtils, path, false);
150 final var timeout = Duration.ofSeconds(5);
// The shard lookup must name "shard-1".
151 final var findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
152 assertEquals("shard-1", findLocalShard.getShardName());
// Reply with the exception object itself as the message.
154 kit.reply(new NotInitializedException("not initialized"));
// The silence check runs inside a one-second within(...) block.
156 kit.within(Duration.ofSeconds(1), () -> {
157 kit.expectNoMessage();
// Checks that the proxy holds no registration actor when the registration request fails.
// ActorUtils is fully mocked here: the shard lookup succeeds, but executeOperationAsync returns a
// failed future.
165 public void testFailedRegistration() {
166 final var kit = new TestKit(getSystem());
167 final var mockActorSystem = mock(ActorSystem.class);
// Any actorOf(Props) call on the mocked system yields this pre-created DoNothingActor.
169 final var mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
170 doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
// Execution context backed by Guava's direct executor, returned below as the client dispatcher.
// NOTE(review): presumably chosen so future callbacks finish before the assertion — confirm.
171 final var executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
173 final var actorUtils = mock(ActorUtils.class);
174 final var path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
176 doReturn(executor).when(actorUtils).getClientDispatcher();
177 doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
178 doReturn(mockActorSystem).when(actorUtils).getActorSystem();
180 doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
// The lookup of "shard-1" succeeds immediately with the probe's ref ...
181 doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1");
// ... while every operation sent through executeOperationAsync fails with RuntimeException("mock").
182 doReturn(Futures.failed(new RuntimeException("mock"))).when(actorUtils).executeOperationAsync(
183 any(ActorRef.class), any(Object.class), any(Timeout.class));
// Create the proxy through the regular factory; no registration actor may have been recorded.
185 final var proxy = DataTreeChangeListenerProxy.of(actorUtils, mockListener, path, true, "shard-1");
186 assertNull(proxy.getListenerRegistrationActor());
// Drives the proxy's captured start-up task by hand against a mocked ActorUtils and checks that the
// probe receives CloseDataTreeNotificationListenerRegistration while the proxy ends up with no
// registration actor.
// NOTE(review): per the test name the proxy is closed before the registration reply is processed —
// confirm where the close is issued.
192 public void testCloseBeforeRegistration() {
193 final var kit = new TestKit(getSystem());
194 final var actorUtils = mock(ActorUtils.class);
// Stub the ActorUtils accessors with the real test actor system, its default global dispatcher and
// the default dispatcher path.
196 doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
197 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorUtils).getClientDispatcher();
198 doReturn(getSystem()).when(actorUtils).getActorSystem();
199 doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorUtils).getNotificationDispatcherPath();
// Selections for the probe's path resolve to the probe in the real actor system.
200 doReturn(getSystem().actorSelection(kit.getRef().path())).when(actorUtils).actorSelection(
201 kit.getRef().path());
202 doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
// The lookup of "shard-1" succeeds immediately with the probe's ref.
203 doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync("shard-1");
// createProxy returns the proxy (key) together with the Runnable captured from its executor (value).
205 final var proxy = createProxy(actorUtils, YangInstanceIdentifier.of(TestModel.TEST_QNAME), true);
206 final var instance = proxy.getKey();
// Every executeOperationAsync call is answered with a successful reply naming the probe as the
// registration actor.
208 doAnswer(invocation -> {
210 return Futures.successful(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
211 }).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class), any(Timeout.class));
// Run the captured task synchronously on the test thread.
212 proxy.getValue().run();
// The probe, acting as the registration actor, must be told to close within 5 seconds.
214 kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class);
// The proxy must not have recorded a registration actor.
216 assertNull(instance.getListenerRegistrationActor());
// Convenience overload: delegates to the four-argument startProxyAsync, passing Runnable::run as the
// execute argument.
220 private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path,
221 final boolean clustered) {
222 return startProxyAsync(actorUtils, path, clustered, Runnable::run);
// Builds a proxy via createProxy, wraps the Runnable captured from its executor in a daemon thread,
// and returns the proxy itself.
// NOTE(review): the execute parameter is not referenced in the lines shown here — confirm whether it
// is meant to run the task or can be removed.
226 private DataTreeChangeListenerProxy startProxyAsync(final ActorUtils actorUtils, final YangInstanceIdentifier path,
227 final boolean clustered, final Consumer<Runnable> execute) {
228 final var proxy = createProxy(actorUtils, path, clustered);
// The entry's value is the captured start-up task; daemon status keeps the thread from blocking JVM exit.
229 final var thread = new Thread(proxy.getValue());
230 thread.setDaemon(true);
232 return proxy.getKey();
// Creates a proxy through DataTreeChangeListenerProxy.ofTesting for shard "shard-1" using the shared
// mock listener, and returns it paired with the Runnable captured from a mocked Executor.
// The executor's execute(...) is stubbed with doNothing(), so the task is only captured, never run
// by the executor; callers decide when and on which thread to run it.
236 private Entry<DataTreeChangeListenerProxy, Runnable> createProxy(final ActorUtils actorUtils,
237 final YangInstanceIdentifier path, final boolean clustered) {
238 final var executor = mock(Executor.class);
239 final var captor = ArgumentCaptor.forClass(Runnable.class);
240 doNothing().when(executor).execute(captor.capture());
241 final var proxy = DataTreeChangeListenerProxy.ofTesting(actorUtils, mockListener, path, clustered, "shard-1",
243 return Map.entry(proxy, captor.getValue());