Merge "Remove blocking checkedGet call"
[netconf.git] / netconf / netconf-notifications-impl / src / main / java / org / opendaylight / netconf / notifications / impl / ops / CreateSubscription.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.notifications.impl.ops;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.Date;
15 import java.util.List;
16 import org.opendaylight.controller.config.util.xml.DocumentedException;
17 import org.opendaylight.controller.config.util.xml.XmlElement;
18 import org.opendaylight.controller.config.util.xml.XmlUtil;
19 import org.opendaylight.netconf.api.NetconfSession;
20 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
21 import org.opendaylight.netconf.mapping.api.SessionAwareNetconfOperation;
22 import org.opendaylight.netconf.notifications.NetconfNotification;
23 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
24 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
25 import org.opendaylight.netconf.notifications.NotificationListenerRegistration;
26 import org.opendaylight.netconf.notifications.impl.NetconfNotificationManager;
27 import org.opendaylight.netconf.util.mapping.AbstractSingletonNetconfOperation;
28 import org.opendaylight.netconf.util.messages.SubtreeFilter;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
30 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.w3c.dom.Document;
34 import org.w3c.dom.Element;
35
36 /**
37  * Create subscription listens for create subscription requests and registers notification listeners into notification registry.
38  * Received notifications are sent to the client right away
39  */
40 public class CreateSubscription extends AbstractSingletonNetconfOperation implements SessionAwareNetconfOperation, AutoCloseable {
41
42     private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
43
44     static final String CREATE_SUBSCRIPTION = "create-subscription";
45
46     private final NetconfNotificationRegistry notifications;
47     private final List<NotificationListenerRegistration> subscriptions = Lists.newArrayList();
48     private NetconfSession netconfSession;
49
50     public CreateSubscription(final String netconfSessionIdForReporting, final NetconfNotificationRegistry notifications) {
51         super(netconfSessionIdForReporting);
52         this.notifications = notifications;
53     }
54
55     @Override
56     protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws DocumentedException {
57         operationElement.checkName(CREATE_SUBSCRIPTION);
58         operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
59         // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
60         // Binding doesn't support anyxml nodes yet, so filter could not be retrieved
61         // xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
62
63         final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
64
65         // Replay not supported
66         final Optional<XmlElement> startTime = operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
67         Preconditions.checkArgument(startTime.isPresent() == false, "StartTime element not yet supported");
68
69         // Stop time not supported
70         final Optional<XmlElement> stopTime = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
71         Preconditions.checkArgument(stopTime.isPresent() == false, "StopTime element not yet supported");
72
73         final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
74
75         Preconditions.checkNotNull(netconfSession);
76         // Premature streams are allowed (meaning listener can register even if no provider is available yet)
77         if(notifications.isStreamAvailable(streamNameType) == false) {
78             LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType, getNetconfSessionIdForReporting());
79         }
80
81         final NotificationListenerRegistration notificationListenerRegistration =
82                 notifications.registerNotificationListener(streamNameType, new NotificationSubscription(netconfSession, filter));
83         subscriptions.add(notificationListenerRegistration);
84
85         return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.<String>absent());
86     }
87
88     private static StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws DocumentedException {
89         final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
90         return stream.isPresent() ? new StreamNameType(stream.get().getTextContent()) : NetconfNotificationManager.BASE_STREAM_NAME;
91     }
92
93     @Override
94     protected String getOperationName() {
95         return CREATE_SUBSCRIPTION;
96     }
97
98     @Override
99     protected String getOperationNamespace() {
100         return CreateSubscriptionInput.QNAME.getNamespace().toString();
101     }
102
103     @Override
104     public void setSession(final NetconfSession session) {
105         this.netconfSession = session;
106     }
107
108     @Override
109     public void close() {
110         netconfSession = null;
111         // Unregister from notification streams
112         for (final NotificationListenerRegistration subscription : subscriptions) {
113             subscription.close();
114         }
115     }
116
117     private static class NotificationSubscription implements NetconfNotificationListener {
118         private final NetconfSession currentSession;
119         private final Optional<XmlElement> filter;
120
121         public NotificationSubscription(final NetconfSession currentSession, final Optional<XmlElement> filter) {
122             this.currentSession = currentSession;
123             this.filter = filter;
124         }
125
126         @Override
127         public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
128             if (filter.isPresent()) {
129                 try {
130                     final Optional<Document> filtered = SubtreeFilter.applySubtreeNotificationFilter(this.filter.get(), notification.getDocument());
131                     if (filtered.isPresent()) {
132                         final Date eventTime = notification.getEventTime();
133                         currentSession.sendMessage(new NetconfNotification(filtered.get(), eventTime));
134                     }
135                 } catch (DocumentedException e) {
136                     LOG.warn(e.toString());
137                     currentSession.sendMessage(notification);
138                 }
139             } else {
140                 currentSession.sendMessage(notification);
141             }
142         }
143     }
144 }