2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.messagebus.eventsources.netconf;
10 import com.google.common.util.concurrent.ListenableFuture;
11 import java.util.ArrayList;
12 import java.util.List;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ExecutionException;
16 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
17 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
18 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
19 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
20 import org.opendaylight.yangtools.concepts.ListenerRegistration;
21 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
26 * Topic registration for notification with specified namespace from stream.
28 class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
30 private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
32 private final String nodeId;
33 private final NetconfEventSource netconfEventSource;
34 private final NetconfEventSourceMount mountPoint;
35 private final ConcurrentHashMap<SchemaPath, ListenerRegistration<DOMNotificationListener>>
36 notificationRegistrationMap = new ConcurrentHashMap<>();
37 private final Stream stream;
40 * Creates registration to notification stream.
42 * @param stream stream
43 * @param notificationPrefix notifications namespace
44 * @param netconfEventSource event source
46 StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
47 final NetconfEventSource netconfEventSource) {
48 super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
49 this.netconfEventSource = netconfEventSource;
50 this.mountPoint = netconfEventSource.getMount();
51 this.nodeId = mountPoint.getNode().getNodeId().getValue();
53 setReplaySupported(stream.getReplaySupport());
55 LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
59 * Subscribes to notification stream associated with this registration.
62 void activateNotificationSource() {
64 LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
65 final ListenableFuture<? extends DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream);
69 } catch (InterruptedException | ExecutionException e) {
70 LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
74 LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
79 * Subscribes to notification stream associated with this registration. If replay is supported, notifications
81 * received event time will be requested.
84 void reActivateNotificationSource() {
86 LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
87 final ListenableFuture<? extends DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream,
92 } catch (InterruptedException | ExecutionException e) {
93 LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
100 void deActivateNotificationSource() {
101 // no operations need
104 private void closeStream() {
106 for (ListenerRegistration<DOMNotificationListener> reg : notificationRegistrationMap.values()) {
109 notificationRegistrationMap.clear();
110 notificationTopicMap.clear();
115 private String getStreamName() {
116 return getSourceName();
120 boolean registerNotificationTopic(final SchemaPath notificationPath, final TopicId topicId) {
121 if (!checkNotificationPath(notificationPath)) {
122 LOG.debug("Bad SchemaPath for notification try to register");
126 activateNotificationSource();
128 LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
133 ListenerRegistration<DOMNotificationListener> registration =
134 mountPoint.registerNotificationListener(netconfEventSource, notificationPath);
135 notificationRegistrationMap.put(notificationPath, registration);
136 Set<TopicId> topicIds = getTopicsForNotification(notificationPath);
137 topicIds.add(topicId);
139 notificationTopicMap.put(notificationPath, topicIds);
144 synchronized void unRegisterNotificationTopic(final TopicId topicId) {
145 List<SchemaPath> notificationPathToRemove = new ArrayList<>();
146 for (SchemaPath notifKey : notificationTopicMap.keySet()) {
147 Set<TopicId> topicList = notificationTopicMap.get(notifKey);
148 if (topicList != null) {
149 topicList.remove(topicId);
150 if (topicList.isEmpty()) {
151 notificationPathToRemove.add(notifKey);
155 for (SchemaPath notifKey : notificationPathToRemove) {
156 notificationTopicMap.remove(notifKey);
157 ListenerRegistration<DOMNotificationListener> reg = notificationRegistrationMap.remove(notifKey);
165 public void close() {