From 75a7b88f4b97e59fc53de293a0d6f0f5ab1396b9 Mon Sep 17 00:00:00 2001 From: Marian Adamjak Date: Wed, 22 Apr 2015 08:04:32 +0200 Subject: [PATCH] BUG 3029 - netconf event source modification - add TopicId into published notifications Change-Id: I9ff95935fd37c2978a7e41d77646546adc30d4d2 Signed-off-by: Marian Adamjak --- .../netconf/NetconfEventSource.java | 207 +++++++++++++----- .../netconf/NetconfEventSourceManager.java | 24 +- .../app/impl/NetconfEventSourceTest.java | 80 ++----- 3 files changed, 179 insertions(+), 132 deletions(-) diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java index 615fa34b7c..3dbdc98ea5 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java @@ -16,7 +16,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.regex.Pattern; @@ -25,7 +24,6 @@ import javax.xml.transform.dom.DOMResult; import javax.xml.transform.dom.DOMSource; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.binding.api.MountPoint; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; @@ -53,7 +51,6 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; @@ -78,6 +75,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); + private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id")); private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); @@ -88,74 +86,74 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, private final DOMMountPoint netconfMount; private final DOMNotificationPublishService domPublish; - private final Set activeStreams = new ConcurrentSkipListSet<>(); private final Map urnPrefixToStreamMap; - private final ConcurrentHashMap> listenerRegistrationMap = new ConcurrentHashMap<>(); - public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) { + private final ConcurrentHashMap streamNotifRegistrationMap = new ConcurrentHashMap<>(); + + public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService) { this.netconfMount = netconfMount; this.node = node; this.nodeId = node.getNodeId().getValue(); this.urnPrefixToStreamMap = streamMap; this.domPublish = publishService; + this.initializeStreamNotifRegistrationMap(); LOG.info("NetconfEventSource [{}] created.", nodeId); } + private void initializeStreamNotifRegistrationMap(){ + for(String streamName : this.urnPrefixToStreamMap.values()){ + streamNotifRegistrationMap.put(streamName, new StreamNotificationTopicRegistration(streamName, this.nodeId, this.netconfMount, this)); + } + } + @Override public Future> joinTopic(final JoinTopicInput input) { + final NotificationPattern notificationPattern = input.getNotificationPattern(); final List matchingNotifications = getMatchingNotifications(notificationPattern); - return registerNotificationListener(input.getTopicId(),matchingNotifications); + return registerTopic(input.getTopicId(),matchingNotifications); + } - private synchronized Future> registerNotificationListener(final TopicId topicId, final List notificationsToSubscribe){ - if(listenerRegistrationMap.containsKey(topicId)){ - final String errMessage = "Can not join topic twice. Topic " + topicId.getValue() + " has been joined to node " + this.nodeId; - return immediateFuture(RpcResultBuilder.failed().withError(ErrorType.APPLICATION, errMessage).build()); - } - ListenerRegistration registration = null; - JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; - final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + private synchronized Future> registerTopic(final TopicId topicId, final List notificationsToSubscribe){ - if(notifyService.isPresent()) { - for (final SchemaPath qName : notificationsToSubscribe) { - startSubscription(qName); + JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; + if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){ + final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + if(notifyService.isPresent()){ + int subscribedStreams = 0; + for(SchemaPath schemaNotification : notificationsToSubscribe){ + final Optional streamName = resolveStream(schemaNotification.getLastComponent()); + if(streamName.isPresent()){ + LOG.info("Stream {} is activating, TopicId {}", streamName.get(), topicId.getValue() ); + StreamNotificationTopicRegistration streamReg = streamNotifRegistrationMap.get(streamName.get()); + streamReg.activateStream(); + for(SchemaPath notificationPath : notificationsToSubscribe){ + LOG.info("Notification listener is registering, Notification {}, TopicId {}", notificationPath, topicId.getValue() ); + streamReg.registerNotificationListenerTopic(notificationPath, topicId); + } + subscribedStreams = subscribedStreams + 1; + } + } + if(subscribedStreams > 0){ + joinTopicStatus = JoinTopicStatus.Up; + } } - registration = notifyService.get().registerNotificationListener(this, notificationsToSubscribe); } - if(registration != null){ - listenerRegistrationMap.put(topicId,registration); - joinTopicStatus = JoinTopicStatus.Up; - } final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); return immediateFuture(RpcResultBuilder.success(output).build()); - } - private void startSubscription(final SchemaPath path) { - final String streamName = resolveStream(path.getLastComponent()); - startSubscription(streamName); } private void resubscribeToActiveStreams() { - for (final String streamName : activeStreams) { - startSubscription(streamName); - } - } - - private synchronized void startSubscription(final String streamName) { - if(streamIsActive(streamName) == false){ - LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); - final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName)) - .build(); - netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); - activeStreams.add(streamName); + for (StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){ + streamReg.reActivateStream(); } } - private String resolveStream(final QName qName) { + private Optional resolveStream(final QName qName) { String streamName = null; for (final Map.Entry entry : urnPrefixToStreamMap.entrySet()) { @@ -166,28 +164,35 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, break; } } - - return streamName; - } - - private boolean streamIsActive(final String streamName) { - return activeStreams.contains(streamName); + return Optional.fromNullable(streamName); } @Override public void onNotification(final DOMNotification notification) { - final ContainerNode topicNotification = Builders.containerBuilder() - .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) - .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId)) - .withChild(encapsulate(notification)) - .build(); - try { - domPublish.putNotification(new TopicDOMNotification(topicNotification)); - } catch (final InterruptedException e) { - throw Throwables.propagate(e); + SchemaPath notificationPath = notification.getType(); + LOG.info("Notification {} has come.",notification.getType()); + for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){ + for(TopicId topicId : streamReg.getNotificationTopicIds(notificationPath)){ + publishNotification(notification, topicId); + LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); + } } } + private void publishNotification(final DOMNotification notification, TopicId topicId){ + final ContainerNode topicNotification = Builders.containerBuilder() + .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) + .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId)) + .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId)) + .withChild(encapsulate(notification)) + .build(); + try { + domPublish.putNotification(new TopicDOMNotification(topicNotification)); + } catch (final InterruptedException e) { + throw Throwables.propagate(e); + } + } + private AnyXmlNode encapsulate(final DOMNotification body) { // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools final Document doc = XmlUtil.newDocument(); @@ -242,13 +247,17 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, final String regex = Util.wildcardToRegex(notificationPattern.getValue()); final Pattern pattern = Pattern.compile(regex); - return Util.expandQname(getAvailableNotifications(), pattern); + List availableNotifications = getAvailableNotifications(); + if(availableNotifications == null || availableNotifications.isEmpty()){ + return null; + } + return Util.expandQname(availableNotifications, pattern); } @Override public void close() throws Exception { - for(ListenerRegistration registration : listenerRegistrationMap.values()){ - registration.close(); + for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){ + streamReg.deactivateStream(); } } @@ -268,4 +277,84 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, return qNs; } + private class StreamNotificationTopicRegistration{ + + final private String streamName; + final private DOMMountPoint netconfMount; + final private String nodeId; + final private NetconfEventSource notificationListener; + private boolean active; + + private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); + + public StreamNotificationTopicRegistration(final String streamName, final String nodeId, final DOMMountPoint netconfMount, NetconfEventSource notificationListener) { + this.streamName = streamName; + this.netconfMount = netconfMount; + this.nodeId = nodeId; + this.notificationListener = notificationListener; + this.active = false; + } + + public boolean isActive() { + return active; + } + + public void reActivateStream(){ + if(this.isActive()){ + LOG.info("Stream {} is reactivated active on node {}.", this.streamName, this.nodeId); + final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName)) + .build(); + netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); + } + } + + public void activateStream() { + if(this.isActive() == false){ + LOG.info("Stream {} is not active on node {}. Will subscribe.", this.streamName, this.nodeId); + final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName)) + .build(); + netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); + this.active = true; + } else { + LOG.info("Stream {} is now active on node {}", this.streamName, this.nodeId); + } + } + + public void deactivateStream() { + for(ListenerRegistration reg : notificationRegistrationMap.values()){ + reg.close(); + } + this.active = false; + } + + public String getStreamName() { + return streamName; + } + + public ArrayList getNotificationTopicIds(SchemaPath notificationPath){ + return notificationTopicMap.get(notificationPath); + } + + public void registerNotificationListenerTopic(SchemaPath notificationPath, TopicId topicId){ + final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + if(notificationPath != null && notifyService.isPresent()){ + ListenerRegistration registration = notifyService.get().registerNotificationListener(this.notificationListener,notificationPath); + notificationRegistrationMap.put(notificationPath, registration); + ArrayList topicIds = getNotificationTopicIds(notificationPath); + if(topicIds == null){ + topicIds = new ArrayList<>(); + topicIds.add(topicId); + } else { + if(topicIds.contains(topicId) == false){ + topicIds.add(topicId); + } + } + notificationTopicMap.put(notificationPath, topicIds); + } + } + + } } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java index 7605a3eb34..dd64e77073 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java @@ -9,16 +9,14 @@ package org.opendaylight.controller.messagebus.eventsources.netconf; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; + import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.binding.api.MountPoint; import org.opendaylight.controller.md.sal.binding.api.MountPointService; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; @@ -45,9 +43,12 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSourceManager.class); + private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceManager.class); private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())); private static final InstanceIdentifier NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class) .child(Topology.class, NETCONF_TOPOLOGY_KEY) @@ -106,7 +107,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto private void initialize(final DataBroker dataBroker){ Preconditions.checkNotNull(dataBroker); listenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE); - LOGGER.info("NetconfEventSourceManager initialized."); + LOG.info("NetconfEventSourceManager initialized."); } private Map namespaceToStreamMapping(final List namespaceMapping) { @@ -122,7 +123,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto @Override public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { - LOGGER.debug("[DataChangeEvent, DataObject>: {}]", event); + LOG.debug("[DataChangeEvent, DataObject>: {}]", event); for (final Map.Entry, DataObject> changeEntry : event.getCreatedData().entrySet()) { if (changeEntry.getValue() instanceof Node) { nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue()); @@ -144,11 +145,11 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto throw new IllegalStateException("Node is null"); } if ( isNetconfNode(node) == false ) { - LOGGER.debug("OnDataChanged Event. Not a Netconf node."); + LOG.debug("OnDataChanged Event. Not a Netconf node."); return; } if ( isEventSource(node) == false ) { - LOGGER.debug("OnDataChanged Event. Node an EventSource node."); + LOG.debug("OnDataChanged Event. Node an EventSource node."); return; } if(node.getAugmentation(NetconfNode.class).getConnectionStatus() != ConnectionStatus.Connected ) { @@ -162,13 +163,12 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto private void createEventSource(final InstanceIdentifier key, final Node node) { final Optional netconfMount = domMounts.getMountPoint(domMountPath(node.getNodeId())); - final Optional bindingMount = bindingMounts.getMountPoint(key); - - if(netconfMount.isPresent() && bindingMount.isPresent()) { + if(netconfMount.isPresent()) { final NetconfEventSource netconfEventSource = - new NetconfEventSource(node, streamMap, netconfMount.get(), publishService, bindingMount.get()); + new NetconfEventSource(node, streamMap, netconfMount.get(), publishService); final EventSourceRegistration registration = eventSourceRegistry.registerEventSource(netconfEventSource); + LOG.info("Event source {} has been registered",node.getNodeId().getValue()); eventSourceRegistration.putIfAbsent(key, registration); } diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java index ed9025780a..5e1a07062d 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java @@ -7,12 +7,23 @@ */ package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; +//import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.md.sal.binding.api.BindingService; -import org.opendaylight.controller.md.sal.binding.api.MountPoint; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; @@ -39,21 +50,8 @@ import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaPath; -import java.lang.reflect.Field; -import java.net.URI; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; public class NetconfEventSourceTest { @@ -67,21 +65,18 @@ public class NetconfEventSourceTest { streamMap.put("uriStr1", "string2"); domMountPointMock = mock(DOMMountPoint.class); DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); - MountPoint mountPointMock = mock(MountPoint.class); RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class); Optional onlyOptionalMock = (Optional) mock(Optional.class); NotificationsService notificationsServiceMock = mock(NotificationsService.class); - doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class); - doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get(); doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class); org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class); org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1"); doReturn(nodeId).when(node).getNodeId(); - netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock); + netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock); } @Test @@ -99,35 +94,6 @@ public class NetconfEventSourceTest { verify(dataObjectMock, times(2)).isConnected(); } - @Test - public void onDataChangedResubscribeTest() throws Exception{ - - InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class) - .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class); - - AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); - NetconfNode dataObjectMock = mock(NetconfNode.class); - Map dataChangeMap = new HashMap<>(); - dataChangeMap.put(brmIdent, dataObjectMock); - doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData(); - doReturn(new HashMap()).when(asyncDataChangeEventMock).getOriginalData(); - doReturn(true).when(dataObjectMock).isConnected(); - - Set localSet = getActiveStreams(); - localSet.add("activeStream1"); - - Optional optionalMock = (Optional) mock(Optional.class); - doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class); - DOMRpcService domRpcServiceMock = mock(DOMRpcService.class); - doReturn(domRpcServiceMock).when(optionalMock).get(); - CheckedFuture checkedFutureMock = mock(CheckedFuture.class); - doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class)); - - netconfEventSource.onDataChanged(asyncDataChangeEventMock); - verify(dataObjectMock, times(1)).isConnected(); - assertEquals("Size of set has not been set correctly.", 1, getActiveStreams().size()); - } - @Test public void joinTopicTest() throws Exception{ joinTopicTestHelper(); @@ -160,8 +126,8 @@ public class NetconfEventSourceTest { DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class); doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get(); - ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class); - doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class)); + ListenerRegistration listenerRegistrationMock = (ListenerRegistration)mock(ListenerRegistration.class); + doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(SchemaPath.class)); Optional optionalMock = (Optional) mock(Optional.class); doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class); @@ -171,12 +137,4 @@ public class NetconfEventSourceTest { doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class)); } -//TODO: create Test for NetConfEventSource#onNotification - - private Set getActiveStreams() throws Exception{ - Field nesField = NetconfEventSource.class.getDeclaredField("activeStreams"); - nesField.setAccessible(true); - return (Set) nesField.get(netconfEventSource); - } - } \ No newline at end of file -- 2.36.6