Merge "Add support to RESTCONF to dynamically specify Filters"
[netconf.git] / netconf / messagebus-netconf / src / main / java / org / opendaylight / netconf / messagebus / eventsources / netconf / StreamNotificationTopicRegistration.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.messagebus.eventsources.netconf;
9
10 import com.google.common.util.concurrent.CheckedFuture;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
16 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
18 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
19 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
20 import org.opendaylight.yangtools.concepts.ListenerRegistration;
21 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Topic registration for notification with specified namespace from stream.
27  */
28 class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
29
30     private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
31
32     private final String nodeId;
33     private final NetconfEventSource netconfEventSource;
34     private final NetconfEventSourceMount mountPoint;
35     private final ConcurrentHashMap<SchemaPath, ListenerRegistration<DOMNotificationListener>> notificationRegistrationMap = new ConcurrentHashMap<>();
36     private final Stream stream;
37
38     /**
39      * Creates registration to notification stream.
40      * @param stream stream
41      * @param notificationPrefix notifications namespace
42      * @param netconfEventSource event source
43      */
44     public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
45         NetconfEventSource netconfEventSource) {
46         super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
47         this.netconfEventSource = netconfEventSource;
48         this.mountPoint = netconfEventSource.getMount();
49         this.nodeId = mountPoint.getNode().getNodeId().getValue();
50         this.stream = stream;
51         setReplaySupported(stream.isReplaySupport());
52         setActive(false);
53         LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
54     }
55
56     /**
57      * Subscribes to notification stream associated with this registration.
58      */
59     void activateNotificationSource() {
60         if (!isActive()) {
61             LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
62             final CheckedFuture<DOMRpcResult, DOMRpcException> result = mountPoint.invokeCreateSubscription(stream);
63             try {
64                 result.checkedGet();
65                 setActive(true);
66             } catch (DOMRpcException e) {
67                 LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
68                 setActive(false);
69             }
70         } else {
71             LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
72         }
73     }
74
75     /**
76      * Subscribes to notification stream associated with this registration. If replay is supported, notifications from last
77      * received event time will be requested.
78      */
79     void reActivateNotificationSource() {
80         if (isActive()) {
81             LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
82             final CheckedFuture<DOMRpcResult, DOMRpcException> result;
83             result = mountPoint.invokeCreateSubscription(stream, getLastEventTime());
84             try {
85                 result.checkedGet();
86                 setActive(true);
87             } catch (DOMRpcException e) {
88                 LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
89                 setActive(false);
90             }
91         }
92     }
93
94     @Override void deActivateNotificationSource() {
95         // no operations need
96     }
97
98     private void closeStream() {
99         if (isActive()) {
100             for (ListenerRegistration<DOMNotificationListener> reg : notificationRegistrationMap.values()) {
101                 reg.close();
102             }
103             notificationRegistrationMap.clear();
104             notificationTopicMap.clear();
105             setActive(false);
106         }
107     }
108
109     private String getStreamName() {
110         return getSourceName();
111     }
112
113     @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
114         if (!checkNotificationPath(notificationPath)) {
115             LOG.debug("Bad SchemaPath for notification try to register");
116             return false;
117         }
118
119         activateNotificationSource();
120         if (!isActive()) {
121             LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
122                 notificationPath.toString());
123             return false;
124         }
125
126         ListenerRegistration<DOMNotificationListener> registration = mountPoint.registerNotificationListener(netconfEventSource, notificationPath);
127         notificationRegistrationMap.put(notificationPath, registration);
128         Set<TopicId> topicIds = getTopicsForNotification(notificationPath);
129         topicIds.add(topicId);
130
131         notificationTopicMap.put(notificationPath, topicIds);
132         return true;
133     }
134
135     @Override synchronized void unRegisterNotificationTopic(TopicId topicId) {
136         List<SchemaPath> notificationPathToRemove = new ArrayList<>();
137         for (SchemaPath notifKey : notificationTopicMap.keySet()) {
138             Set<TopicId> topicList = notificationTopicMap.get(notifKey);
139             if (topicList != null) {
140                 topicList.remove(topicId);
141                 if (topicList.isEmpty()) {
142                     notificationPathToRemove.add(notifKey);
143                 }
144             }
145         }
146         for (SchemaPath notifKey : notificationPathToRemove) {
147             notificationTopicMap.remove(notifKey);
148             ListenerRegistration<DOMNotificationListener> reg = notificationRegistrationMap.remove(notifKey);
149             if (reg != null) {
150                 reg.close();
151             }
152         }
153     }
154
155     @Override public void close() throws Exception {
156         closeStream();
157     }
158
159 }