2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.messagebus.eventsources.netconf;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.concurrent.ConcurrentHashMap;
15 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
16 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
18 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
19 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
20 import org.opendaylight.yangtools.concepts.ListenerRegistration;
21 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
26 * Topic registration for notification with specified namespace from stream.
28 class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
// SLF4J logger shared by all instances of this class.
30 private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
// Node identifier string, read from the mount point's node in the constructor; used in log messages.
32 private final String nodeId;
// Event source supplied to the constructor; it is the object handed to
// mountPoint.registerNotificationListener(...) in registerNotificationTopic.
33 private final NetconfEventSource netconfEventSource;
// Mount obtained from netconfEventSource.getMount(); used to create subscriptions and register listeners.
34 private final NetconfEventSourceMount mountPoint;
// Listener registration per notification path: filled by registerNotificationTopic, entries removed by
// unRegisterNotificationTopic, emptied by closeStream.
35 private final ConcurrentHashMap<SchemaPath, ListenerRegistration<DOMNotificationListener>>
36 notificationRegistrationMap = new ConcurrentHashMap<>();
// Stream handed to mountPoint.invokeCreateSubscription(...).
// NOTE(review): the assignment of this final field is not visible in this view (the embedded numbering
// skips line 52) - confirm it is set in the constructor.
37 private final Stream stream;
40 * Creates registration to notification stream.
42 * @param stream stream
43 * @param notificationPrefix notifications namespace
44 * @param netconfEventSource event source
// Builds a registration for one notification stream of the given event source.
// The stream's name value and notificationPrefix are forwarded to the superclass constructor together with
// NotificationSourceType.NetconfDeviceStream.
// NOTE(review): the embedded numbering skips lines 52 and 54, so some statements of this constructor
// (presumably including the assignment of the final field `stream`) are not visible here - confirm.
46 StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
47 NetconfEventSource netconfEventSource) {
48 super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
49 this.netconfEventSource = netconfEventSource;
// Keep the mount and the node id at hand; both are used by the subscription and logging code of this class.
50 this.mountPoint = netconfEventSource.getMount();
51 this.nodeId = mountPoint.getNode().getNodeId().getValue();
// Pass the stream's replay-support flag on to this registration.
53 setReplaySupported(stream.isReplaySupport());
55 LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
59 * Subscribes to notification stream associated with this registration.
// Requests a subscription to this registration's stream through the mount point.
// NOTE(review): the embedded numbering jumps (61->63, 64->68, 69->73), so the guarding condition, the `try`
// that consumes `result`, and any state updates are not visible in this view; the notes below only cover
// the statements that are shown.
61 void activateNotificationSource() {
63 LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
// Ask the mount point to create the subscription; the outcome arrives as a CheckedFuture that may fail
// with DOMRpcException.
64 final CheckedFuture<DOMRpcResult, DOMRpcException> result = mountPoint.invokeCreateSubscription(stream);
68 } catch (DOMRpcException e) {
// NOTE(review): `e` is not passed to the logger, so the failure cause is lost. This message also uses
// getSourceName() where the other messages use getStreamName(); getStreamName() returns getSourceName().
69 LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
// NOTE(review): the branch this message belongs to is not visible in this view.
73 LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
78 * Subscribes to notification stream associated with this registration. If replay is supported, notifications
80 * from the last received event time will be requested.
// Requests a new subscription to this registration's stream, passing getLastEventTime() along with the stream.
// NOTE(review): the embedded numbering jumps (82->84, 86->90), so the guarding condition, the `try` that
// consumes `result`, and any state updates are not visible in this view.
82 void reActivateNotificationSource() {
84 LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
85 final CheckedFuture<DOMRpcResult, DOMRpcException> result;
// Two-argument variant: the second argument is the value returned by getLastEventTime().
86 result = mountPoint.invokeCreateSubscription(stream, getLastEventTime());
90 } catch (DOMRpcException e) {
// NOTE(review): `e` is not passed to the logger, so the failure cause is lost.
91 LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
98 void deActivateNotificationSource() {
// Walks all listener registrations and then empties both bookkeeping maps.
// NOTE(review): the loop body (line 105) is missing from this view; presumably each registration is
// closed there - confirm.
102 private void closeStream() {
104 for (ListenerRegistration<DOMNotificationListener> reg : notificationRegistrationMap.values()) {
// Drop every path -> registration entry and every path -> topics entry.
107 notificationRegistrationMap.clear();
108 notificationTopicMap.clear();
// Returns getSourceName() under a stream-specific name.
// NOTE(review): presumably this is the stream name value the constructor passes to the superclass - confirm.
113 private String getStreamName() {
114 return getSourceName();
// Registers topicId for the notification identified by notificationPath.
// Visible flow: validate the path, activate the stream subscription, register netconfEventSource for the
// path on the mount point, remember the resulting registration, and add topicId to the path's topic set.
// NOTE(review): the embedded numbering skips lines 121-123, 125, 128-130, 136 and 138-141, so the return
// statements and the condition guarding the "not active" warning are not visible in this view.
// NOTE(review): unlike unRegisterNotificationTopic this method is not declared synchronized, although both
// modify notificationTopicMap and notificationRegistrationMap - confirm that this is intended.
118 boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
119 if (!checkNotificationPath(notificationPath)) {
120 LOG.debug("Bad SchemaPath for notification try to register");
124 activateNotificationSource();
126 LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
127 notificationPath.toString());
// Register with the mount point and keep the handle, keyed by notification path.
131 ListenerRegistration<DOMNotificationListener> registration =
132 mountPoint.registerNotificationListener(netconfEventSource, notificationPath);
133 notificationRegistrationMap.put(notificationPath, registration);
// Obtain the topic set for this path via getTopicsForNotification, add the new topic, and store the set back.
134 Set<TopicId> topicIds = getTopicsForNotification(notificationPath);
135 topicIds.add(topicId);
137 notificationTopicMap.put(notificationPath, topicIds);
// Removes topicId from the topic set of every notification path; paths left without any topic are dropped
// from notificationTopicMap and their listener registration is taken out of notificationRegistrationMap.
// NOTE(review): everything after line 155 is missing from this view; presumably the removed registration
// is closed there - confirm.
142 synchronized void unRegisterNotificationTopic(TopicId topicId) {
// First pass: strip the topic and collect the paths that became empty, instead of removing map entries
// while iterating over keySet().
143 List<SchemaPath> notificationPathToRemove = new ArrayList<>();
144 for (SchemaPath notifKey : notificationTopicMap.keySet()) {
145 Set<TopicId> topicList = notificationTopicMap.get(notifKey);
146 if (topicList != null) {
147 topicList.remove(topicId);
148 if (topicList.isEmpty()) {
149 notificationPathToRemove.add(notifKey);
// Second pass: delete the collected paths from both maps.
153 for (SchemaPath notifKey : notificationPathToRemove) {
154 notificationTopicMap.remove(notifKey);
155 ListenerRegistration<DOMNotificationListener> reg = notificationRegistrationMap.remove(notifKey);
163 public void close() {