Bump versions to 2.0.0-SNAPSHOT
[netconf.git] / netconf / messagebus-netconf / src / main / java / org / opendaylight / netconf / messagebus / eventsources / netconf / StreamNotificationTopicRegistration.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.messagebus.eventsources.netconf;
9
10 import com.google.common.util.concurrent.ListenableFuture;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ExecutionException;
16 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
17 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
18 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
19 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
20 import org.opendaylight.yangtools.concepts.ListenerRegistration;
21 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Topic registration for notification with specified namespace from stream.
27  */
28 @Deprecated(forRemoval = true)
29 class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
30
31     private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
32
33     private final String nodeId;
34     private final NetconfEventSource netconfEventSource;
35     private final NetconfEventSourceMount mountPoint;
36     private final ConcurrentHashMap<SchemaPath, ListenerRegistration<DOMNotificationListener>>
37             notificationRegistrationMap = new ConcurrentHashMap<>();
38     private final Stream stream;
39
40     /**
41      * Creates registration to notification stream.
42      *
43      * @param stream             stream
44      * @param notificationPrefix notifications namespace
45      * @param netconfEventSource event source
46      */
47     StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
48                                         final NetconfEventSource netconfEventSource) {
49         super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
50         this.netconfEventSource = netconfEventSource;
51         this.mountPoint = netconfEventSource.getMount();
52         this.nodeId = mountPoint.getNode().getNodeId().getValue();
53         this.stream = stream;
54         setReplaySupported(stream.getReplaySupport());
55         setActive(false);
56         LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
57     }
58
59     /**
60      * Subscribes to notification stream associated with this registration.
61      */
62     @Override
63     void activateNotificationSource() {
64         if (!isActive()) {
65             LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
66             final ListenableFuture<? extends DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream);
67             try {
68                 result.get();
69                 setActive(true);
70             } catch (InterruptedException | ExecutionException e) {
71                 LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
72                 setActive(false);
73             }
74         } else {
75             LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
76         }
77     }
78
79     /**
80      * Subscribes to notification stream associated with this registration. If replay is supported, notifications
81      * from last
82      * received event time will be requested.
83      */
84     @Override
85     void reActivateNotificationSource() {
86         if (isActive()) {
87             LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
88             final ListenableFuture<? extends DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream,
89                 getLastEventTime());
90             try {
91                 result.get();
92                 setActive(true);
93             } catch (InterruptedException | ExecutionException e) {
94                 LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
95                 setActive(false);
96             }
97         }
98     }
99
100     @Override
101     void deActivateNotificationSource() {
102         // no operations need
103     }
104
105     private void closeStream() {
106         if (isActive()) {
107             for (ListenerRegistration<DOMNotificationListener> reg : notificationRegistrationMap.values()) {
108                 reg.close();
109             }
110             notificationRegistrationMap.clear();
111             notificationTopicMap.clear();
112             setActive(false);
113         }
114     }
115
116     private String getStreamName() {
117         return getSourceName();
118     }
119
120     @Override
121     boolean registerNotificationTopic(final SchemaPath notificationPath, final TopicId topicId) {
122         if (!checkNotificationPath(notificationPath)) {
123             LOG.debug("Bad SchemaPath for notification try to register");
124             return false;
125         }
126
127         activateNotificationSource();
128         if (!isActive()) {
129             LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
130                     notificationPath);
131             return false;
132         }
133
134         ListenerRegistration<DOMNotificationListener> registration =
135                 mountPoint.registerNotificationListener(netconfEventSource, notificationPath);
136         notificationRegistrationMap.put(notificationPath, registration);
137         Set<TopicId> topicIds = getTopicsForNotification(notificationPath);
138         topicIds.add(topicId);
139
140         notificationTopicMap.put(notificationPath, topicIds);
141         return true;
142     }
143
144     @Override
145     synchronized void unRegisterNotificationTopic(final TopicId topicId) {
146         List<SchemaPath> notificationPathToRemove = new ArrayList<>();
147         for (SchemaPath notifKey : notificationTopicMap.keySet()) {
148             Set<TopicId> topicList = notificationTopicMap.get(notifKey);
149             if (topicList != null) {
150                 topicList.remove(topicId);
151                 if (topicList.isEmpty()) {
152                     notificationPathToRemove.add(notifKey);
153                 }
154             }
155         }
156         for (SchemaPath notifKey : notificationPathToRemove) {
157             notificationTopicMap.remove(notifKey);
158             ListenerRegistration<DOMNotificationListener> reg = notificationRegistrationMap.remove(notifKey);
159             if (reg != null) {
160                 reg.close();
161             }
162         }
163     }
164
165     @Override
166     public void close() {
167         closeStream();
168     }
169
170 }