Bump upstreams for Silicon
[netconf.git] / netconf / messagebus-netconf / src / main / java / org / opendaylight / netconf / messagebus / eventsources / netconf / StreamNotificationTopicRegistration.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.messagebus.eventsources.netconf;
9
10 import com.google.common.util.concurrent.ListenableFuture;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ExecutionException;
16 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
17 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
18 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
19 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
20 import org.opendaylight.yangtools.concepts.ListenerRegistration;
21 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Topic registration for notification with specified namespace from stream.
27  */
28 class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
29
30     private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
31
32     private final String nodeId;
33     private final NetconfEventSource netconfEventSource;
34     private final NetconfEventSourceMount mountPoint;
35     private final ConcurrentHashMap<SchemaPath, ListenerRegistration<DOMNotificationListener>>
36             notificationRegistrationMap = new ConcurrentHashMap<>();
37     private final Stream stream;
38
39     /**
40      * Creates registration to notification stream.
41      *
42      * @param stream             stream
43      * @param notificationPrefix notifications namespace
44      * @param netconfEventSource event source
45      */
46     StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
47                                         final NetconfEventSource netconfEventSource) {
48         super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
49         this.netconfEventSource = netconfEventSource;
50         this.mountPoint = netconfEventSource.getMount();
51         this.nodeId = mountPoint.getNode().getNodeId().getValue();
52         this.stream = stream;
53         setReplaySupported(stream.isReplaySupport());
54         setActive(false);
55         LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
56     }
57
58     /**
59      * Subscribes to notification stream associated with this registration.
60      */
61     @Override
62     void activateNotificationSource() {
63         if (!isActive()) {
64             LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
65             final ListenableFuture<? extends DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream);
66             try {
67                 result.get();
68                 setActive(true);
69             } catch (InterruptedException | ExecutionException e) {
70                 LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
71                 setActive(false);
72             }
73         } else {
74             LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
75         }
76     }
77
78     /**
79      * Subscribes to notification stream associated with this registration. If replay is supported, notifications
80      * from last
81      * received event time will be requested.
82      */
83     @Override
84     void reActivateNotificationSource() {
85         if (isActive()) {
86             LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
87             final ListenableFuture<? extends DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream,
88                 getLastEventTime());
89             try {
90                 result.get();
91                 setActive(true);
92             } catch (InterruptedException | ExecutionException e) {
93                 LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
94                 setActive(false);
95             }
96         }
97     }
98
99     @Override
100     void deActivateNotificationSource() {
101         // no operations need
102     }
103
104     private void closeStream() {
105         if (isActive()) {
106             for (ListenerRegistration<DOMNotificationListener> reg : notificationRegistrationMap.values()) {
107                 reg.close();
108             }
109             notificationRegistrationMap.clear();
110             notificationTopicMap.clear();
111             setActive(false);
112         }
113     }
114
115     private String getStreamName() {
116         return getSourceName();
117     }
118
119     @Override
120     boolean registerNotificationTopic(final SchemaPath notificationPath, final TopicId topicId) {
121         if (!checkNotificationPath(notificationPath)) {
122             LOG.debug("Bad SchemaPath for notification try to register");
123             return false;
124         }
125
126         activateNotificationSource();
127         if (!isActive()) {
128             LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
129                     notificationPath);
130             return false;
131         }
132
133         ListenerRegistration<DOMNotificationListener> registration =
134                 mountPoint.registerNotificationListener(netconfEventSource, notificationPath);
135         notificationRegistrationMap.put(notificationPath, registration);
136         Set<TopicId> topicIds = getTopicsForNotification(notificationPath);
137         topicIds.add(topicId);
138
139         notificationTopicMap.put(notificationPath, topicIds);
140         return true;
141     }
142
143     @Override
144     synchronized void unRegisterNotificationTopic(final TopicId topicId) {
145         List<SchemaPath> notificationPathToRemove = new ArrayList<>();
146         for (SchemaPath notifKey : notificationTopicMap.keySet()) {
147             Set<TopicId> topicList = notificationTopicMap.get(notifKey);
148             if (topicList != null) {
149                 topicList.remove(topicId);
150                 if (topicList.isEmpty()) {
151                     notificationPathToRemove.add(notifKey);
152                 }
153             }
154         }
155         for (SchemaPath notifKey : notificationPathToRemove) {
156             notificationTopicMap.remove(notifKey);
157             ListenerRegistration<DOMNotificationListener> reg = notificationRegistrationMap.remove(notifKey);
158             if (reg != null) {
159                 reg.close();
160             }
161         }
162     }
163
164     @Override
165     public void close() {
166         closeStream();
167     }
168
169 }