/*
 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import akka.actor.ActorRef;
import akka.testkit.javadsl.TestKit;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
39 public class RootDataTreeChangeListenerProxyTest extends AbstractActorTest {
41 @Test(timeout = 10000)
42 public void testSuccessfulRegistrationOnTwoShards() {
43 final TestKit kit = new TestKit(getSystem());
44 ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
45 mock(Configuration.class));
47 ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
48 ClusteredDOMDataTreeChangeListener.class);
50 final YangInstanceIdentifier path = YangInstanceIdentifier.empty();
51 final RootDataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> rootListenerProxy =
52 new RootDataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener,
53 Set.of("shard-1", "shard-2"));
55 final Duration timeout = Duration.ofSeconds(5);
56 FindLocalShard findLocalShard1 = kit.expectMsgClass(FindLocalShard.class);
57 kit.reply(new LocalShardFound(kit.getRef()));
58 FindLocalShard findLocalShard2 = kit.expectMsgClass(FindLocalShard.class);
59 kit.reply(new LocalShardFound(kit.getRef()));
60 assertTrue(List.of(findLocalShard1.getShardName(), findLocalShard2.getShardName())
61 .containsAll(List.of("shard-2", "shard-1")));
63 RegisterDataTreeChangeListener registerForShard1 = kit.expectMsgClass(timeout,
64 RegisterDataTreeChangeListener.class);
65 assertEquals("getPath", path, registerForShard1.getPath());
66 assertTrue("isRegisterOnAllInstances", registerForShard1.isRegisterOnAllInstances());
68 kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
70 RegisterDataTreeChangeListener registerForShard2 = kit.expectMsgClass(timeout,
71 RegisterDataTreeChangeListener.class);
72 assertEquals("getPath", path, registerForShard2.getPath());
73 assertTrue("isRegisterOnAllInstances", registerForShard2.isRegisterOnAllInstances());
75 kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef()));
77 assertEquals(registerForShard1.getListenerActorPath(), registerForShard2.getListenerActorPath());
79 final TestKit kit2 = new TestKit(getSystem());
80 final ActorRef rootListenerActor = getSystem().actorFor(registerForShard1.getListenerActorPath());
81 rootListenerActor.tell(new EnableNotification(true, "test"), kit.getRef());
82 final DataTreeCandidate peopleCandidate = DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(),
83 PeopleModel.create());
84 rootListenerActor.tell(new DataTreeChanged(ImmutableList.of(peopleCandidate)), kit.getRef());
85 rootListenerActor.tell(new DataTreeChanged(ImmutableList.of(peopleCandidate)), kit2.getRef());
86 //verify the 2 candidates were processed into 1 initial candidate
87 verify(mockClusteredListener, timeout(100).times(1)).onDataTreeChanged(any());
89 rootListenerProxy.close();
92 @Test(timeout = 10000, expected = java.lang.AssertionError.class)
93 public void testNotAllShardsFound() {
94 final TestKit kit = new TestKit(getSystem());
95 ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
96 mock(Configuration.class));
98 ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
99 ClusteredDOMDataTreeChangeListener.class);
101 final RootDataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> rootListenerProxy =
102 new RootDataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, Set.of("shard-1", "shard-2"));
104 Duration timeout = Duration.ofSeconds(5);
105 kit.expectMsgClass(FindLocalShard.class);
106 kit.reply(new LocalShardFound(kit.getRef()));
107 kit.expectMsgClass(FindLocalShard.class);
108 // don't send second reply
109 kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
111 rootListenerProxy.close();
114 @Test(timeout = 10000, expected = java.lang.AssertionError.class)
115 public void testLocalShardNotInitialized() {
116 final TestKit kit = new TestKit(getSystem());
117 ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
118 mock(Configuration.class));
120 ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(
121 ClusteredDOMDataTreeChangeListener.class);
123 final RootDataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> rootListenerProxy =
124 new RootDataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, Set.of("shard-1"));
126 Duration timeout = Duration.ofSeconds(5);
127 kit.expectMsgClass(FindLocalShard.class);
128 kit.reply(new NotInitializedException("not initialized"));
129 // don't send second reply
130 kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
132 rootListenerProxy.close();