2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.messagebus.eventsources.netconf;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.concurrent.ConcurrentHashMap;
15 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
16 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
18 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
19 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
20 import org.opendaylight.yangtools.concepts.ListenerRegistration;
21 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
26 * Topic registration for notification with specified namespace from stream.
28 class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
30 private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
32 private final String nodeId;
33 private final NetconfEventSource netconfEventSource;
34 private final NetconfEventSourceMount mountPoint;
35 private final ConcurrentHashMap<SchemaPath, ListenerRegistration<DOMNotificationListener>> notificationRegistrationMap = new ConcurrentHashMap<>();
36 private final Stream stream;
39 * Creates registration to notification stream.
40 * @param stream stream
41 * @param notificationPrefix notifications namespace
42 * @param netconfEventSource event source
44 public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
45 NetconfEventSource netconfEventSource) {
46 super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
47 this.netconfEventSource = netconfEventSource;
48 this.mountPoint = netconfEventSource.getMount();
49 this.nodeId = mountPoint.getNode().getNodeId().getValue();
51 setReplaySupported(stream.isReplaySupport());
53 LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
57 * Subscribes to notification stream associated with this registration.
59 void activateNotificationSource() {
61 LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
62 final CheckedFuture<DOMRpcResult, DOMRpcException> result = mountPoint.invokeCreateSubscription(stream);
66 } catch (DOMRpcException e) {
67 LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
71 LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
76 * Subscribes to notification stream associated with this registration. If replay is supported, notifications from last
77 * received event time will be requested.
79 void reActivateNotificationSource() {
81 LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
82 final CheckedFuture<DOMRpcResult, DOMRpcException> result;
83 result = mountPoint.invokeCreateSubscription(stream, getLastEventTime());
87 } catch (DOMRpcException e) {
88 LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
94 @Override void deActivateNotificationSource() {
98 private void closeStream() {
100 for (ListenerRegistration<DOMNotificationListener> reg : notificationRegistrationMap.values()) {
103 notificationRegistrationMap.clear();
104 notificationTopicMap.clear();
109 private String getStreamName() {
110 return getSourceName();
113 @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
114 if (!checkNotificationPath(notificationPath)) {
115 LOG.debug("Bad SchemaPath for notification try to register");
119 activateNotificationSource();
121 LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
122 notificationPath.toString());
126 ListenerRegistration<DOMNotificationListener> registration = mountPoint.registerNotificationListener(netconfEventSource, notificationPath);
127 notificationRegistrationMap.put(notificationPath, registration);
128 Set<TopicId> topicIds = getTopicsForNotification(notificationPath);
129 topicIds.add(topicId);
131 notificationTopicMap.put(notificationPath, topicIds);
135 @Override synchronized void unRegisterNotificationTopic(TopicId topicId) {
136 List<SchemaPath> notificationPathToRemove = new ArrayList<>();
137 for (SchemaPath notifKey : notificationTopicMap.keySet()) {
138 Set<TopicId> topicList = notificationTopicMap.get(notifKey);
139 if (topicList != null) {
140 topicList.remove(topicId);
141 if (topicList.isEmpty()) {
142 notificationPathToRemove.add(notifKey);
146 for (SchemaPath notifKey : notificationPathToRemove) {
147 notificationTopicMap.remove(notifKey);
148 ListenerRegistration<DOMNotificationListener> reg = notificationRegistrationMap.remove(notifKey);
155 @Override public void close() throws Exception {