2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.actor.Props;
22 import akka.actor.Terminated;
23 import akka.dispatch.ExecutionContexts;
24 import akka.dispatch.Futures;
25 import akka.testkit.javadsl.TestKit;
26 import akka.util.Timeout;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.time.Duration;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.Test;
32 import org.mockito.stubbing.Answer;
33 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
37 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
38 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
39 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
40 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
41 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
42 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
43 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import scala.concurrent.ExecutionContextExecutor;
49 import scala.concurrent.Future;
/**
 * Tests for {@code DataTreeChangeListenerProxy}: the messages it exchanges while registering a
 * listener with a shard, and what it records when the shard lookup or the registration does not succeed.
 */
51 public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
// Mocked plain (non-clustered) listener handed to the proxies that the tests construct.
52 private final DOMDataTreeChangeListener mockListener = mock(DOMDataTreeChangeListener.class);
/**
 * Walks a plain (non-clustered) listener through registration against "shard-1" and checks each
 * message the proxy sends. A single TestKit probe receives the shard lookup, the registration
 * request and, at the end, the close request and the Terminated notification.
 */
54 @Test(timeout = 10000)
55 public void testSuccessfulRegistration() {
56 final TestKit kit = new TestKit(getSystem());
// The probe's ref is given to ActorUtils; the FindLocalShard lookup is expected at the probe below.
57 ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
58 mock(Configuration.class));
60 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
61 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
62 actorUtils, mockListener, path);
// init() is started on a separate thread; this thread then plays the probe's side of the exchange.
64 new Thread(() -> proxy.init("shard-1")).start();
66 Duration timeout = Duration.ofSeconds(5);
// Step 1: the lookup must name the shard that was passed to init().
67 FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
68 assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
// Answer with the probe itself as the local shard; the registration request is expected at the probe next.
70 kit.reply(new LocalShardFound(kit.getRef()));
// Step 2: the request carries the listener's path and, for a plain listener, is not for all instances.
72 RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
73 RegisterDataTreeChangeListener.class);
74 assertEquals("getPath", path, registerMsg.getPath());
75 assertFalse("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances());
// Step 3: acknowledge the registration, again passing the probe's ref in the reply.
77 kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
// Poll every 50 ms, at most 20 * 5 = 100 times, until the proxy reports a registration actor.
79 for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
80 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
// The recorded registration actor must be a selection of the probe's path.
83 assertEquals("getListenerRegistrationActor", getSystem().actorSelection(kit.getRef().path()),
84 proxy.getListenerRegistrationActor());
// Watch the proxy's listener actor so that its termination is reported to the probe.
86 kit.watch(proxy.getDataChangeListenerActor());
// NOTE(review): the embedded line numbers skip 87-89 here (and 95-97 further down), so statements may be
// missing from this listing - presumably the proxy is closed at those points; confirm against the full file.
90 // The listener registration actor should get a Close message
91 kit.expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
93 // The DataChangeListener actor should be terminated
94 kit.expectMsgClass(timeout, Terminated.class);
// Nothing further may arrive at the probe.
98 kit.expectNoMessage();
/**
 * Same opening exchange as the plain-listener registration test, but with a mocked
 * {@code ClusteredDOMDataTreeChangeListener}: the registration request must ask for
 * registration on all instances.
 */
101 @Test(timeout = 10000)
102 public void testSuccessfulRegistrationForClusteredListener() {
103 final TestKit kit = new TestKit(getSystem());
// The probe's ref is given to ActorUtils; the FindLocalShard lookup is expected at the probe below.
104 ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
105 mock(Configuration.class));
// A clustered listener mock local to this test, used instead of the class-level mockListener.
107 ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
108 ClusteredDOMDataTreeChangeListener.class);
110 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
111 final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
112 new DataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, path);
// init() is started on a separate thread; this thread then plays the probe's side of the exchange.
114 new Thread(() -> proxy.init("shard-1")).start();
116 Duration timeout = Duration.ofSeconds(5);
// The lookup must name the shard that was passed to init().
117 FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
118 assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
// Answer with the probe itself as the local shard; the registration request is expected at the probe next.
120 kit.reply(new LocalShardFound(kit.getRef()));
122 RegisterDataTreeChangeListener registerMsg = kit.expectMsgClass(timeout,
123 RegisterDataTreeChangeListener.class);
124 assertEquals("getPath", path, registerMsg.getPath());
// The difference from the plain-listener case: the flag must be true here.
125 assertTrue("isRegisterOnAllInstances", registerMsg.isRegisterOnAllInstances());
// NOTE(review): embedded line numbers 126-129 are absent from this listing; the end of the method
// (and any cleanup it performs) is not visible - confirm against the full file.
/**
 * Answers the shard lookup with {@code LocalShardNotFound} and checks that the proxy sends
 * nothing further to the probe during the following second.
 */
130 @Test(timeout = 10000)
131 public void testLocalShardNotFound() {
132 final TestKit kit = new TestKit(getSystem());
// The probe's ref is given to ActorUtils; the FindLocalShard lookup is expected at the probe below.
133 ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
134 mock(Configuration.class));
136 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
137 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
138 actorUtils, mockListener, path);
// init() is started on a separate thread; this thread then plays the probe's side of the exchange.
140 new Thread(() -> proxy.init("shard-1")).start();
142 Duration timeout = Duration.ofSeconds(5);
// The lookup must name the shard that was passed to init().
143 FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
144 assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
// Report the shard as not found ...
146 kit.reply(new LocalShardNotFound("shard-1"));
// ... after which no message (in particular no registration request) may arrive for one second.
148 kit.expectNoMessage(Duration.ofSeconds(1));
// NOTE(review): embedded line numbers 149-152 are absent from this listing; the end of the method
// is not visible - confirm against the full file.
/**
 * Answers the shard lookup with a {@code NotInitializedException} and checks, inside a
 * one-second window, that the proxy sends nothing further to the probe.
 */
153 @Test(timeout = 10000)
154 public void testLocalShardNotInitialized() {
155 final TestKit kit = new TestKit(getSystem());
// The probe's ref is given to ActorUtils; the FindLocalShard lookup is expected at the probe below.
156 ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
157 mock(Configuration.class));
159 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
160 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
161 actorUtils, mockListener, path);
// init() is started on a separate thread; this thread then plays the probe's side of the exchange.
163 new Thread(() -> proxy.init("shard-1")).start();
165 Duration timeout = Duration.ofSeconds(5);
// The lookup must name the shard that was passed to init().
166 FindLocalShard findLocalShard = kit.expectMsgClass(timeout, FindLocalShard.class);
167 assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
// The exception object itself is sent back as the reply to the lookup.
169 kit.reply(new NotInitializedException("not initialized"));
// No message may arrive at the probe; the check runs inside a one-second within() block.
171 kit.within(Duration.ofSeconds(1), () -> {
172 kit.expectNoMessage();
// NOTE(review): embedded line numbers 173-179 are absent from this listing; the rest of the lambda and
// the end of the method are not visible - confirm against the full file.
/**
 * Checks that the proxy records no listener registration actor when the registration request
 * fails. ActorUtils and the actor system are mocks here: the shard lookup is stubbed to succeed
 * and executeOperationAsync is stubbed to return a failed future.
 */
// NOTE(review): the lines directly above this method (embedded numbers 173-179) are absent from this
// listing, so any annotations on it are not visible - confirm against the full file.
180 public void testFailedRegistration() {
181 final TestKit kit = new TestKit(getSystem());
182 ActorSystem mockActorSystem = mock(ActorSystem.class);
// actorOf(Props) on the mocked actor system hands back a real DoNothingActor from the test system.
184 ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class), "testFailedRegistration");
185 doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
// Execution context backed by Guava's direct executor; it is stubbed as the client dispatcher below.
186 ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
188 ActorUtils actorUtils = mock(ActorUtils.class);
189 final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
// Stubs put in place before the proxy is constructed.
191 doReturn(executor).when(actorUtils).getClientDispatcher();
192 doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
193 doReturn(mockActorSystem).when(actorUtils).getActorSystem();
195 String shardName = "shard-1";
196 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
197 actorUtils, mockListener, path);
199 doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
// The shard lookup completes successfully with the probe's ref ...
200 doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName));
// ... but any executeOperationAsync call returns a future failed with RuntimeException("mock").
201 doReturn(Futures.failed(new RuntimeException("mock"))).when(actorUtils).executeOperationAsync(
202 any(ActorRef.class), any(Object.class), any(Timeout.class));
// NOTE(review): getDatastoreContext() was already stubbed above with a built context; this second
// stubbing (a mock) applies to calls made from here on - confirm both are intended.
203 doReturn(mock(DatastoreContext.class)).when(actorUtils).getDatastoreContext();
// init() is called directly on the test thread.
205 proxy.init("shard-1");
// After the failed registration the proxy must not report a registration actor.
207 assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
// NOTE(review): embedded line numbers 208-212 are absent from this listing; the end of the method
// is not visible - confirm against the full file.
/**
 * Runs init() against a mocked ActorUtils whose executeOperationAsync is answered by a custom
 * Mockito Answer, then expects the probe to receive a close request and the proxy to report no
 * listener registration actor.
 */
// NOTE(review): the lines directly above this method are absent from this listing, so any
// annotations on it are not visible - confirm against the full file.
213 public void testCloseBeforeRegistration() {
214 final TestKit kit = new TestKit(getSystem());
215 ActorUtils actorUtils = mock(ActorUtils.class);
217 String shardName = "shard-1";
// The mocked ActorUtils is backed by the real test actor system and its default global dispatcher.
219 doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
220 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorUtils).getClientDispatcher();
221 doReturn(getSystem()).when(actorUtils).getActorSystem();
222 doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorUtils).getNotificationDispatcherPath();
// actorSelection(<probe path>) on the mock resolves to a real selection of the probe.
223 doReturn(getSystem().actorSelection(kit.getRef().path())).when(actorUtils).actorSelection(
224 kit.getRef().path());
225 doReturn(kit.duration("5 seconds")).when(actorUtils).getOperationDuration();
// The shard lookup completes successfully with the probe's ref.
226 doReturn(Futures.successful(kit.getRef())).when(actorUtils).findLocalShardAsync(eq(shardName));
228 final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
229 actorUtils, mockListener, YangInstanceIdentifier.of(TestModel.TEST_QNAME));
// The answer returns a successful registration reply that carries the probe's ref.
// NOTE(review): embedded line 232 inside this lambda is absent from the listing - presumably the proxy
// is closed there before the reply is produced, as the test name suggests; confirm against the full file.
231 Answer<Future<Object>> answer = invocation -> {
233 return Futures.successful((Object) new RegisterDataTreeNotificationListenerReply(kit.getRef()));
// NOTE(review): the statement below is cut off in this listing (embedded lines 237-238 are absent).
236 doAnswer(answer).when(actorUtils).executeOperationAsync(any(ActorRef.class), any(Object.class),
239 proxy.init(shardName);
// The probe must receive a close request within five seconds ...
241 kit.expectMsgClass(Duration.ofSeconds(5), CloseDataTreeNotificationListenerRegistration.class);
// ... and the proxy must not report a registration actor afterwards.
243 assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());