BUG 3121 - destroy topic implementation
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / eventsources / netconf / StreamNotificationTopicRegistration.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.messagebus.eventsources.netconf;
9
10 import java.util.ArrayList;
11 import java.util.Date;
12 import java.util.List;
13 import java.util.concurrent.ConcurrentHashMap;
14
15 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
16 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
17 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
18 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
19 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
20 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
21 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
22 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
23 import org.opendaylight.yangtools.concepts.ListenerRegistration;
24 import org.opendaylight.yangtools.yang.common.QName;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
26 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
27 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
28 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
29 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
30 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import com.google.common.base.Optional;
35 import com.google.common.util.concurrent.CheckedFuture;
36
37 public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
38
39     private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
40     private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
41     private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
42     private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime"));
43
44     final private DOMMountPoint domMountPoint;
45     final private String nodeId;
46     final private NetconfEventSource netconfEventSource;
47     final private Stream stream;
48     private Date lastEventTime;
49
50     private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
51     private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
52
53     public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) {
54         super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
55         this.domMountPoint = netconfEventSource.getDOMMountPoint();
56         this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
57         this.netconfEventSource = netconfEventSource;
58         this.stream = stream;
59         this.lastEventTime= null;
60         setReplaySupported(this.stream.isReplaySupport());
61         setActive(false);
62         LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
63     }
64
65     void activateNotificationSource() {
66         if(isActive() == false){
67             LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
68             final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
69                     .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()))
70                     .build();
71             CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
72             try {
73                 csFuture.checkedGet();
74                 setActive(true);
75             } catch (DOMRpcException e) {
76                 LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
77                 setActive(false);
78                 return;
79             }
80         } else {
81             LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
82         }
83     }
84
85     void reActivateNotificationSource(){
86         if(isActive()){
87             LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
88             DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder =
89                     Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
90                     .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
91             if(isReplaySupported() && this.getLastEventTime() != null){
92                 inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
93             }
94             final ContainerNode input = inputBuilder.build();
95             CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
96             try {
97                 csFuture.checkedGet();
98                 setActive(true);
99             } catch (DOMRpcException e) {
100                 LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
101                 setActive(false);
102                 return;
103             }
104         }
105     }
106
107     @Override
108     void deActivateNotificationSource() {
109         // no operations need
110     }
111
112     private void closeStream() {
113         if(isActive()){
114             for(ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()){
115                 reg.close();
116             }
117             notificationRegistrationMap.clear();
118             notificationTopicMap.clear();
119             setActive(false);
120         }
121     }
122
123     private String getStreamName() {
124         return getSourceName();
125     }
126
127     @Override
128     ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath){
129         return notificationTopicMap.get(notificationPath);
130     }
131
132     @Override
133     boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){
134
135         if(checkNotificationPath(notificationPath) == false){
136             LOG.debug("Bad SchemaPath for notification try to register");
137             return false;
138         }
139
140         final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
141         if(notifyService.isPresent() == false){
142             LOG.debug("DOMNotificationService is not present");
143             return false;
144         }
145
146         activateNotificationSource();
147         if(isActive() == false){
148             LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString());
149             return false;
150         }
151
152         ListenerRegistration<NetconfEventSource> registration =
153                 notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
154         notificationRegistrationMap.put(notificationPath, registration);
155         ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
156         if(topicIds == null){
157             topicIds = new ArrayList<>();
158             topicIds.add(topicId);
159         } else {
160             if(topicIds.contains(topicId) == false){
161                 topicIds.add(topicId);
162             }
163         }
164
165         notificationTopicMap.put(notificationPath, topicIds);
166         return true;
167     }
168
169     @Override
170     synchronized void unRegisterNotificationTopic(TopicId topicId) {
171         List<SchemaPath> notificationPathToRemove = new ArrayList<>();
172         for(SchemaPath notifKey : notificationTopicMap.keySet()){
173             ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
174             if(topicList != null){
175                 topicList.remove(topicId);
176                 if(topicList.isEmpty()){
177                     notificationPathToRemove.add(notifKey);
178                 }
179             }
180         }
181         for(SchemaPath notifKey : notificationPathToRemove){
182             notificationTopicMap.remove(notifKey);
183             ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
184             if(reg != null){
185                 reg.close();
186             }
187         }
188     }
189
190     Optional<Date> getLastEventTime() {
191         return Optional.fromNullable(lastEventTime);
192     }
193
194
195     void setLastEventTime(Date lastEventTime) {
196         this.lastEventTime = lastEventTime;
197     }
198
199     @Override
200     public void close() throws Exception {
201         closeStream();
202     }
203
204 }