X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Feventsources%2Fnetconf%2FNetconfEventSource.java;h=e4ad387f4d084d1b44fe26d5bff6e2dfe2611dbe;hb=cce450550bec259d4f925389bafd007676f2186f;hp=615fa34b7c4d302aa005cd5b236a6661e4c4c354;hpb=9f1061c46af5220ad95d8d0b94411ba2fd832a50;p=controller.git diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java index 615fa34b7c..e4ad387f4d 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java @@ -12,11 +12,11 @@ import static com.google.common.util.concurrent.Futures.immediateFuture; import java.io.IOException; import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.regex.Pattern; @@ -24,15 +24,17 @@ import javax.xml.stream.XMLStreamException; import javax.xml.transform.dom.DOMResult; import javax.xml.transform.dom.DOMSource; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.MountPoint; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMEvent; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification; import org.opendaylight.controller.messagebus.app.impl.Util; import org.opendaylight.controller.messagebus.spi.EventSource; @@ -45,15 +47,15 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; @@ -70,124 +72,194 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.CheckedFuture; -public class NetconfEventSource implements EventSource, DOMNotificationListener, DataChangeListener { +public class NetconfEventSource implements EventSource, DOMNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class); private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); + private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id")); private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); - - private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); - private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); + private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource"; private final String nodeId; private final Node node; private final DOMMountPoint netconfMount; + private final MountPoint mountPoint; private final DOMNotificationPublishService domPublish; - private final Set activeStreams = new ConcurrentSkipListSet<>(); - private final Map urnPrefixToStreamMap; - private final ConcurrentHashMap> listenerRegistrationMap = new ConcurrentHashMap<>(); + private final Map urnPrefixToStreamMap; // key = urnPrefix, value = StreamName + private final List notificationTopicRegistrationList = new ArrayList<>(); - public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) { - this.netconfMount = netconfMount; - this.node = node; + public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final MountPoint mountPoint, final DOMNotificationPublishService publishService) { + this.netconfMount = Preconditions.checkNotNull(netconfMount); + this.mountPoint = Preconditions.checkNotNull(mountPoint); + this.node = Preconditions.checkNotNull(node); + this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap); + this.domPublish = Preconditions.checkNotNull(publishService); this.nodeId = node.getNodeId().getValue(); - this.urnPrefixToStreamMap = streamMap; - this.domPublish = publishService; - LOG.info("NetconfEventSource [{}] created.", nodeId); + this.initializeNotificationTopicRegistrationList(); + + LOG.info("NetconfEventSource [{}] created.", this.nodeId); + } + + private void initializeNotificationTopicRegistrationList() { + notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this)); + Optional> streamMap = getAvailableStreams(); + if(streamMap.isPresent()){ + LOG.debug("Stream configuration compare..."); + for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) { + final String streamName = this.urnPrefixToStreamMap.get(urnPrefix); + LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName); + if(streamMap.get().containsKey(streamName)){ + LOG.debug("Stream containig on device"); + notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this)); + } + } + } + } + + private Optional> getAvailableStreams(){ + + Map streamMap = null; + InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); + Optional dataBroker = this.mountPoint.getService(DataBroker.class); + + if(dataBroker.isPresent()){ + LOG.debug("GET Available streams ..."); + ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction(); + CheckedFuture, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream); + + try { + Optional streams = checkFeature.checkedGet(); + if(streams.isPresent()){ + streamMap = new HashMap<>(); + for(Stream stream : streams.get().getStream()){ + LOG.debug("*** find stream {}", stream.getName().getValue()); + streamMap.put(stream.getName().getValue(), stream); + } + } + } catch (ReadFailedException e) { + LOG.warn("Can not read streams for node {}",this.nodeId); + } + + } else { + LOG.warn("No databroker on node {}", this.nodeId); + } + + return Optional.fromNullable(streamMap); } @Override public Future> joinTopic(final JoinTopicInput input) { + LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId); final NotificationPattern notificationPattern = input.getNotificationPattern(); final List matchingNotifications = getMatchingNotifications(notificationPattern); - return registerNotificationListener(input.getTopicId(),matchingNotifications); + return registerTopic(input.getTopicId(),matchingNotifications); + } - private synchronized Future> registerNotificationListener(final TopicId topicId, final List notificationsToSubscribe){ - if(listenerRegistrationMap.containsKey(topicId)){ - final String errMessage = "Can not join topic twice. Topic " + topicId.getValue() + " has been joined to node " + this.nodeId; - return immediateFuture(RpcResultBuilder.failed().withError(ErrorType.APPLICATION, errMessage).build()); - } - ListenerRegistration registration = null; - JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; - final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + @Override + public Future> disJoinTopic(DisJoinTopicInput input) { + for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ + reg.unRegisterNotificationTopic(input.getTopicId()); + } + return Util.resultRpcSuccessFor((Void) null) ; + } - if(notifyService.isPresent()) { - for (final SchemaPath qName : notificationsToSubscribe) { - startSubscription(qName); + private synchronized Future> registerTopic(final TopicId topicId, final List notificationsToSubscribe){ + LOG.debug("Join topic {} - register"); + JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; + if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){ + LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() ); + final Optional notifyService = getDOMMountPoint().getService(DOMNotificationService.class); + if(notifyService.isPresent()){ + int registeredNotificationCount = 0; + for(SchemaPath schemaNotification : notificationsToSubscribe){ + for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ + LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName()); + if(reg.checkNotificationPath(schemaNotification)){ + LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() ); + boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId); + if(regSuccess){ + registeredNotificationCount = registeredNotificationCount +1; + } + } + } + } + if(registeredNotificationCount > 0){ + joinTopicStatus = JoinTopicStatus.Up; + } + } else { + LOG.warn("NO DOMNotification service on node {}", this.nodeId); } - registration = notifyService.get().registerNotificationListener(this, notificationsToSubscribe); + } else { + LOG.debug("Notifications to subscribe has NOT found"); } - if(registration != null){ - listenerRegistrationMap.put(topicId,registration); - joinTopicStatus = JoinTopicStatus.Up; - } final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); return immediateFuture(RpcResultBuilder.success(output).build()); - } - private void startSubscription(final SchemaPath path) { - final String streamName = resolveStream(path.getLastComponent()); - startSubscription(streamName); } - private void resubscribeToActiveStreams() { - for (final String streamName : activeStreams) { - startSubscription(streamName); + public void reActivateStreams(){ + for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { + LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId); + reg.reActivateNotificationSource(); } } - private synchronized void startSubscription(final String streamName) { - if(streamIsActive(streamName) == false){ - LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); - final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName)) - .build(); - netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); - activeStreams.add(streamName); + public void deActivateStreams(){ + for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { + LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId); + reg.deActivateNotificationSource(); } } - private String resolveStream(final QName qName) { - String streamName = null; - - for (final Map.Entry entry : urnPrefixToStreamMap.entrySet()) { - final String nameSpace = qName.getNamespace().toString(); - final String urnPrefix = entry.getKey(); - if( nameSpace.startsWith(urnPrefix) ) { - streamName = entry.getValue(); - break; - } + @Override + public void onNotification(final DOMNotification notification) { + SchemaPath notificationPath = notification.getType(); + Date notificationEventTime = null; + if(notification instanceof DOMEvent){ + notificationEventTime = ((DOMEvent) notification).getEventTime(); } + for(NotificationTopicRegistration notifReg : notificationTopicRegistrationList){ + ArrayList topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath); + if(topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false){ - return streamName; - } + if(notifReg instanceof StreamNotificationTopicRegistration){ + StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration)notifReg; + streamReg.setLastEventTime(notificationEventTime); + } - private boolean streamIsActive(final String streamName) { - return activeStreams.contains(streamName); - } + for(TopicId topicId : topicIdsForNotification){ + publishNotification(notification, topicId); + LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); + } - @Override - public void onNotification(final DOMNotification notification) { - final ContainerNode topicNotification = Builders.containerBuilder() - .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) - .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId)) - .withChild(encapsulate(notification)) - .build(); - try { - domPublish.putNotification(new TopicDOMNotification(topicNotification)); - } catch (final InterruptedException e) { - throw Throwables.propagate(e); + } } } + private void publishNotification(final DOMNotification notification, TopicId topicId){ + final ContainerNode topicNotification = Builders.containerBuilder() + .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) + .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId)) + .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)) + .withChild(encapsulate(notification)) + .build(); + try { + domPublish.putNotification(new TopicDOMNotification(topicNotification)); + } catch (final InterruptedException e) { + throw Throwables.propagate(e); + } + } + private AnyXmlNode encapsulate(final DOMNotification body) { // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools final Document doc = XmlUtil.newDocument(); @@ -196,7 +268,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, final DOMResult result = new DOMResult(element); - final SchemaContext context = netconfMount.getSchemaContext(); + final SchemaContext context = getDOMMountPoint().getSchemaContext(); final SchemaPath schemaPath = body.getType(); try { NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); @@ -209,63 +281,60 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, } } - @Override - public void onDataChanged(final AsyncDataChangeEvent, DataObject> change) { - boolean wasConnected = false; - boolean nowConnected = false; - - for (final Map.Entry, DataObject> changeEntry : change.getOriginalData().entrySet()) { - if ( isNetconfNode(changeEntry) ) { - final NetconfNode nn = (NetconfNode)changeEntry.getValue(); - wasConnected = nn.isConnected(); - } - } - - for (final Map.Entry, DataObject> changeEntry : change.getUpdatedData().entrySet()) { - if ( isNetconfNode(changeEntry) ) { - final NetconfNode nn = (NetconfNode)changeEntry.getValue(); - nowConnected = nn.isConnected(); - } - } - - if (wasConnected == false && nowConnected == true) { - resubscribeToActiveStreams(); - } - } - - private static boolean isNetconfNode(final Map.Entry, DataObject> changeEntry ) { - return NetconfNode.class.equals(changeEntry.getKey().getTargetType()); - } - private List getMatchingNotifications(NotificationPattern notificationPattern){ // FIXME: default language should already be regex final String regex = Util.wildcardToRegex(notificationPattern.getValue()); final Pattern pattern = Pattern.compile(regex); - return Util.expandQname(getAvailableNotifications(), pattern); + List availableNotifications = getAvailableNotifications(); + if(availableNotifications == null || availableNotifications.isEmpty()){ + return null; + } + return Util.expandQname(availableNotifications, pattern); } @Override public void close() throws Exception { - for(ListenerRegistration registration : listenerRegistrationMap.values()){ - registration.close(); + for(NotificationTopicRegistration streamReg : notificationTopicRegistrationList){ + streamReg.close(); } } @Override public NodeKey getSourceNodeKey(){ - return node.getKey(); + return getNode().getKey(); } @Override public List getAvailableNotifications() { + + final List availNotifList = new ArrayList<>(); + // add Event Source Connection status notification + availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); + // FIXME: use SchemaContextListener to get changes asynchronously - final Set availableNotifications = netconfMount.getSchemaContext().getNotifications(); - final List qNs = new ArrayList<>(availableNotifications.size()); + final Set availableNotifications = getDOMMountPoint().getSchemaContext().getNotifications(); + // add all known notifications from netconf device for (final NotificationDefinition nd : availableNotifications) { - qNs.add(nd.getPath()); + availNotifList.add(nd.getPath()); } - return qNs; + return availNotifList; + } + + public Node getNode() { + return node; + } + + DOMMountPoint getDOMMountPoint() { + return netconfMount; + } + + MountPoint getMountPoint() { + return mountPoint; + } + + NetconfNode getNetconfNode(){ + return node.getAugmentation(NetconfNode.class); } }