e0d4fe21e178b5cd4fd9d6026fecf56a32dcc940
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / eventsources / netconf / StreamNotificationTopicRegistration.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.messagebus.eventsources.netconf;
9
10 import java.util.ArrayList;
11 import java.util.Date;
12 import java.util.concurrent.ConcurrentHashMap;
13
14 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
15 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
16 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
18 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
19 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
20 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
21 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
22 import org.opendaylight.yangtools.concepts.ListenerRegistration;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
25 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
26 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
27 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
28 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
29 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import com.google.common.base.Optional;
34 import com.google.common.util.concurrent.CheckedFuture;
35
36 public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
37
38     private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
39     private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
40     private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
41     private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime"));
42
43     final private DOMMountPoint domMountPoint;
44     final private String nodeId;
45     final private NetconfEventSource netconfEventSource;
46     final private Stream stream;
47     private Date lastEventTime;
48
49     private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
50     private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
51
52     public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) {
53         super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
54         this.domMountPoint = netconfEventSource.getDOMMountPoint();
55         this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
56         this.netconfEventSource = netconfEventSource;
57         this.stream = stream;
58         this.lastEventTime= null;
59         setReplaySupported(this.stream.isReplaySupport());
60         setActive(false);
61     }
62
63     void activateNotificationSource() {
64         if(isActive() == false){
65             LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
66             final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
67                     .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()))
68                     .build();
69             CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
70             try {
71                 csFuture.checkedGet();
72                 setActive(true);
73             } catch (DOMRpcException e) {
74                 LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
75                 setActive(false);
76                 return;
77             }
78         } else {
79             LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
80         }
81     }
82
83     void reActivateNotificationSource(){
84         if(isActive()){
85             LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
86             DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder =
87                     Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
88                     .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
89             if(isReplaySupported() && this.getLastEventTime() != null){
90                 inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
91             }
92             final ContainerNode input = inputBuilder.build();
93             CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
94             try {
95                 csFuture.checkedGet();
96                 setActive(true);
97             } catch (DOMRpcException e) {
98                 LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
99                 setActive(false);
100                 return;
101             }
102         }
103     }
104
105     @Override
106     void deActivateNotificationSource() {
107         // no operations need
108     }
109
110     private void closeStream() {
111         if(isActive()){
112             for(ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()){
113                 reg.close();
114             }
115             notificationRegistrationMap.clear();
116             notificationTopicMap.clear();
117             setActive(false);
118         }
119     }
120
121     private String getStreamName() {
122         return getSourceName();
123     }
124
125     @Override
126     ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath){
127         return notificationTopicMap.get(notificationPath);
128     }
129
130     @Override
131     boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){
132         if(validateNotificationPath(notificationPath) == false){
133             LOG.debug("Bad SchemaPath for notification try to register");
134             return false;
135         }
136         final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
137         if(notifyService.isPresent() == false){
138             LOG.debug("DOMNotificationService is not present");
139             return false;
140         }
141         ListenerRegistration<NetconfEventSource> registration = notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
142         notificationRegistrationMap.put(notificationPath, registration);
143         ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
144         if(topicIds == null){
145             topicIds = new ArrayList<>();
146             topicIds.add(topicId);
147         } else {
148             if(topicIds.contains(topicId) == false){
149                 topicIds.add(topicId);
150             }
151         }
152         notificationTopicMap.put(notificationPath, topicIds);
153         return true;
154     }
155
156     private boolean validateNotificationPath(SchemaPath notificationPath){
157         if(notificationPath == null){
158             return false;
159         }
160         String nameSpace = notificationPath.getLastComponent().toString();
161         return nameSpace.startsWith(getNotificationUrnPrefix());
162     }
163
164     Optional<Date> getLastEventTime() {
165         return Optional.fromNullable(lastEventTime);
166     }
167
168
169     void setLastEventTime(Date lastEventTime) {
170         this.lastEventTime = lastEventTime;
171     }
172
173     @Override
174     public void close() throws Exception {
175         closeStream();
176     }
177
178     @Override
179     void unRegisterNotificationTopic(TopicId topicId) {
180         // TODO: use it when destroy topic will be implemented
181     }
182
183 }