X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FNetconfEventSource.java;fp=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FNetconfEventSource.java;h=9c0697f3fb3d666d7d4871e79a9de571f6d627ab;hb=22c6645e0793c65ba1f0c1004d79cf83e770d765;hp=0000000000000000000000000000000000000000;hpb=68e68b304776fd74c40dd9ce29dab674f4c21dda;p=controller.git diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java new file mode 100644 index 0000000000..9c0697f3fb --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.app.impl; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.regex.Pattern; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.mdsal.MdSAL; +import org.opendaylight.controller.sal.core.api.notify.NotificationListener; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; +import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetconfEventSource implements EventSourceService, NotificationListener, DataChangeListener { + private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSource.class); + + private final MdSAL mdSal; + private final String nodeId; + + private final List activeStreams = new ArrayList<>(); + + private final Map urnPrefixToStreamMap; + + public NetconfEventSource(final MdSAL mdSal, final String nodeId, final Map streamMap) { + Preconditions.checkNotNull(mdSal); + Preconditions.checkNotNull(nodeId); + + this.mdSal = mdSal; + this.nodeId = nodeId; + this.urnPrefixToStreamMap = streamMap; + + LOGGER.info("NetconfEventSource [{}] created.", nodeId); + } + + @Override + public Future> joinTopic(final JoinTopicInput input) { + final NotificationPattern notificationPattern = input.getNotificationPattern(); + + // FIXME: default language should already be regex + final String regex = Util.wildcardToRegex(notificationPattern.getValue()); + + final Pattern pattern = Pattern.compile(regex); + List matchingNotifications = Util.expandQname(availableNotifications(), pattern); + registerNotificationListener(matchingNotifications); + return null; + } + + private List availableNotifications() { + // FIXME: use SchemaContextListener to get changes asynchronously + Set availableNotifications = mdSal.getSchemaContext(nodeId).getNotifications(); + List qNs = new ArrayList<>(availableNotifications.size()); + for (NotificationDefinition nd : availableNotifications) { + qNs.add(nd.getQName()); + } + + return qNs; + } + + private void registerNotificationListener(final List notificationsToSubscribe) { + for (QName qName : notificationsToSubscribe) { + startSubscription(qName); + // FIXME: do not lose this registration + final ListenerRegistration reg = mdSal.addNotificationListener(nodeId, qName, this); + } + } + + private synchronized void startSubscription(final QName qName) { + String streamName = resolveStream(qName); + + if (streamIsActive(streamName) == false) { + LOGGER.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); + startSubscription(streamName); + } + } + + private synchronized void resubscribeToActiveStreams() { + for (String streamName : activeStreams) { + startSubscription(streamName); + } + } + + private synchronized void startSubscription(final String streamName) { + CreateSubscriptionInput subscriptionInput = getSubscriptionInput(streamName); + mdSal.getRpcService(nodeId, NotificationsService.class).createSubscription(subscriptionInput); + activeStreams.add(streamName); + } + + private static CreateSubscriptionInput getSubscriptionInput(final String streamName) { + CreateSubscriptionInputBuilder csib = new CreateSubscriptionInputBuilder(); + csib.setStream(new StreamNameType(streamName)); + return csib.build(); + } + + private String resolveStream(final QName qName) { + String streamName = null; + + for (Map.Entry entry : urnPrefixToStreamMap.entrySet()) { + String nameSpace = qName.getNamespace().toString(); + String urnPrefix = entry.getKey(); + if( nameSpace.startsWith(urnPrefix) ) { + streamName = entry.getValue(); + break; + } + } + + return streamName; + } + + private boolean streamIsActive(final String streamName) { + return activeStreams.contains(streamName); + } + + // PASS + @Override public Set getSupportedNotifications() { + return null; + } + + @Override + public void onNotification(final CompositeNode notification) { + LOGGER.info("NetconfEventSource {} received notification {}. Will publish to MD-SAL.", nodeId, notification); + ImmutableCompositeNode payload = ImmutableCompositeNode.builder() + .setQName(QName.create(TopicNotification.QNAME, "payload")) + .add(notification).toInstance(); + ImmutableCompositeNode icn = ImmutableCompositeNode.builder() + .setQName(TopicNotification.QNAME) + .add(payload) + .addLeaf("event-source", nodeId) + .toInstance(); + + mdSal.publishNotification(icn); + } + + @Override + public void onDataChanged(final AsyncDataChangeEvent, DataObject> change) { + boolean wasConnected = false; + boolean nowConnected = false; + + for (Map.Entry, DataObject> changeEntry : change.getOriginalData().entrySet()) { + if ( isNetconfNode(changeEntry) ) { + NetconfNode nn = (NetconfNode)changeEntry.getValue(); + wasConnected = nn.isConnected(); + } + } + + for (Map.Entry, DataObject> changeEntry : change.getUpdatedData().entrySet()) { + if ( isNetconfNode(changeEntry) ) { + NetconfNode nn = (NetconfNode)changeEntry.getValue(); + nowConnected = nn.isConnected(); + } + } + + if (wasConnected == false && nowConnected == true) { + resubscribeToActiveStreams(); + } + } + + private static boolean isNetconfNode(final Map.Entry, DataObject> changeEntry ) { + return NetconfNode.class.equals(changeEntry.getKey().getTargetType()); + } + +}