X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Feventsources%2Fnetconf%2FNetconfEventSource.java;h=3dbdc98ea52dd1c68fc7f15b2afcc955fd7887d2;hb=869d6bd3f66ca2b655d7690afe6a432700749907;hp=615fa34b7c4d302aa005cd5b236a6661e4c4c354;hpb=78527e81f8cc82140af5cb2649863a597f380291;p=controller.git diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java index 615fa34b7c..3dbdc98ea5 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java @@ -16,7 +16,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.regex.Pattern; @@ -25,7 +24,6 @@ import javax.xml.transform.dom.DOMResult; import javax.xml.transform.dom.DOMSource; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.binding.api.MountPoint; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; @@ -53,7 +51,6 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; @@ -78,6 +75,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); + private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id")); private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); @@ -88,74 +86,74 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, private final DOMMountPoint netconfMount; private final DOMNotificationPublishService domPublish; - private final Set activeStreams = new ConcurrentSkipListSet<>(); private final Map urnPrefixToStreamMap; - private final ConcurrentHashMap> listenerRegistrationMap = new ConcurrentHashMap<>(); - public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) { + private final ConcurrentHashMap streamNotifRegistrationMap = new ConcurrentHashMap<>(); + + public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService) { this.netconfMount = netconfMount; this.node = node; this.nodeId = node.getNodeId().getValue(); this.urnPrefixToStreamMap = streamMap; this.domPublish = publishService; + this.initializeStreamNotifRegistrationMap(); LOG.info("NetconfEventSource [{}] created.", nodeId); } + private void initializeStreamNotifRegistrationMap(){ + for(String streamName : this.urnPrefixToStreamMap.values()){ + streamNotifRegistrationMap.put(streamName, new StreamNotificationTopicRegistration(streamName, this.nodeId, this.netconfMount, this)); + } + } + @Override public Future> joinTopic(final JoinTopicInput input) { + final NotificationPattern notificationPattern = input.getNotificationPattern(); final List matchingNotifications = getMatchingNotifications(notificationPattern); - return registerNotificationListener(input.getTopicId(),matchingNotifications); + return registerTopic(input.getTopicId(),matchingNotifications); + } - private synchronized Future> registerNotificationListener(final TopicId topicId, final List notificationsToSubscribe){ - if(listenerRegistrationMap.containsKey(topicId)){ - final String errMessage = "Can not join topic twice. Topic " + topicId.getValue() + " has been joined to node " + this.nodeId; - return immediateFuture(RpcResultBuilder.failed().withError(ErrorType.APPLICATION, errMessage).build()); - } - ListenerRegistration registration = null; - JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; - final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + private synchronized Future> registerTopic(final TopicId topicId, final List notificationsToSubscribe){ - if(notifyService.isPresent()) { - for (final SchemaPath qName : notificationsToSubscribe) { - startSubscription(qName); + JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; + if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){ + final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + if(notifyService.isPresent()){ + int subscribedStreams = 0; + for(SchemaPath schemaNotification : notificationsToSubscribe){ + final Optional streamName = resolveStream(schemaNotification.getLastComponent()); + if(streamName.isPresent()){ + LOG.info("Stream {} is activating, TopicId {}", streamName.get(), topicId.getValue() ); + StreamNotificationTopicRegistration streamReg = streamNotifRegistrationMap.get(streamName.get()); + streamReg.activateStream(); + for(SchemaPath notificationPath : notificationsToSubscribe){ + LOG.info("Notification listener is registering, Notification {}, TopicId {}", notificationPath, topicId.getValue() ); + streamReg.registerNotificationListenerTopic(notificationPath, topicId); + } + subscribedStreams = subscribedStreams + 1; + } + } + if(subscribedStreams > 0){ + joinTopicStatus = JoinTopicStatus.Up; + } } - registration = notifyService.get().registerNotificationListener(this, notificationsToSubscribe); } - if(registration != null){ - listenerRegistrationMap.put(topicId,registration); - joinTopicStatus = JoinTopicStatus.Up; - } final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); return immediateFuture(RpcResultBuilder.success(output).build()); - } - private void startSubscription(final SchemaPath path) { - final String streamName = resolveStream(path.getLastComponent()); - startSubscription(streamName); } private void resubscribeToActiveStreams() { - for (final String streamName : activeStreams) { - startSubscription(streamName); - } - } - - private synchronized void startSubscription(final String streamName) { - if(streamIsActive(streamName) == false){ - LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); - final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName)) - .build(); - netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); - activeStreams.add(streamName); + for (StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){ + streamReg.reActivateStream(); } } - private String resolveStream(final QName qName) { + private Optional resolveStream(final QName qName) { String streamName = null; for (final Map.Entry entry : urnPrefixToStreamMap.entrySet()) { @@ -166,28 +164,35 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, break; } } - - return streamName; - } - - private boolean streamIsActive(final String streamName) { - return activeStreams.contains(streamName); + return Optional.fromNullable(streamName); } @Override public void onNotification(final DOMNotification notification) { - final ContainerNode topicNotification = Builders.containerBuilder() - .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) - .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId)) - .withChild(encapsulate(notification)) - .build(); - try { - domPublish.putNotification(new TopicDOMNotification(topicNotification)); - } catch (final InterruptedException e) { - throw Throwables.propagate(e); + SchemaPath notificationPath = notification.getType(); + LOG.info("Notification {} has come.",notification.getType()); + for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){ + for(TopicId topicId : streamReg.getNotificationTopicIds(notificationPath)){ + publishNotification(notification, topicId); + LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); + } } } + private void publishNotification(final DOMNotification notification, TopicId topicId){ + final ContainerNode topicNotification = Builders.containerBuilder() + .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) + .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId)) + .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId)) + .withChild(encapsulate(notification)) + .build(); + try { + domPublish.putNotification(new TopicDOMNotification(topicNotification)); + } catch (final InterruptedException e) { + throw Throwables.propagate(e); + } + } + private AnyXmlNode encapsulate(final DOMNotification body) { // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools final Document doc = XmlUtil.newDocument(); @@ -242,13 +247,17 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, final String regex = Util.wildcardToRegex(notificationPattern.getValue()); final Pattern pattern = Pattern.compile(regex); - return Util.expandQname(getAvailableNotifications(), pattern); + List availableNotifications = getAvailableNotifications(); + if(availableNotifications == null || availableNotifications.isEmpty()){ + return null; + } + return Util.expandQname(availableNotifications, pattern); } @Override public void close() throws Exception { - for(ListenerRegistration registration : listenerRegistrationMap.values()){ - registration.close(); + for(StreamNotificationTopicRegistration streamReg : streamNotifRegistrationMap.values()){ + streamReg.deactivateStream(); } } @@ -268,4 +277,84 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener, return qNs; } + private class StreamNotificationTopicRegistration{ + + final private String streamName; + final private DOMMountPoint netconfMount; + final private String nodeId; + final private NetconfEventSource notificationListener; + private boolean active; + + private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); + + public StreamNotificationTopicRegistration(final String streamName, final String nodeId, final DOMMountPoint netconfMount, NetconfEventSource notificationListener) { + this.streamName = streamName; + this.netconfMount = netconfMount; + this.nodeId = nodeId; + this.notificationListener = notificationListener; + this.active = false; + } + + public boolean isActive() { + return active; + } + + public void reActivateStream(){ + if(this.isActive()){ + LOG.info("Stream {} is reactivated active on node {}.", this.streamName, this.nodeId); + final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName)) + .build(); + netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); + } + } + + public void activateStream() { + if(this.isActive() == false){ + LOG.info("Stream {} is not active on node {}. Will subscribe.", this.streamName, this.nodeId); + final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName)) + .build(); + netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); + this.active = true; + } else { + LOG.info("Stream {} is now active on node {}", this.streamName, this.nodeId); + } + } + + public void deactivateStream() { + for(ListenerRegistration reg : notificationRegistrationMap.values()){ + reg.close(); + } + this.active = false; + } + + public String getStreamName() { + return streamName; + } + + public ArrayList getNotificationTopicIds(SchemaPath notificationPath){ + return notificationTopicMap.get(notificationPath); + } + + public void registerNotificationListenerTopic(SchemaPath notificationPath, TopicId topicId){ + final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + if(notificationPath != null && notifyService.isPresent()){ + ListenerRegistration registration = notifyService.get().registerNotificationListener(this.notificationListener,notificationPath); + notificationRegistrationMap.put(notificationPath, registration); + ArrayList topicIds = getNotificationTopicIds(notificationPath); + if(topicIds == null){ + topicIds = new ArrayList<>(); + topicIds.add(topicId); + } else { + if(topicIds.contains(topicId) == false){ + topicIds.add(topicId); + } + } + notificationTopicMap.put(notificationPath, topicIds); + } + } + + } }