Migrate netconf-console to Karaf 4 way
[netconf.git] / netconf / netconf-notifications-impl / src / main / java / org / opendaylight / netconf / notifications / impl / ops / CreateSubscription.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.notifications.impl.ops;
9
10 import com.google.common.base.Preconditions;
11 import java.util.ArrayList;
12 import java.util.Date;
13 import java.util.List;
14 import java.util.Optional;
15 import org.opendaylight.netconf.api.DocumentedException;
16 import org.opendaylight.netconf.api.NetconfSession;
17 import org.opendaylight.netconf.api.xml.XmlElement;
18 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
19 import org.opendaylight.netconf.api.xml.XmlUtil;
20 import org.opendaylight.netconf.mapping.api.SessionAwareNetconfOperation;
21 import org.opendaylight.netconf.notifications.NetconfNotification;
22 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
23 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
24 import org.opendaylight.netconf.notifications.NotificationListenerRegistration;
25 import org.opendaylight.netconf.notifications.impl.NetconfNotificationManager;
26 import org.opendaylight.netconf.util.mapping.AbstractSingletonNetconfOperation;
27 import org.opendaylight.netconf.util.messages.SubtreeFilter;
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.w3c.dom.Document;
33 import org.w3c.dom.Element;
34
35 /**
36  * Create subscription listens for create subscription requests
37  * and registers notification listeners into notification registry.
38  * Received notifications are sent to the client right away
39  */
40 public class CreateSubscription extends AbstractSingletonNetconfOperation
41         implements SessionAwareNetconfOperation, AutoCloseable {
42
43     private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
44
45     static final String CREATE_SUBSCRIPTION = "create-subscription";
46
47     private final NetconfNotificationRegistry notifications;
48     private final List<NotificationListenerRegistration> subscriptions = new ArrayList<>();
49     private NetconfSession netconfSession;
50
51     public CreateSubscription(final String netconfSessionIdForReporting,
52                               final NetconfNotificationRegistry notifications) {
53         super(netconfSessionIdForReporting);
54         this.notifications = notifications;
55     }
56
57     @Override
58     protected Element handleWithNoSubsequentOperations(final Document document,
59                                                        final XmlElement operationElement) throws DocumentedException {
60         operationElement.checkName(CREATE_SUBSCRIPTION);
61         operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
62         // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
63         // Binding doesn't support anyxml nodes yet, so filter could not be retrieved
64         // xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
65
66         final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
67
68         // Replay not supported
69         final Optional<XmlElement> startTime =
70                 operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
71         Preconditions.checkArgument(!startTime.isPresent(), "StartTime element not yet supported");
72
73         // Stop time not supported
74         final Optional<XmlElement> stopTime =
75                 operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
76         Preconditions.checkArgument(!stopTime.isPresent(), "StopTime element not yet supported");
77
78         final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
79
80         Preconditions.checkNotNull(netconfSession);
81         // Premature streams are allowed (meaning listener can register even if no provider is available yet)
82         if (!notifications.isStreamAvailable(streamNameType)) {
83             LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType,
84                     getNetconfSessionIdForReporting());
85         }
86
87         final NotificationListenerRegistration notificationListenerRegistration = notifications
88                 .registerNotificationListener(streamNameType, new NotificationSubscription(netconfSession, filter));
89         subscriptions.add(notificationListenerRegistration);
90
91         return XmlUtil.createElement(document, XmlNetconfConstants.OK);
92     }
93
94     private static StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws DocumentedException {
95         final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
96         return stream.isPresent() ? new StreamNameType(stream.get().getTextContent())
97                 : NetconfNotificationManager.BASE_STREAM_NAME;
98     }
99
100     @Override
101     protected String getOperationName() {
102         return CREATE_SUBSCRIPTION;
103     }
104
105     @Override
106     protected String getOperationNamespace() {
107         return CreateSubscriptionInput.QNAME.getNamespace().toString();
108     }
109
110     @Override
111     public void setSession(final NetconfSession session) {
112         this.netconfSession = session;
113     }
114
115     @Override
116     public void close() {
117         netconfSession = null;
118         // Unregister from notification streams
119         for (final NotificationListenerRegistration subscription : subscriptions) {
120             subscription.close();
121         }
122     }
123
124     private static class NotificationSubscription implements NetconfNotificationListener {
125         private final NetconfSession currentSession;
126         private final Optional<XmlElement> filter;
127
128         NotificationSubscription(final NetconfSession currentSession, final Optional<XmlElement> filter) {
129             this.currentSession = currentSession;
130             this.filter = filter;
131         }
132
133         @Override
134         public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
135             if (filter.isPresent()) {
136                 try {
137                     final Optional<Document> filtered =
138                             SubtreeFilter.applySubtreeNotificationFilter(this.filter.get(), notification.getDocument());
139                     if (filtered.isPresent()) {
140                         final Date eventTime = notification.getEventTime();
141                         currentSession.sendMessage(new NetconfNotification(filtered.get(), eventTime));
142                     }
143                 } catch (DocumentedException e) {
144                     LOG.warn("Failed to process notification {}", notification, e);
145                     currentSession.sendMessage(notification);
146                 }
147             } else {
148                 currentSession.sendMessage(notification);
149             }
150         }
151     }
152 }