Integrate netconf-mapping-api into netconf-server
[netconf.git] / apps / netconf-events-mdsal / src / main / java / org / opendaylight / netconf / server / events / mdsal / CreateSubscription.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.server.events.mdsal;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.Optional;
16 import org.eclipse.jdt.annotation.NonNull;
17 import org.opendaylight.netconf.api.DocumentedException;
18 import org.opendaylight.netconf.api.NetconfSession;
19 import org.opendaylight.netconf.api.messages.NotificationMessage;
20 import org.opendaylight.netconf.api.xml.XmlElement;
21 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
22 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
23 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
24 import org.opendaylight.netconf.server.api.operations.AbstractSingletonNetconfOperation;
25 import org.opendaylight.netconf.server.api.operations.SessionAwareNetconfOperation;
26 import org.opendaylight.netconf.server.spi.SubtreeFilter;
27 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
29 import org.opendaylight.yangtools.concepts.Registration;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.w3c.dom.Document;
33 import org.w3c.dom.Element;
34
35 /**
36  * Create subscription listens for create subscription requests and registers notification listeners into notification
37  * registry. Received notifications are sent to the client right away.
38  */
39 final class CreateSubscription extends AbstractSingletonNetconfOperation
40         implements SessionAwareNetconfOperation, AutoCloseable {
41     private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
42
43     static final @NonNull String CREATE_SUBSCRIPTION = "create-subscription";
44
45     private final List<Registration> subscriptions = new ArrayList<>();
46     private final NetconfNotificationRegistry notifications;
47
48     private NetconfSession netconfSession;
49
50     CreateSubscription(final String netconfSessionIdForReporting, final NetconfNotificationRegistry notifications) {
51         super(netconfSessionIdForReporting);
52         this.notifications = requireNonNull(notifications);
53     }
54
55     @Override
56     protected Element handleWithNoSubsequentOperations(final Document document,
57                                                        final XmlElement operationElement) throws DocumentedException {
58         operationElement.checkName(CREATE_SUBSCRIPTION);
59         operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
60         // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
61         // Binding doesn't support anyxml nodes yet, so filter could not be retrieved
62         // xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
63
64         final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
65
66         // Replay not supported
67         final Optional<XmlElement> startTime =
68                 operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
69         checkArgument(startTime.isEmpty(), "StartTime element not yet supported");
70
71         // Stop time not supported
72         final Optional<XmlElement> stopTime =
73                 operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
74         checkArgument(stopTime.isEmpty(), "StopTime element not yet supported");
75
76         final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
77
78         requireNonNull(netconfSession);
79         // Premature streams are allowed (meaning listener can register even if no provider is available yet)
80         if (!notifications.isStreamAvailable(streamNameType)) {
81             LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType,
82                     getNetconfSessionIdForReporting());
83         }
84
85         subscriptions.add(notifications.registerNotificationListener(streamNameType,
86             new NotificationSubscription(netconfSession, filter)));
87
88         return document.createElement(XmlNetconfConstants.OK);
89     }
90
91     private static StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws DocumentedException {
92         final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
93         return stream.isPresent() ? new StreamNameType(stream.orElseThrow().getTextContent())
94                 : NetconfNotificationManager.BASE_STREAM_NAME;
95     }
96
97     @Override
98     protected String getOperationName() {
99         return CREATE_SUBSCRIPTION;
100     }
101
102     @Override
103     protected String getOperationNamespace() {
104         return CreateSubscriptionInput.QNAME.getNamespace().toString();
105     }
106
107     @Override
108     public void setSession(final NetconfSession session) {
109         netconfSession = session;
110     }
111
112     @Override
113     public void close() {
114         netconfSession = null;
115         // Unregister from notification streams
116         subscriptions.forEach(Registration::close);
117         subscriptions.clear();
118     }
119
120     private static class NotificationSubscription implements NetconfNotificationListener {
121         private final NetconfSession currentSession;
122         private final XmlElement filter;
123
124         NotificationSubscription(final NetconfSession currentSession, final Optional<XmlElement> filter) {
125             this.currentSession = currentSession;
126             this.filter = filter.orElse(null);
127         }
128
129         @Override
130         public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
131             if (filter == null) {
132                 currentSession.sendMessage(notification);
133                 return;
134             }
135
136             try {
137                 final Optional<Document> filtered =
138                         SubtreeFilter.applySubtreeNotificationFilter(filter, notification.getDocument());
139                 if (filtered.isPresent()) {
140                     currentSession.sendMessage(new NotificationMessage(filtered.orElseThrow(),
141                         notification.getEventTime()));
142                 }
143             } catch (DocumentedException e) {
144                 LOG.warn("Failed to process notification {}", notification, e);
145                 currentSession.sendMessage(notification);
146             }
147         }
148     }
149 }