Clean up NotificationMessage
[netconf.git] / netconf / mdsal-netconf-notification / src / main / java / org / opendaylight / netconf / mdsal / notification / impl / CreateSubscription.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.mdsal.notification.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.Optional;
16 import org.opendaylight.netconf.api.DocumentedException;
17 import org.opendaylight.netconf.api.NetconfSession;
18 import org.opendaylight.netconf.api.messages.NotificationMessage;
19 import org.opendaylight.netconf.api.xml.XmlElement;
20 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
21 import org.opendaylight.netconf.mapping.api.SessionAwareNetconfOperation;
22 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
23 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
24 import org.opendaylight.netconf.util.mapping.AbstractSingletonNetconfOperation;
25 import org.opendaylight.netconf.util.messages.SubtreeFilter;
26 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
27 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
28 import org.opendaylight.yangtools.concepts.Registration;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.w3c.dom.Document;
32 import org.w3c.dom.Element;
33
34 /**
35  * Create subscription listens for create subscription requests
36  * and registers notification listeners into notification registry.
37  * Received notifications are sent to the client right away.
38  */
39 public class CreateSubscription extends AbstractSingletonNetconfOperation
40         implements SessionAwareNetconfOperation, AutoCloseable {
41
42     private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
43
44     static final String CREATE_SUBSCRIPTION = "create-subscription";
45
46     private final NetconfNotificationRegistry notifications;
47     private final List<Registration> subscriptions = new ArrayList<>();
48     private NetconfSession netconfSession;
49
50     public CreateSubscription(final String netconfSessionIdForReporting,
51                               final NetconfNotificationRegistry notifications) {
52         super(netconfSessionIdForReporting);
53         this.notifications = notifications;
54     }
55
56     @Override
57     protected Element handleWithNoSubsequentOperations(final Document document,
58                                                        final XmlElement operationElement) throws DocumentedException {
59         operationElement.checkName(CREATE_SUBSCRIPTION);
60         operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
61         // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
62         // Binding doesn't support anyxml nodes yet, so filter could not be retrieved
63         // xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
64
65         final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
66
67         // Replay not supported
68         final Optional<XmlElement> startTime =
69                 operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
70         checkArgument(startTime.isEmpty(), "StartTime element not yet supported");
71
72         // Stop time not supported
73         final Optional<XmlElement> stopTime =
74                 operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
75         checkArgument(stopTime.isEmpty(), "StopTime element not yet supported");
76
77         final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
78
79         requireNonNull(netconfSession);
80         // Premature streams are allowed (meaning listener can register even if no provider is available yet)
81         if (!notifications.isStreamAvailable(streamNameType)) {
82             LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType,
83                     getNetconfSessionIdForReporting());
84         }
85
86         subscriptions.add(notifications.registerNotificationListener(streamNameType,
87             new NotificationSubscription(netconfSession, filter)));
88
89         return document.createElement(XmlNetconfConstants.OK);
90     }
91
92     private static StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws DocumentedException {
93         final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
94         return stream.isPresent() ? new StreamNameType(stream.orElseThrow().getTextContent())
95                 : NetconfNotificationManager.BASE_STREAM_NAME;
96     }
97
98     @Override
99     protected String getOperationName() {
100         return CREATE_SUBSCRIPTION;
101     }
102
103     @Override
104     protected String getOperationNamespace() {
105         return CreateSubscriptionInput.QNAME.getNamespace().toString();
106     }
107
108     @Override
109     public void setSession(final NetconfSession session) {
110         netconfSession = session;
111     }
112
113     @Override
114     public void close() {
115         netconfSession = null;
116         // Unregister from notification streams
117         subscriptions.forEach(Registration::close);
118         subscriptions.clear();
119     }
120
121     private static class NotificationSubscription implements NetconfNotificationListener {
122         private final NetconfSession currentSession;
123         private final XmlElement filter;
124
125         NotificationSubscription(final NetconfSession currentSession, final Optional<XmlElement> filter) {
126             this.currentSession = currentSession;
127             this.filter = filter.orElse(null);
128         }
129
130         @Override
131         public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
132             if (filter == null) {
133                 currentSession.sendMessage(notification);
134                 return;
135             }
136
137             try {
138                 final Optional<Document> filtered =
139                         SubtreeFilter.applySubtreeNotificationFilter(filter, notification.getDocument());
140                 if (filtered.isPresent()) {
141                     currentSession.sendMessage(new NotificationMessage(filtered.orElseThrow(),
142                         notification.getEventTime()));
143                 }
144             } catch (DocumentedException e) {
145                 LOG.warn("Failed to process notification {}", notification, e);
146                 currentSession.sendMessage(notification);
147             }
148         }
149     }
150 }