From 7aa383ecae87fd79f8f5dbece04f5d8c204e4767 Mon Sep 17 00:00:00 2001 From: Andrej Mak Date: Mon, 14 Mar 2016 11:37:48 +0100 Subject: [PATCH] Refactor NetconfEventSource Change-Id: I83e2d89c1b06abb757e941df3312108e2c6c2031 Signed-off-by: Andrej Mak --- ...nnectionNotificationTopicRegistration.java | 24 +- .../netconf/NetconfEventSource.java | 226 +++++++----------- .../netconf/NetconfEventSourceManager.java | 11 +- .../netconf/NetconfEventSourceMount.java | 164 +++++++++++++ .../NetconfEventSourceRegistration.java | 81 +++---- .../NotificationTopicRegistration.java | 39 ++- .../StreamNotificationTopicRegistration.java | 116 +++------ ...tionNotificationTopicRegistrationTest.java | 6 +- .../NetconfEventSourceManagerTest.java | 46 ++-- .../netconf/NetconfEventSourceMountTest.java | 148 ++++++++++++ .../netconf/NetconfEventSourceTest.java | 200 +++++++--------- .../netconf/NetconfTestUtils.java | 33 +++ ...reamNotificationTopicRegistrationTest.java | 64 ++--- 13 files changed, 645 insertions(+), 513 deletions(-) create mode 100644 netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceMount.java create mode 100644 netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceMountTest.java diff --git a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java index 598a260459..c9afd6ead9 100644 --- a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java +++ b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java @@ -11,7 +11,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; @@ -36,7 +36,7 @@ import org.w3c.dom.Element; /** * Topic registration on event-source-status-notification. */ -public class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration { +class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration { private static final Logger LOG = LoggerFactory.getLogger(ConnectionNotificationTopicRegistration.class); @@ -48,7 +48,6 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe private static final String XMLNS_URI = "http://www.w3.org/2000/xmlns/"; private final DOMNotificationListener domNotificationListener; - private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) { super(NotificationSourceType.ConnectionStatusChange, SourceName, @@ -84,31 +83,20 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe } @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) { - if (checkNotificationPath(notificationPath) == false) { + if (!checkNotificationPath(notificationPath)) { LOG.debug("Bad SchemaPath for notification try to register"); return false; } - ArrayList topicIds = getNotificationTopicIds(notificationPath); - if (topicIds == null) { - topicIds = new ArrayList<>(); - topicIds.add(topicId); - } else { - if (topicIds.contains(topicId) == false) { - topicIds.add(topicId); - } - } + Set topicIds = getTopicsForNotification(notificationPath); + topicIds.add(topicId); notificationTopicMap.put(notificationPath, topicIds); return true; } - @Override ArrayList getNotificationTopicIds(SchemaPath notificationPath) { - return notificationTopicMap.get(notificationPath); - } - @Override synchronized void unRegisterNotificationTopic(TopicId topicId) { List notificationPathToRemove = new ArrayList<>(); for (SchemaPath notifKey : notificationTopicMap.keySet()) { - ArrayList topicList = notificationTopicMap.get(notifKey); + Set topicList = notificationTopicMap.get(notifKey); if (topicList != null) { topicList.remove(topicId); if (topicList.isEmpty()) { diff --git a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSource.java b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSource.java index 28300753b3..01d229bf24 100644 --- a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSource.java +++ b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSource.java @@ -10,12 +10,17 @@ package org.opendaylight.netconf.messagebus.eventsources.netconf; import static com.google.common.util.concurrent.Futures.immediateFuture; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -23,21 +28,16 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.regex.Pattern; +import javax.annotation.Nullable; import javax.xml.stream.XMLStreamException; import javax.xml.transform.dom.DOMResult; import javax.xml.transform.dom.DOMSource; import org.opendaylight.controller.config.util.xml.XmlUtil; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.MountPoint; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMEvent; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; -import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.messagebus.app.util.TopicDOMNotification; import org.opendaylight.controller.messagebus.app.util.Util; import org.opendaylight.controller.messagebus.spi.EventSource; @@ -50,13 +50,8 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -75,7 +70,7 @@ import org.w3c.dom.Element; /** * NetconfEventSource serves as proxy between nodes and messagebus. Subscribers can join topic stream from this source. - * Then they will receive notifications that matches pattern specified by topic. + * Then they will receive notifications from device that matches pattern specified by topic. */ public class NetconfEventSource implements EventSource, DOMNotificationListener { @@ -90,89 +85,72 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener QName.create(TopicNotification.QNAME, "payload")); private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource"; - private final String nodeId; - private final Node node; - - private final DOMMountPoint netconfMount; - private final MountPoint mountPoint; private final DOMNotificationPublishService domPublish; private final Map urnPrefixToStreamMap; // key = urnPrefix, value = StreamName - private final List notificationTopicRegistrationList = new ArrayList<>(); + + /** + * Map notification uri -> registrations + */ + private final Multimap + notificationTopicRegistrations = Multimaps.synchronizedListMultimap(ArrayListMultimap.create()); + private final NetconfEventSourceMount mount; /** * Creates new NetconfEventSource for node. Topic notifications will be published via provided {@link DOMNotificationPublishService} - * @param node node * @param streamMap netconf streams from device - * @param netconfMount - * @param mountPoint * @param publishService publish service */ - public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, - final MountPoint mountPoint, final DOMNotificationPublishService publishService) { - this.netconfMount = Preconditions.checkNotNull(netconfMount); - this.mountPoint = Preconditions.checkNotNull(mountPoint); - this.node = Preconditions.checkNotNull(node); + public NetconfEventSource(final Map streamMap, NetconfEventSourceMount mount, final DOMNotificationPublishService publishService) { + this.mount = mount; this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap); this.domPublish = Preconditions.checkNotNull(publishService); - this.nodeId = node.getNodeId().getValue(); this.initializeNotificationTopicRegistrationList(); - LOG.info("NetconfEventSource [{}] created.", this.nodeId); + LOG.info("NetconfEventSource [{}] created.", mount.getNodeId()); } + /** + * Creates {@link ConnectionNotificationTopicRegistration} for connection. Also creates + * {@link StreamNotificationTopicRegistration} for every prefix and available stream as defined in config file. + */ private void initializeNotificationTopicRegistrationList() { - notificationTopicRegistrationList - .add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this)); - Optional> streamMap = getAvailableStreams(); - if (streamMap.isPresent()) { - LOG.debug("Stream configuration compare..."); - for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) { - final String streamName = this.urnPrefixToStreamMap.get(urnPrefix); - LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName); - if (streamMap.get().containsKey(streamName)) { - LOG.debug("Stream containig on device"); - notificationTopicRegistrationList - .add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName), urnPrefix, this)); - } + final ConnectionNotificationTopicRegistration cntr = new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this); + notificationTopicRegistrations + .put(cntr.getNotificationUrnPrefix(), cntr); + Map availableStreams = getAvailableStreams(); + LOG.debug("Stream configuration compare..."); + for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) { + final String streamName = this.urnPrefixToStreamMap.get(urnPrefix); + LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName); + if (availableStreams.containsKey(streamName)) { + LOG.debug("Stream containig on device"); + notificationTopicRegistrations + .put(urnPrefix, new StreamNotificationTopicRegistration(availableStreams.get(streamName), urnPrefix, this)); } } } - private Optional> getAvailableStreams() { - - Map streamMap = null; - InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); - Optional dataBroker = this.mountPoint.getService(DataBroker.class); - - if (dataBroker.isPresent()) { - LOG.debug("GET Available streams ..."); - ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction(); - CheckedFuture, ReadFailedException> checkFeature = tx - .read(LogicalDatastoreType.OPERATIONAL, pathStream); - - try { - Optional streams = checkFeature.checkedGet(); - if (streams.isPresent()) { - streamMap = new HashMap<>(); - for (Stream stream : streams.get().getStream()) { - LOG.debug("*** find stream {}", stream.getName().getValue()); - streamMap.put(stream.getName().getValue(), stream); - } + private Map getAvailableStreams() { + Map streamMap = new HashMap<>(); + final List availableStreams; + try { + availableStreams = mount.getAvailableStreams(); + streamMap = Maps.uniqueIndex(availableStreams, new Function() { + @Nullable + @Override + public String apply(@Nullable Stream input) { + return input.getName().getValue(); } - } catch (ReadFailedException e) { - LOG.warn("Can not read streams for node {}", this.nodeId); - } - - } else { - LOG.warn("No databroker on node {}", this.nodeId); + }); + } catch (ReadFailedException e) { + LOG.warn("Can not read streams for node {}", mount.getNodeId()); } - - return Optional.fromNullable(streamMap); + return streamMap; } @Override public Future> joinTopic(final JoinTopicInput input) { - LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId); + LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), mount.getNodeId()); final NotificationPattern notificationPattern = input.getNotificationPattern(); final List matchingNotifications = getMatchingNotifications(notificationPattern); return registerTopic(input.getTopicId(), matchingNotifications); @@ -180,7 +158,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener } @Override public Future> disJoinTopic(DisJoinTopicInput input) { - for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { + for (NotificationTopicRegistration reg : notificationTopicRegistrations.values()) { reg.unRegisterNotificationTopic(input.getTopicId()); } return Util.resultRpcSuccessFor((Void) null); @@ -188,53 +166,42 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener private synchronized Future> registerTopic(final TopicId topicId, final List notificationsToSubscribe) { + Preconditions.checkNotNull(notificationsToSubscribe); LOG.debug("Join topic {} - register", topicId); JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; - if (notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false) { - LOG.debug("Notifications to subscribe has found - count {}", notificationsToSubscribe.size()); - final Optional notifyService = getDOMMountPoint() - .getService(DOMNotificationService.class); - if (notifyService.isPresent()) { - int registeredNotificationCount = 0; - for (SchemaPath schemaNotification : notificationsToSubscribe) { - for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { - LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), - schemaNotification.getLastComponent().getLocalName()); - if (reg.checkNotificationPath(schemaNotification)) { - LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), - topicId.getValue()); - boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId); - if (regSuccess) { - registeredNotificationCount = registeredNotificationCount + 1; - } - } - } - } - if (registeredNotificationCount > 0) { - joinTopicStatus = JoinTopicStatus.Up; + + LOG.debug("Notifications to subscribe has found - count {}", notificationsToSubscribe.size()); + int registeredNotificationCount = 0; + for (SchemaPath schemaPath : notificationsToSubscribe) { + final Collection topicRegistrations = + notificationTopicRegistrations.get(schemaPath.getLastComponent().getNamespace().toString()); + for (NotificationTopicRegistration reg : topicRegistrations) { + LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), + topicId.getValue()); + boolean regSuccess = reg.registerNotificationTopic(schemaPath, topicId); + if (regSuccess) { + registeredNotificationCount = registeredNotificationCount + 1; } - } else { - LOG.warn("NO DOMNotification service on node {}", this.nodeId); } - } else { - LOG.debug("Notifications to subscribe has NOT found"); } - + if (registeredNotificationCount > 0) { + joinTopicStatus = JoinTopicStatus.Up; + } final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); return immediateFuture(RpcResultBuilder.success(output).build()); } public void reActivateStreams() { - for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { - LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId); + for (NotificationTopicRegistration reg : notificationTopicRegistrations.values()) { + LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), mount.getNodeId()); reg.reActivateNotificationSource(); } } public void deActivateStreams() { - for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { - LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId); + for (NotificationTopicRegistration reg : notificationTopicRegistrations.values()) { + LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), mount.getNodeId()); reg.deActivateNotificationSource(); } } @@ -245,21 +212,14 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener if (notification instanceof DOMEvent) { notificationEventTime = ((DOMEvent) notification).getEventTime(); } - for (NotificationTopicRegistration notifReg : notificationTopicRegistrationList) { - ArrayList topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath); - if (topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false) { - - if (notifReg instanceof StreamNotificationTopicRegistration) { - StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration) notifReg; - streamReg.setLastEventTime(notificationEventTime); - } - - for (TopicId topicId : topicIdsForNotification) { - publishNotification(notification, topicId); - LOG.debug("Notification {} has been published for TopicId {}", notification.getType(), - topicId.getValue()); - } - + final String namespace = notification.getType().getLastComponent().getNamespace().toString(); + for (NotificationTopicRegistration notifReg : notificationTopicRegistrations.get(namespace)) { + notifReg.setLastEventTime(notificationEventTime); + Set topicIdsForNotification = notifReg.getTopicsForNotification(notificationPath); + for (TopicId topicId : topicIdsForNotification) { + publishNotification(notification, topicId); + LOG.debug("Notification {} has been published for TopicId {}", notification.getType(), + topicId.getValue()); } } } @@ -267,7 +227,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener private void publishNotification(final DOMNotification notification, TopicId topicId) { final ContainerNode topicNotification = Builders.containerBuilder().withNodeIdentifier(TOPIC_NOTIFICATION_ARG) .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId)) - .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)).withChild(encapsulate(notification)) + .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, mount.getNodeId())).withChild(encapsulate(notification)) .build(); try { domPublish.putNotification(new TopicDOMNotification(topicNotification)); @@ -284,7 +244,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener final DOMResult result = new DOMResult(element); - final SchemaContext context = getDOMMountPoint().getSchemaContext(); + final SchemaContext context = mount.getSchemaContext(); final SchemaPath schemaPath = body.getType(); try { NetconfUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); @@ -305,20 +265,17 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener final Pattern pattern = Pattern.compile(regex); List availableNotifications = getAvailableNotifications(); - if (availableNotifications == null || availableNotifications.isEmpty()) { - return null; - } return Util.expandQname(availableNotifications, pattern); } @Override public void close() throws Exception { - for (NotificationTopicRegistration streamReg : notificationTopicRegistrationList) { + for (NotificationTopicRegistration streamReg : notificationTopicRegistrations.values()) { streamReg.close(); } } @Override public NodeKey getSourceNodeKey() { - return getNode().getKey(); + return mount.getNode().getKey(); } @Override public List getAvailableNotifications() { @@ -327,8 +284,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener // add Event Source Connection status notification availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); - // FIXME: use SchemaContextListener to get changes asynchronously - final Set availableNotifications = getDOMMountPoint().getSchemaContext() + final Set availableNotifications = mount.getSchemaContext() .getNotifications(); // add all known notifications from netconf device for (final NotificationDefinition nd : availableNotifications) { @@ -337,20 +293,8 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener return availNotifList; } - public Node getNode() { - return node; - } - - DOMMountPoint getDOMMountPoint() { - return netconfMount; - } - - MountPoint getMountPoint() { - return mountPoint; - } - - NetconfNode getNetconfNode() { - return node.getAugmentation(NetconfNode.class); + NetconfEventSourceMount getMount() { + return mount; } } diff --git a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceManager.java b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceManager.java index e8a02e112d..f7b60e9b11 100644 --- a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceManager.java +++ b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceManager.java @@ -52,7 +52,6 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto private final ConcurrentHashMap, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>(); private final DOMNotificationPublishService publishService; private final DOMMountPointService domMounts; - private final MountPointService mountPointService; private ListenerRegistration listenerRegistration; private final EventSourceRegistry eventSourceRegistry; @@ -76,12 +75,10 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto Preconditions.checkNotNull(domPublish); Preconditions.checkNotNull(domMount); - Preconditions.checkNotNull(bindingMount); Preconditions.checkNotNull(eventSourceRegistry); Preconditions.checkNotNull(namespaceMapping); this.streamMap = namespaceToStreamMapping(namespaceMapping); this.domMounts = domMount; - this.mountPointService = bindingMount; this.publishService = domPublish; this.eventSourceRegistry = eventSourceRegistry; } @@ -130,7 +127,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto private void nodeCreated(final InstanceIdentifier key, final Node node) { Preconditions.checkNotNull(key); - if (validateNode(node) == false) { + if (!validateNode(node)) { LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString()); return; } @@ -146,7 +143,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto private void nodeUpdated(final InstanceIdentifier key, final Node node) { Preconditions.checkNotNull(key); - if (validateNode(node) == false) { + if (!validateNode(node)) { LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString()); return; } @@ -192,10 +189,6 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto return eventSourceRegistry; } - MountPointService getMountPointService() { - return mountPointService; - } - private boolean isNetconfNode(final Node node) { return node.getAugmentation(NetconfNode.class) != null; } diff --git a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceMount.java b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceMount.java new file mode 100644 index 0000000000..bd1f345061 --- /dev/null +++ b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceMount.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.messagebus.eventsources.netconf; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import javassist.ClassPool; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.md.sal.dom.api.DOMService; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.DateAndTime; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator; +import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext; +import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext; +import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +/** + * Facade of mounted netconf device + */ +class NetconfEventSourceMount { + + private static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY; + private static final YangInstanceIdentifier STREAMS_PATH = YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build(); + private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath + .create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); + + static{ + final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create(); + moduleInfoBackedContext.addModuleInfos(Collections.singletonList(org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.$YangModuleInfoImpl.getInstance())); + final Optional schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext(); + Preconditions.checkState(schemaContextOptional.isPresent()); + SchemaContext NOTIFICATIONS_SCHEMA_CTX = schemaContextOptional.get(); + + final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault()); + CODEC_REGISTRY = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist)); + CODEC_REGISTRY.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, NOTIFICATIONS_SCHEMA_CTX)); + } + + private final DOMMountPoint mountPoint; + private final DOMRpcService rpcService; + private final DOMNotificationService notificationService; + private final DOMDataBroker dataBroker; + private final Node node; + private final String nodeId; + + public NetconfEventSourceMount(final Node node, final DOMMountPoint mountPoint) { + this.mountPoint = mountPoint; + this.node = node; + this.nodeId = node.getNodeId().getValue(); + this.rpcService = getService(mountPoint, DOMRpcService.class); + this.notificationService = getService(mountPoint, DOMNotificationService.class); + this.dataBroker = getService(mountPoint, DOMDataBroker.class); + } + + private static T getService(DOMMountPoint mountPoint, Class service) { + final Optional optional = mountPoint.getService(service); + Preconditions.checkState(optional.isPresent(), "Service not present on mount point: %s", service.getName()); + return optional.get(); + } + + Node getNode() { + return node; + } + + String getNodeId() { + return nodeId; + } + + /** + * Invokes create-subscription rpc on mounted device stream. If lastEventTime is provided and stream supports replay, + * rpc will be invoked with start time parameter. + * @param stream stream + * @param lastEventTime last event time + * @return rpc result + */ + CheckedFuture invokeCreateSubscription(final Stream stream, final Optional lastEventTime) { + final CreateSubscriptionInputBuilder inputBuilder = new CreateSubscriptionInputBuilder() + .setStream(stream.getName()); + if(lastEventTime.isPresent() && stream.isReplaySupport()) { + final ZonedDateTime dateTime = lastEventTime.get().toInstant().atZone(ZoneId.systemDefault()); + final String formattedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(dateTime); + inputBuilder.setStartTime(new DateAndTime(formattedDate)); + } + final CreateSubscriptionInput input = inputBuilder.build(); + final ContainerNode nnInput = CODEC_REGISTRY.toNormalizedNodeRpcData(input); + return rpcService.invokeRpc(CREATE_SUBSCRIPTION, nnInput); + } + + /** + * Invokes create-subscription rpc on mounted device stream. + * @param stream stream + * @return rpc result + */ + CheckedFuture invokeCreateSubscription(final Stream stream) { + return invokeCreateSubscription(stream, Optional.absent()); + } + + /** + * Returns list of streams avaliable on device + * @return list of streams + * @throws ReadFailedException if data read fails + */ + List getAvailableStreams() throws ReadFailedException { + DOMDataReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); + CheckedFuture>, ReadFailedException> checkFeature = tx + .read(LogicalDatastoreType.OPERATIONAL, STREAMS_PATH); + Optional> streams = checkFeature.checkedGet(); + if (streams.isPresent()) { + Streams s = (Streams) CODEC_REGISTRY.fromNormalizedNode(STREAMS_PATH, streams.get()).getValue(); + return s.getStream(); + } + return Collections.emptyList(); + } + + SchemaContext getSchemaContext() { + return mountPoint.getSchemaContext(); + } + + /** + * Registers notification listener to receive a set of notifications. + * @see DOMNotificationService#registerNotificationListener(DOMNotificationListener, SchemaPath...) + * @param listener listener + * @param notificationPath notification path + * @return + */ + ListenerRegistration registerNotificationListener(DOMNotificationListener listener, SchemaPath notificationPath) { + return notificationService.registerNotificationListener(listener, notificationPath); + } + +} diff --git a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java index e45f9c918c..52b8a6a085 100644 --- a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java +++ b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java @@ -40,7 +40,6 @@ public class NetconfEventSourceRegistration implements AutoCloseable { private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification"; private final Node node; - private final InstanceIdentifier instanceIdent; private final NetconfEventSourceManager netconfEventSourceManager; private ConnectionStatus currentNetconfConnStatus; private EventSourceRegistration eventSourceRegistration; @@ -50,11 +49,10 @@ public class NetconfEventSourceRegistration implements AutoCloseable { Preconditions.checkNotNull(instanceIdent); Preconditions.checkNotNull(node); Preconditions.checkNotNull(netconfEventSourceManager); - if (isEventSource(node) == false) { + if (!isEventSource(node)) { return null; } - NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, - netconfEventSourceManager); + NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(node, netconfEventSourceManager); nesr.updateStatus(); LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...", node.getNodeId().getValue()); return nesr; @@ -81,16 +79,11 @@ public class NetconfEventSourceRegistration implements AutoCloseable { return false; } - private NetconfEventSourceRegistration(final InstanceIdentifier instanceIdent, final Node node, - final NetconfEventSourceManager netconfEventSourceManager) { - this.instanceIdent = instanceIdent; + private NetconfEventSourceRegistration(final Node node, final NetconfEventSourceManager netconfEventSourceManager) { this.node = node; this.netconfEventSourceManager = netconfEventSourceManager; this.eventSourceRegistration = null; - } - - public Node getNode() { - return node; + this.currentNetconfConnStatus = ConnectionStatus.Connecting; } Optional> getEventSourceRegistration() { @@ -111,61 +104,49 @@ public class NetconfEventSourceRegistration implements AutoCloseable { } private boolean checkConnectionStatusType(ConnectionStatus status) { - if (status == ConnectionStatus.Connected || status == ConnectionStatus.Connecting - || status == ConnectionStatus.UnableToConnect) { - return true; - } - return false; + return status == ConnectionStatus.Connected || status == ConnectionStatus.Connecting + || status == ConnectionStatus.UnableToConnect; } private void changeStatus(ConnectionStatus newStatus) { Preconditions.checkNotNull(newStatus); - if (checkConnectionStatusType(newStatus) == false) { + Preconditions.checkState(this.currentNetconfConnStatus != null); + if (!checkConnectionStatusType(newStatus)) { throw new IllegalStateException("Unknown new Netconf Connection Status"); } - if (this.currentNetconfConnStatus == null) { - if (newStatus == ConnectionStatus.Connected) { - registrationEventSource(); - } - } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting) { - if (newStatus == ConnectionStatus.Connected) { - if (this.eventSourceRegistration == null) { - registrationEventSource(); - } else { - // reactivate stream on registered event source (invoke publish notification about connection) - this.eventSourceRegistration.getInstance().reActivateStreams(); + switch (this.currentNetconfConnStatus) { + case Connecting: + case UnableToConnect: + if (newStatus == ConnectionStatus.Connected) { + if (this.eventSourceRegistration == null) { + registrationEventSource(); + } else { + // reactivate stream on registered event source (invoke publish notification about connection) + this.eventSourceRegistration.getInstance().reActivateStreams(); + } } - } - } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) { - - if (newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect) { - // deactivate streams on registered event source (invoke publish notification about connection) - this.eventSourceRegistration.getInstance().deActivateStreams(); - } - } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect) { - if (newStatus == ConnectionStatus.Connected) { - if (this.eventSourceRegistration == null) { - registrationEventSource(); - } else { - // reactivate stream on registered event source (invoke publish notification about connection) - this.eventSourceRegistration.getInstance().reActivateStreams(); + break; + case Connected: + if (newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect) { + // deactivate streams on registered event source (invoke publish notification about connection) + this.eventSourceRegistration.getInstance().deActivateStreams(); } - } - } else { - throw new IllegalStateException("Unknown current Netconf Connection Status"); + break; + default: + throw new IllegalStateException("Unknown current Netconf Connection Status"); } this.currentNetconfConnStatus = newStatus; } private void registrationEventSource() { - final Optional mountPoint = netconfEventSourceManager.getMountPointService() - .getMountPoint(instanceIdent); final Optional domMountPoint = netconfEventSourceManager.getDomMounts() .getMountPoint(domMountPath(node.getNodeId())); EventSourceRegistration registration = null; - if (domMountPoint.isPresent() && mountPoint.isPresent()) { - final NetconfEventSource netconfEventSource = new NetconfEventSource(node, - netconfEventSourceManager.getStreamMap(), domMountPoint.get(), mountPoint.get(), + if (domMountPoint.isPresent()/* && mountPoint.isPresent()*/) { + NetconfEventSourceMount mount = new NetconfEventSourceMount(node, domMountPoint.get()); + final NetconfEventSource netconfEventSource = new NetconfEventSource( + netconfEventSourceManager.getStreamMap(), + mount, netconfEventSourceManager.getPublishService()); registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource); LOG.info("Event source {} has been registered", node.getNodeId().getValue()); diff --git a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NotificationTopicRegistration.java b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NotificationTopicRegistration.java index 07e2d3b564..b7bfda3432 100644 --- a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NotificationTopicRegistration.java +++ b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NotificationTopicRegistration.java @@ -7,7 +7,11 @@ */ package org.opendaylight.netconf.messagebus.eventsources.netconf; -import java.util.ArrayList; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import java.util.Date; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; @@ -16,13 +20,13 @@ import org.slf4j.LoggerFactory; /** * Notification topic registration. */ -public abstract class NotificationTopicRegistration implements AutoCloseable { +abstract class NotificationTopicRegistration implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NotificationTopicRegistration.class); public enum NotificationSourceType { NetconfDeviceStream, - ConnectionStatusChange; + ConnectionStatusChange } private boolean active; @@ -30,6 +34,8 @@ public abstract class NotificationTopicRegistration implements AutoCloseable { private final String sourceName; private final String notificationUrnPrefix; private boolean replaySupported; + private Date lastEventTime; + protected final ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName, String notificationUrnPrefix) { @@ -60,12 +66,22 @@ public abstract class NotificationTopicRegistration implements AutoCloseable { return notificationUrnPrefix; } + /** + * Returns registered topics for given notification path. + * @param notificationPath path + * @return topicIds + */ + Set getTopicsForNotification(SchemaPath notificationPath) { + final Set topicIds = notificationTopicMap.get(notificationPath); + return topicIds != null ? topicIds : Sets.newHashSet(); + } + /** * Checks, if notification is from namespace belonging to this registration. * @param notificationPath path * @return true, if notification belongs to registration namespace */ - public boolean checkNotificationPath(SchemaPath notificationPath) { + boolean checkNotificationPath(SchemaPath notificationPath) { if (notificationPath == null) { return false; } @@ -75,6 +91,14 @@ public abstract class NotificationTopicRegistration implements AutoCloseable { return nameSpace.startsWith(getNotificationUrnPrefix()); } + Optional getLastEventTime() { + return Optional.fromNullable(lastEventTime); + } + + void setLastEventTime(Date lastEventTime) { + this.lastEventTime = lastEventTime; + } + abstract void activateNotificationSource(); abstract void deActivateNotificationSource(); @@ -96,13 +120,6 @@ public abstract class NotificationTopicRegistration implements AutoCloseable { */ abstract void unRegisterNotificationTopic(TopicId topicId); - /** - * Returns registered topics for given path. - * @param notificationPath path - * @return topicIds - */ - abstract ArrayList getNotificationTopicIds(SchemaPath notificationPath); - public boolean isReplaySupported() { return replaySupported; } diff --git a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java index b01a57e704..d163a777cd 100644 --- a/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java +++ b/netconf/messagebus-netconf/src/main/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java @@ -7,54 +7,33 @@ */ package org.opendaylight.netconf.messagebus.eventsources.netconf; -import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import java.util.ArrayList; -import java.util.Date; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; -import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Topic registration for notification stream. + * Topic registration for notification with specified namespace from stream. */ -public class StreamNotificationTopicRegistration extends NotificationTopicRegistration { +class StreamNotificationTopicRegistration extends NotificationTopicRegistration { private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class); - private static final NodeIdentifier STREAM_QNAME = NodeIdentifier.create( - QName.create(CreateSubscriptionInput.QNAME, "stream")); - private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath - .create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); - private static final NodeIdentifier START_TIME_SUBSCRIPTION = NodeIdentifier.create( - QName.create(CreateSubscriptionInput.QNAME, "startTime")); - private static final NodeIdentifier CREATE_SUBSCRIPTION_INPUT = NodeIdentifier.create( - CreateSubscriptionInput.QNAME); - - final private DOMMountPoint domMountPoint; - final private String nodeId; - final private NetconfEventSource netconfEventSource; - final private Stream stream; - private Date lastEventTime; - - private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); - private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); + + private final String nodeId; + private final NetconfEventSource netconfEventSource; + private final NetconfEventSourceMount mountPoint; + private final ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); + private final Stream stream; /** * Creates registration to notification stream. @@ -65,12 +44,11 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) { super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix); - this.domMountPoint = netconfEventSource.getDOMMountPoint(); - this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString(); this.netconfEventSource = netconfEventSource; + this.mountPoint = netconfEventSource.getMount(); + this.nodeId = mountPoint.getNode().getNodeId().getValue(); this.stream = stream; - this.lastEventTime = null; - setReplaySupported(this.stream.isReplaySupport()); + setReplaySupported(stream.isReplaySupport()); setActive(false); LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName()); } @@ -79,20 +57,15 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist * Subscribes to notification stream associated with this registration. */ void activateNotificationSource() { - if (isActive() == false) { + if (!isActive()) { LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId); - final ContainerNode input = Builders.containerBuilder() - .withNodeIdentifier(CREATE_SUBSCRIPTION_INPUT) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())).build(); - CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get() - .invokeRpc(CREATE_SUBSCRIPTION, input); + final CheckedFuture result = mountPoint.invokeCreateSubscription(stream); try { - csFuture.checkedGet(); + result.checkedGet(); setActive(true); } catch (DOMRpcException e) { LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId); setActive(false); - return; } } else { LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId); @@ -106,22 +79,14 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist void reActivateNotificationSource() { if (isActive()) { LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId); - DataContainerNodeAttrBuilder inputBuilder = Builders.containerBuilder() - .withNodeIdentifier(CREATE_SUBSCRIPTION_INPUT) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())); - if (isReplaySupported() && this.getLastEventTime() != null) { - inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime())); - } - final ContainerNode input = inputBuilder.build(); - CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get() - .invokeRpc(CREATE_SUBSCRIPTION, input); + final CheckedFuture result; + result = mountPoint.invokeCreateSubscription(stream, getLastEventTime()); try { - csFuture.checkedGet(); + result.checkedGet(); setActive(true); } catch (DOMRpcException e) { LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId); setActive(false); - return; } } } @@ -132,7 +97,7 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist private void closeStream() { if (isActive()) { - for (ListenerRegistration reg : notificationRegistrationMap.values()) { + for (ListenerRegistration reg : notificationRegistrationMap.values()) { reg.close(); } notificationRegistrationMap.clear(); @@ -145,42 +110,23 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist return getSourceName(); } - @Override ArrayList getNotificationTopicIds(SchemaPath notificationPath) { - return notificationTopicMap.get(notificationPath); - } - @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) { - - if (checkNotificationPath(notificationPath) == false) { + if (!checkNotificationPath(notificationPath)) { LOG.debug("Bad SchemaPath for notification try to register"); return false; } - final Optional notifyService = domMountPoint.getService(DOMNotificationService.class); - if (notifyService.isPresent() == false) { - LOG.debug("DOMNotificationService is not present"); - return false; - } - activateNotificationSource(); - if (isActive() == false) { + if (!isActive()) { LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString()); return false; } - ListenerRegistration registration = notifyService.get() - .registerNotificationListener(this.netconfEventSource, notificationPath); + ListenerRegistration registration = mountPoint.registerNotificationListener(netconfEventSource, notificationPath); notificationRegistrationMap.put(notificationPath, registration); - ArrayList topicIds = getNotificationTopicIds(notificationPath); - if (topicIds == null) { - topicIds = new ArrayList<>(); - topicIds.add(topicId); - } else { - if (topicIds.contains(topicId) == false) { - topicIds.add(topicId); - } - } + Set topicIds = getTopicsForNotification(notificationPath); + topicIds.add(topicId); notificationTopicMap.put(notificationPath, topicIds); return true; @@ -189,7 +135,7 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist @Override synchronized void unRegisterNotificationTopic(TopicId topicId) { List notificationPathToRemove = new ArrayList<>(); for (SchemaPath notifKey : notificationTopicMap.keySet()) { - ArrayList topicList = notificationTopicMap.get(notifKey); + Set topicList = notificationTopicMap.get(notifKey); if (topicList != null) { topicList.remove(topicId); if (topicList.isEmpty()) { @@ -199,21 +145,13 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist } for (SchemaPath notifKey : notificationPathToRemove) { notificationTopicMap.remove(notifKey); - ListenerRegistration reg = notificationRegistrationMap.remove(notifKey); + ListenerRegistration reg = notificationRegistrationMap.remove(notifKey); if (reg != null) { reg.close(); } } } - Optional getLastEventTime() { - return Optional.fromNullable(lastEventTime); - } - - void setLastEventTime(Date lastEventTime) { - this.lastEventTime = lastEventTime; - } - @Override public void close() throws Exception { closeStream(); } diff --git a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistrationTest.java b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistrationTest.java index b23e0f59f6..a19f13028e 100644 --- a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistrationTest.java +++ b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistrationTest.java @@ -11,8 +11,8 @@ package org.opendaylight.netconf.messagebus.eventsources.netconf; import static org.hamcrest.CoreMatchers.hasItems; import static org.mockito.Mockito.verify; -import java.util.ArrayList; import java.util.Collection; +import java.util.Set; import javax.xml.transform.dom.DOMSource; import org.junit.Assert; import org.junit.Before; @@ -71,12 +71,12 @@ public class ConnectionNotificationTopicRegistrationTest { final TopicId topic1 = registerTopic("topic1"); final TopicId topic2 = registerTopic("topic2"); final TopicId topic3 = registerTopic("topic3"); - final ArrayList notificationTopicIds = registration.getNotificationTopicIds(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); + final Set notificationTopicIds = registration.getTopicsForNotification(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); Assert.assertNotNull(notificationTopicIds); Assert.assertThat(notificationTopicIds, hasItems(topic1, topic2, topic3)); registration.unRegisterNotificationTopic(topic3); - final ArrayList afterUnregister = registration.getNotificationTopicIds(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); + final Set afterUnregister = registration.getTopicsForNotification(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); Assert.assertNotNull(afterUnregister); Assert.assertThat(afterUnregister, hasItems(topic1, topic2)); Assert.assertFalse(afterUnregister.contains(topic3)); diff --git a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java index 983074946b..39cdb10975 100644 --- a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java +++ b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.verify; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -26,18 +27,19 @@ import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.config.yang.messagebus.netconf.NamespaceToStream; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.MountPoint; import org.opendaylight.controller.md.sal.binding.api.MountPointService; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.messagebus.spi.EventSource; -import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; @@ -48,6 +50,7 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class NetconfEventSourceManagerTest { @@ -68,7 +71,6 @@ public class NetconfEventSourceManagerTest { DataBroker dataBrokerMock = mock(DataBroker.class); DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); domMountPointServiceMock = mock(DOMMountPointService.class); - mountPointServiceMock = mock(MountPointService.class); eventSourceTopologyMock = mock(EventSourceRegistry.class); rpcProviderRegistryMock = mock(RpcProviderRegistry.class); eventSourceRegistry = mock(EventSourceRegistry.class); @@ -78,36 +80,20 @@ public class NetconfEventSourceManagerTest { doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq( AsyncDataBroker.DataChangeScope.SUBTREE)); - Optional optionalDomMountServiceMock = (Optional) mock(Optional.class); - doReturn(true).when(optionalDomMountServiceMock).isPresent(); - doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull()); - DOMMountPoint domMountPointMock = mock(DOMMountPoint.class); - doReturn(domMountPointMock).when(optionalDomMountServiceMock).get(); - - - Optional optionalBindingMountMock = mock(Optional.class); - doReturn(true).when(optionalBindingMountMock).isPresent(); - - MountPoint mountPointMock = mock(MountPoint.class); - doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class)); - doReturn(mountPointMock).when(optionalBindingMountMock).get(); - - Optional optionalMpDataBroker = mock(Optional.class); - DataBroker mpDataBroker = mock(DataBroker.class); - doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class); - doReturn(true).when(optionalMpDataBroker).isPresent(); - doReturn(mpDataBroker).when(optionalMpDataBroker).get(); + Optional optionalDomMountServiceMock = Optional.of(domMountPointMock); + doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull()); + DOMDataBroker mpDataBroker = mock(DOMDataBroker.class); + doReturn(Optional.of(mpDataBroker)).when(domMountPointMock).getService(DOMDataBroker.class); + doReturn(Optional.of(mock(DOMRpcService.class))).when(domMountPointMock).getService(DOMRpcService.class); + doReturn(Optional.of(mock(DOMNotificationService.class))).when(domMountPointMock).getService(DOMNotificationService.class); - ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class); + DOMDataReadOnlyTransaction rtx = mock(DOMDataReadOnlyTransaction.class); doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction(); - CheckedFuture, ReadFailedException> checkFeature = (CheckedFuture, ReadFailedException>)mock(CheckedFuture.class); - InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); - doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream); - Optional avStreams = NetconfTestUtils.getAvailableStream("stream01", true); - doReturn(avStreams).when(checkFeature).checkedGet(); + CheckedFuture>, ReadFailedException> checkFeature = Futures.immediateCheckedFuture(Optional.of(NetconfTestUtils.getStreamsNode("stream-1"))); - EventSourceRegistration esrMock = mock(EventSourceRegistration.class); + YangInstanceIdentifier pathStream = YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build(); + doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream); netconfEventSourceManager = NetconfEventSourceManager diff --git a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceMountTest.java b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceMountTest.java new file mode 100644 index 0000000000..369d6b4df8 --- /dev/null +++ b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceMountTest.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.messagebus.eventsources.netconf; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Date; +import java.util.List; +import javax.annotation.Nullable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +public class NetconfEventSourceMountTest { + + public static final String STREAM_1 = "stream-1"; + public static final String STREAM_2 = "stream-2"; + @Mock + private DOMMountPoint domMountPoint; + @Mock + DOMDataBroker dataBroker; + @Mock + DOMRpcService rpcService; + @Mock + private DOMDataReadOnlyTransaction tx; + private NetconfEventSourceMount mount; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + doReturn(Optional.of(dataBroker)).when(domMountPoint).getService(DOMDataBroker.class); + doReturn(Optional.of(rpcService)).when(domMountPoint).getService(DOMRpcService.class); + doReturn(Optional.of(mock(DOMNotificationService.class))).when(domMountPoint).getService(DOMNotificationService.class); + doReturn(tx).when(dataBroker).newReadOnlyTransaction(); + final YangInstanceIdentifier path = YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build(); + final NormalizedNode streamsNode = NetconfTestUtils.getStreamsNode(STREAM_1, STREAM_2); + doReturn(Futures.immediateCheckedFuture(Optional.of(streamsNode))).when(tx).read(LogicalDatastoreType.OPERATIONAL, path); + mount = new NetconfEventSourceMount(NetconfTestUtils.getNode("node-1"), domMountPoint); + } + + @Test + public void testInvokeCreateSubscription() throws Exception { + Stream stream = new StreamBuilder() + .setName(new StreamNameType(STREAM_1)) + .build(); + mount.invokeCreateSubscription(stream, Optional.absent()); + final SchemaPath type = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); + ArgumentCaptor captor = ArgumentCaptor.forClass(ContainerNode.class); + verify(rpcService).invokeRpc(eq(type), captor.capture()); + Assert.assertEquals(STREAM_1, getStreamName(captor.getValue())); + } + + @Test + public void testInvokeCreateSubscription1() throws Exception { + Stream stream = new StreamBuilder() + .setName(new StreamNameType(STREAM_1)) + .setReplaySupport(true) + .build(); + final Date date = new Date(); + mount.invokeCreateSubscription(stream, Optional.of(date)); + final SchemaPath type = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); + ArgumentCaptor captor = ArgumentCaptor.forClass(ContainerNode.class); + verify(rpcService).invokeRpc(eq(type), captor.capture()); + Assert.assertEquals(STREAM_1, getStreamName(captor.getValue())); + final String expDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(date.toInstant().atZone(ZoneId.systemDefault())); + final Optional actual = (Optional) getDate(captor.getValue()); + Assert.assertTrue(actual.isPresent()); + String actualDate = (String) actual.get().getValue(); + Assert.assertEquals(expDate, actualDate); + } + + @Test + public void testInvokeCreateSubscription2() throws Exception { + Stream stream = new StreamBuilder() + .setName(new StreamNameType(STREAM_1)) + .setReplaySupport(true) + .build(); + mount.invokeCreateSubscription(stream, Optional.absent()); + final SchemaPath type = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); + ArgumentCaptor captor = ArgumentCaptor.forClass(ContainerNode.class); + verify(rpcService).invokeRpc(eq(type), captor.capture()); + Assert.assertEquals(STREAM_1, getStreamName(captor.getValue())); + final Optional date = (Optional) getDate(captor.getValue()); + Assert.assertFalse(date.isPresent()); + + } + + @Test + public void testGetAvailableStreams() throws Exception { + final List availableStreams = mount.getAvailableStreams(); + Assert.assertEquals(2, availableStreams.size()); + final List streamNames = Lists.transform(availableStreams, new Function() { + @Nullable + @Override + public String apply(@Nullable Stream input) { + return input.getName().getValue(); + } + }); + streamNames.contains(STREAM_1); + streamNames.contains(STREAM_2); + } + + private String getStreamName(ContainerNode value) { + YangInstanceIdentifier.NodeIdentifier STREAM = new YangInstanceIdentifier.NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME, "stream")); + return (String) value.getChild(STREAM).get().getValue(); + } + + private Optional getDate(ContainerNode value) { + YangInstanceIdentifier.NodeIdentifier START_TIME = new YangInstanceIdentifier.NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME, "startTime")); + return value.getChild(START_TIME); + } +} \ No newline at end of file diff --git a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceTest.java b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceTest.java index 42cf92762d..1a439b9ff6 100644 --- a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceTest.java +++ b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfEventSourceTest.java @@ -7,179 +7,153 @@ */ package org.opendaylight.netconf.messagebus.eventsources.netconf; -import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.only; import static org.mockito.Mockito.verify; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import java.net.URI; -import java.util.Collections; +import com.google.common.util.concurrent.Futures; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.opendaylight.controller.md.sal.binding.api.BindingService; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.MountPoint; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; -import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -import org.opendaylight.controller.md.sal.dom.api.DOMService; import org.opendaylight.controller.messagebus.app.util.TopicDOMNotification; -import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaPath; public class NetconfEventSourceTest { + + private static final SchemaPath notification1Path = SchemaPath.create(true, QName.create("ns1", "1970-01-15", "not1")); + private static final SchemaPath notification2Path = SchemaPath.create(true, QName.create("ns2", "1980-02-18", "not2")); + NetconfEventSource netconfEventSource; - DOMMountPoint domMountPointMock; - MountPoint mountPointMock; - JoinTopicInput joinTopicInputMock; + + @Mock DOMNotificationPublishService domNotificationPublishServiceMock; - DOMNotification notification; + @Mock + DOMNotification matchnigNotification; + @Mock + DOMNotification nonMachtingNotification; + @Mock + NetconfEventSourceMount mount; @Before public void setUp() throws Exception { - Map streamMap = new HashMap<>(); - streamMap.put("uriStr1", "string2"); - domMountPointMock = mock(DOMMountPoint.class); - mountPointMock = mock(MountPoint.class); - domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); - RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class); - Optional onlyOptionalMock = (Optional) mock(Optional.class); - NotificationsService notificationsServiceMock = mock(NotificationsService.class); - doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class); - - final NotificationDefinition notificationDefinitionMock = getNotificationDefinitionMock("urn:cisco:params:xml:ns:yang:messagebus:eventsource", "2014-12-02", "event-source-status"); - Set notifications = Collections.singleton(notificationDefinitionMock); - ContainerNode node = Builders.containerBuilder() - .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(QName.create("notification-namespace", "2016-02-17", "name"))) - .build(); - notification = mock(DOMNotification.class); - doReturn(node).when(notification).getBody(); - doReturn(notificationDefinitionMock.getPath()).when(notification).getType(); - SchemaContext schema = mock(SchemaContext.class); - doReturn(notifications).when(schema).getNotifications(); - doReturn(schema).when(domMountPointMock).getSchemaContext(); - doReturn(Optional.of(mock(DOMNotificationService.class))).when(domMountPointMock).getService(DOMNotificationService.class); - - Optional optionalMpDataBroker = (Optional) mock(Optional.class); - DataBroker mpDataBroker = mock(DataBroker.class); - doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class); - doReturn(true).when(optionalMpDataBroker).isPresent(); - doReturn(mpDataBroker).when(optionalMpDataBroker).get(); - - ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class); - doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction(); - CheckedFuture, ReadFailedException> checkFeature = (CheckedFuture, ReadFailedException>)mock(CheckedFuture.class); - InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); - doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream); - Optional avStreams = NetconfTestUtils.getAvailableStream("stream01", true); - doReturn(avStreams).when(checkFeature).checkedGet(); + MockitoAnnotations.initMocks(this); + //init notification mocks + doReturn(notification1Path).when(matchnigNotification).getType(); + doReturn(notification2Path).when(nonMachtingNotification).getType(); + DataContainerNodeAttrBuilder body = Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("ns1", "1970-01-15", "not1data"))); + doReturn(body.build()).when(matchnigNotification).getBody(); + //init schema context mock + Set notifications = new HashSet<>(); + notifications.add(getNotificationDefinitionMock(notification1Path.getLastComponent())); + notifications.add(getNotificationDefinitionMock(notification2Path.getLastComponent())); + SchemaContext schemaContext = mock(SchemaContext.class); + doReturn(notifications).when(schemaContext).getNotifications(); + //init mount point mock + List streams = new ArrayList<>(); + streams.add(createStream("stream-1")); + streams.add(createStream("stream-2")); + doReturn(streams).when(mount).getAvailableStreams(); + doReturn(schemaContext).when(mount).getSchemaContext(); + doReturn(Futures.immediateCheckedFuture(null)).when(mount).invokeCreateSubscription(any(), any()); + doReturn(Futures.immediateCheckedFuture(null)).when(mount).invokeCreateSubscription(any()); + doReturn(mock(ListenerRegistration.class)).when(mount).registerNotificationListener(any(), any()); + final Node nodeId1 = NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, NetconfTestUtils.notification_capability_prefix); + doReturn(nodeId1).when(mount).getNode(); + Map streamMap = new HashMap<>(); + streamMap.put(notification1Path.getLastComponent().getNamespace().toString(), "stream-1"); netconfEventSource = new NetconfEventSource( - NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, - NetconfTestUtils.notification_capability_prefix), streamMap, - domMountPointMock, - mountPointMock , + mount, domNotificationPublishServiceMock); } @Test - public void joinTopicTest() throws Exception{ - joinTopicTestHelper(); - assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock)); - } - - @Test - public void testOnNotification() throws Exception { + public void testJoinTopicOnNotification() throws Exception { final JoinTopicInput topic1 = new JoinTopicInputBuilder() .setTopicId(TopicId.getDefaultInstance("topic1")) - .setNotificationPattern(NotificationPattern.getDefaultInstance(".*")) + .setNotificationPattern(NotificationPattern.getDefaultInstance(".*ns1")) .build(); netconfEventSource.joinTopic(topic1); - ArgumentCaptor captor = ArgumentCaptor.forClass(DOMNotification.class); - netconfEventSource.onNotification(notification); + //handle notification matching topic namespace + netconfEventSource.onNotification(matchnigNotification); + //handle notification that does not match topic namespace + netconfEventSource.onNotification(nonMachtingNotification); + //only matching notification should be published verify(domNotificationPublishServiceMock).putNotification(captor.capture()); final TopicDOMNotification value = (TopicDOMNotification) captor.getValue(); - final Object actualTopicId = value.getBody().getChild(new YangInstanceIdentifier.NodeIdentifier(QName.create("urn:cisco:params:xml:ns:yang:messagebus:eventaggregator", "2014-12-02", "topic-id"))).get().getValue(); + final QName qname = TopicNotification.QNAME; + final YangInstanceIdentifier.NodeIdentifier topicIdNode = + new YangInstanceIdentifier.NodeIdentifier(QName.create(qname.getNamespace().toString(), qname.getFormattedRevision(), "topic-id")); + final Object actualTopicId = value.getBody().getChild(topicIdNode).get().getValue(); Assert.assertEquals(topic1.getTopicId(), actualTopicId); } - private void joinTopicTestHelper() throws Exception{ - joinTopicInputMock = mock(JoinTopicInput.class); - TopicId topicId = new TopicId("topicID007"); - doReturn(topicId).when(joinTopicInputMock).getTopicId(); - NotificationPattern notificationPatternMock = mock(NotificationPattern.class); - doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern(); - doReturn("uriStr1").when(notificationPatternMock).getValue(); - - SchemaContext schemaContextMock = mock(SchemaContext.class); - doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext(); - Set notificationDefinitionSet = new HashSet<>(); - NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class); - notificationDefinitionSet.add(notificationDefinitionMock); - - URI uri = new URI("uriStr1"); - QName qName = new QName(uri, "localName1"); - org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName); - doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications(); - doReturn(schemaPath).when(notificationDefinitionMock).getPath(); - - Optional domNotificationServiceOptionalMock = (Optional) mock(Optional.class); - doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class); - doReturn(true).when(domNotificationServiceOptionalMock).isPresent(); - - DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class); - doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get(); - ListenerRegistration listenerRegistrationMock = (ListenerRegistration)mock(ListenerRegistration.class); - doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(SchemaPath.class)); - - Optional optionalMock = (Optional) mock(Optional.class); - doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class); - doReturn(true).when(optionalMock).isPresent(); - DOMRpcService domRpcServiceMock = mock(DOMRpcService.class); - doReturn(domRpcServiceMock).when(optionalMock).get(); - CheckedFuture checkedFutureMock = mock(CheckedFuture.class); - doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class)); + @Test + public void testDisjoinTopicOnNotification() throws Exception { + final TopicId topicId = TopicId.getDefaultInstance("topic1"); + final JoinTopicInput topic1 = new JoinTopicInputBuilder() + .setTopicId(topicId) + .setNotificationPattern(NotificationPattern.getDefaultInstance(".*ns1")) + .build(); + netconfEventSource.joinTopic(topic1); + + //handle notification matching topic namespace + netconfEventSource.onNotification(matchnigNotification); + //disjoin topic + DisJoinTopicInput disjoinTopic = new DisJoinTopicInputBuilder().setTopicId(topicId).build(); + netconfEventSource.disJoinTopic(disjoinTopic); + netconfEventSource.onNotification(matchnigNotification); + //topic notification published only once before disjoin + verify(domNotificationPublishServiceMock, only()).putNotification(any()); + } + private Stream createStream(String name) { + return new StreamBuilder() + .setName(new StreamNameType(name)) + .setReplaySupport(true) + .build(); } - private NotificationDefinition getNotificationDefinitionMock(String namespace, String revision, String name) { + private NotificationDefinition getNotificationDefinitionMock(QName qName) { NotificationDefinition notification = mock(NotificationDefinition.class); - final QName qName = QName.create(namespace, revision, name); doReturn(qName).when(notification).getQName(); doReturn(SchemaPath.create(true, qName)).when(notification).getPath(); return notification; diff --git a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfTestUtils.java b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfTestUtils.java index 72323c816e..3e04fc3a85 100644 --- a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfTestUtils.java +++ b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/NetconfTestUtils.java @@ -9,7 +9,9 @@ package org.opendaylight.netconf.messagebus.eventsources.netconf; import com.google.common.base.Optional; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder; @@ -32,6 +34,13 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; public final class NetconfTestUtils { @@ -84,4 +93,28 @@ public final class NetconfTestUtils { return Optional.of(streams); } + public static NormalizedNode getStreamsNode(String... streamName) { + QName nameNode = QName.create(Stream.QNAME, "name"); + Set streamSet = new HashSet<>(); + for (String s : streamName) { + MapEntryNode stream = Builders.mapEntryBuilder() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifierWithPredicates(Stream.QNAME, nameNode, s)) + .withChild(Builders.leafBuilder() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(nameNode)) + .withValue(s) + .build()) + .build(); + streamSet.add(stream); + } + + CollectionNodeBuilder streams = Builders.mapBuilder().withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(Stream.QNAME)); + for (MapEntryNode mapEntryNode : streamSet) { + streams.withChild(mapEntryNode); + } + return Builders.containerBuilder() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(Streams.QNAME)) + .withChild(streams.build()) + .build(); + } + } diff --git a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistrationTest.java b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistrationTest.java index 6036d90876..2bf354b2c9 100644 --- a/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistrationTest.java +++ b/netconf/messagebus-netconf/src/test/java/org/opendaylight/netconf/messagebus/eventsources/netconf/StreamNotificationTopicRegistrationTest.java @@ -10,26 +10,21 @@ package org.opendaylight.netconf.messagebus.eventsources.netconf; import static org.hamcrest.CoreMatchers.hasItems; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; -import java.util.ArrayList; import java.util.Date; +import java.util.Set; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder; @@ -37,49 +32,37 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; public class StreamNotificationTopicRegistrationTest { - private static final String NS = CreateSubscriptionInput.QNAME.getNamespace().toString(); - private static final String REV = CreateSubscriptionInput.QNAME.getFormattedRevision(); - private static final YangInstanceIdentifier.NodeIdentifier STREAM = new YangInstanceIdentifier.NodeIdentifier(QName.create(NS, REV, "stream")); - private static final YangInstanceIdentifier.NodeIdentifier START_TIME = new YangInstanceIdentifier.NodeIdentifier(QName.create(NS, REV, "startTime")); private static final String STREAM_NAME = "stream-1"; - private static final SchemaPath createSubscription = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); private static final String PREFIX = ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString(); @Mock private NetconfEventSource source; @Mock - private DOMMountPoint mountPoint; - @Mock - private DOMRpcService service; + private NetconfEventSourceMount mount; @Mock private DOMNotificationService reference; @Mock private ListenerRegistration listenerRegistration; private StreamNotificationTopicRegistration registration; + private Stream stream; @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); Node node = new NodeBuilder().setNodeId(NodeId.getDefaultInstance("node-id")).build(); - when(source.getNode()).thenReturn(node); - when(source.getDOMMountPoint()).thenReturn(mountPoint); - - when(mountPoint.getService(DOMRpcService.class)).thenReturn(Optional.of(service)); - when(mountPoint.getService(DOMNotificationService.class)).thenReturn(Optional.of(reference)); - when(reference.registerNotificationListener(any(), eq(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH))) + when(mount.getNode()).thenReturn(node); + when(mount.registerNotificationListener(source, ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH)) .thenReturn(listenerRegistration); - when(service.invokeRpc(eq(createSubscription), any())).thenReturn(Futures.immediateCheckedFuture(null)); + when(mount.invokeCreateSubscription(any(), any())).thenReturn(Futures.immediateCheckedFuture(null)); + when(mount.invokeCreateSubscription(any())).thenReturn(Futures.immediateCheckedFuture(null)); - Stream stream = new StreamBuilder().setName(StreamNameType.getDefaultInstance(STREAM_NAME)).setReplaySupport(true).build(); + when(source.getMount()).thenReturn(mount); + stream = new StreamBuilder().setName(StreamNameType.getDefaultInstance(STREAM_NAME)).setReplaySupport(true).build(); registration = new StreamNotificationTopicRegistration(stream, PREFIX, source); } @@ -87,11 +70,9 @@ public class StreamNotificationTopicRegistrationTest { @Test public void testActivateNotificationSource() throws Exception { registration.activateNotificationSource(); - - ArgumentCaptor captor = ArgumentCaptor.forClass(ContainerNode.class); Assert.assertTrue(registration.isActive()); - verify(service).invokeRpc(eq(createSubscription), captor.capture()); - checkStreamName(captor.getValue()); + verify(mount).invokeCreateSubscription(stream); + } @Test @@ -99,11 +80,8 @@ public class StreamNotificationTopicRegistrationTest { registration.setActive(true); registration.reActivateNotificationSource(); - ArgumentCaptor captor = ArgumentCaptor.forClass(ContainerNode.class); Assert.assertTrue(registration.isActive()); - verify(service).invokeRpc(eq(createSubscription), captor.capture()); - checkStreamName(captor.getValue()); - checkDate(captor.getValue(), Optional.absent()); + verify(mount).invokeCreateSubscription(stream, Optional.absent()); } @Test @@ -113,11 +91,8 @@ public class StreamNotificationTopicRegistrationTest { registration.setLastEventTime(lastEventTime); registration.reActivateNotificationSource(); - ArgumentCaptor captor = ArgumentCaptor.forClass(ContainerNode.class); Assert.assertTrue(registration.isActive()); - verify(service).invokeRpc(eq(createSubscription), captor.capture()); - checkStreamName(captor.getValue()); - checkDate(captor.getValue(), Optional.of(lastEventTime)); + verify(mount).invokeCreateSubscription(stream, Optional.of(lastEventTime)); } @Test @@ -132,12 +107,12 @@ public class StreamNotificationTopicRegistrationTest { final TopicId topic1 = registerTopic("topic1"); final TopicId topic2 = registerTopic("topic2"); final TopicId topic3 = registerTopic("topic3"); - final ArrayList notificationTopicIds = registration.getNotificationTopicIds(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); + final Set notificationTopicIds = registration.getTopicsForNotification(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); Assert.assertNotNull(notificationTopicIds); Assert.assertThat(notificationTopicIds, hasItems(topic1, topic2, topic3)); registration.unRegisterNotificationTopic(topic3); - final ArrayList afterUnregister = registration.getNotificationTopicIds(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); + final Set afterUnregister = registration.getTopicsForNotification(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); Assert.assertNotNull(afterUnregister); Assert.assertThat(afterUnregister, hasItems(topic1, topic2)); Assert.assertFalse(afterUnregister.contains(topic3)); @@ -149,14 +124,5 @@ public class StreamNotificationTopicRegistrationTest { return topic; } - private void checkStreamName(ContainerNode value) { - final String streamName = (String) value.getChild(STREAM).get().getValue(); - Assert.assertEquals(STREAM_NAME, streamName); - } - - private void checkDate(ContainerNode value, Optional lastEventTime) { - final Optional startTime = (Optional) value.getChild(START_TIME).get().getValue(); - Assert.assertEquals(lastEventTime, startTime); - } } \ No newline at end of file -- 2.36.6