/* * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.testkit.javadsl.TestKit; import com.google.common.collect.ImmutableList; import java.time.Duration; import java.util.List; import java.util.Set; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; public class RootDataTreeChangeListenerProxyTest extends AbstractActorTest { @Test(timeout = 10000) public void testSuccessfulRegistrationOnTwoShards() { final TestKit kit = new TestKit(getSystem()); ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); ClusteredDOMDataTreeChangeListener mockClusteredListener = mock( ClusteredDOMDataTreeChangeListener.class); final YangInstanceIdentifier path = YangInstanceIdentifier.empty(); final RootDataTreeChangeListenerProxy rootListenerProxy = new RootDataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, Set.of("shard-1", "shard-2")); final Duration timeout = Duration.ofSeconds(5); FindLocalShard findLocalShard1 = kit.expectMsgClass(FindLocalShard.class); kit.reply(new LocalShardFound(kit.getRef())); FindLocalShard findLocalShard2 = kit.expectMsgClass(FindLocalShard.class); kit.reply(new LocalShardFound(kit.getRef())); assertTrue(List.of(findLocalShard1.getShardName(), findLocalShard2.getShardName()) .containsAll(List.of("shard-2", "shard-1"))); RegisterDataTreeChangeListener registerForShard1 = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class); assertEquals("getPath", path, registerForShard1.getPath()); assertTrue("isRegisterOnAllInstances", registerForShard1.isRegisterOnAllInstances()); kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef())); RegisterDataTreeChangeListener registerForShard2 = kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class); assertEquals("getPath", path, registerForShard2.getPath()); assertTrue("isRegisterOnAllInstances", registerForShard2.isRegisterOnAllInstances()); kit.reply(new RegisterDataTreeNotificationListenerReply(kit.getRef())); assertEquals(registerForShard1.getListenerActorPath(), registerForShard2.getListenerActorPath()); final TestKit kit2 = new TestKit(getSystem()); final ActorRef rootListenerActor = getSystem().actorFor(registerForShard1.getListenerActorPath()); rootListenerActor.tell(new EnableNotification(true, "test"), kit.getRef()); final DataTreeCandidate peopleCandidate = DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), PeopleModel.create()); rootListenerActor.tell(new DataTreeChanged(ImmutableList.of(peopleCandidate)), kit.getRef()); rootListenerActor.tell(new DataTreeChanged(ImmutableList.of(peopleCandidate)), kit2.getRef()); //verify the 2 candidates were processed into 1 initial candidate verify(mockClusteredListener, timeout(100).times(1)).onDataTreeChanged(any()); rootListenerProxy.close(); } @Test(timeout = 10000, expected = java.lang.AssertionError.class) public void testNotAllShardsFound() { final TestKit kit = new TestKit(getSystem()); ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); ClusteredDOMDataTreeChangeListener mockClusteredListener = mock( ClusteredDOMDataTreeChangeListener.class); final RootDataTreeChangeListenerProxy rootListenerProxy = new RootDataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, Set.of("shard-1", "shard-2")); Duration timeout = Duration.ofSeconds(5); kit.expectMsgClass(FindLocalShard.class); kit.reply(new LocalShardFound(kit.getRef())); kit.expectMsgClass(FindLocalShard.class); // don't send second reply kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class); rootListenerProxy.close(); } @Test(timeout = 10000, expected = java.lang.AssertionError.class) public void testLocalShardNotInitialized() { final TestKit kit = new TestKit(getSystem()); ActorUtils actorUtils = new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class), mock(Configuration.class)); ClusteredDOMDataTreeChangeListener mockClusteredListener = mock( ClusteredDOMDataTreeChangeListener.class); final RootDataTreeChangeListenerProxy rootListenerProxy = new RootDataTreeChangeListenerProxy<>(actorUtils, mockClusteredListener, Set.of("shard-1")); Duration timeout = Duration.ofSeconds(5); kit.expectMsgClass(FindLocalShard.class); kit.reply(new NotInitializedException("not initialized")); // don't send second reply kit.expectMsgClass(timeout, RegisterDataTreeChangeListener.class); rootListenerProxy.close(); } }