Use Objects.requireNonNull
[netconf.git] / netconf / mdsal-netconf-notification / src / main / java / org / opendaylight / netconf / mdsal / notification / impl / ops / CreateSubscription.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.mdsal.notification.impl.ops;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import java.util.ArrayList;
14 import java.util.Date;
15 import java.util.List;
16 import java.util.Optional;
17 import org.opendaylight.netconf.api.DocumentedException;
18 import org.opendaylight.netconf.api.NetconfSession;
19 import org.opendaylight.netconf.api.xml.XmlElement;
20 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
21 import org.opendaylight.netconf.mapping.api.SessionAwareNetconfOperation;
22 import org.opendaylight.netconf.mdsal.notification.impl.NetconfNotificationManager;
23 import org.opendaylight.netconf.notifications.NetconfNotification;
24 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
25 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
26 import org.opendaylight.netconf.notifications.NotificationListenerRegistration;
27 import org.opendaylight.netconf.util.mapping.AbstractSingletonNetconfOperation;
28 import org.opendaylight.netconf.util.messages.SubtreeFilter;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
30 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.w3c.dom.Document;
34 import org.w3c.dom.Element;
35
36 /**
37  * Create subscription listens for create subscription requests
38  * and registers notification listeners into notification registry.
39  * Received notifications are sent to the client right away
40  */
41 public class CreateSubscription extends AbstractSingletonNetconfOperation
42         implements SessionAwareNetconfOperation, AutoCloseable {
43
44     private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
45
46     static final String CREATE_SUBSCRIPTION = "create-subscription";
47
48     private final NetconfNotificationRegistry notifications;
49     private final List<NotificationListenerRegistration> subscriptions = new ArrayList<>();
50     private NetconfSession netconfSession;
51
52     public CreateSubscription(final String netconfSessionIdForReporting,
53                               final NetconfNotificationRegistry notifications) {
54         super(netconfSessionIdForReporting);
55         this.notifications = notifications;
56     }
57
58     @Override
59     protected Element handleWithNoSubsequentOperations(final Document document,
60                                                        final XmlElement operationElement) throws DocumentedException {
61         operationElement.checkName(CREATE_SUBSCRIPTION);
62         operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
63         // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
64         // Binding doesn't support anyxml nodes yet, so filter could not be retrieved
65         // xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
66
67         final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
68
69         // Replay not supported
70         final Optional<XmlElement> startTime =
71                 operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
72         checkArgument(!startTime.isPresent(), "StartTime element not yet supported");
73
74         // Stop time not supported
75         final Optional<XmlElement> stopTime =
76                 operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
77         checkArgument(!stopTime.isPresent(), "StopTime element not yet supported");
78
79         final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
80
81         requireNonNull(netconfSession);
82         // Premature streams are allowed (meaning listener can register even if no provider is available yet)
83         if (!notifications.isStreamAvailable(streamNameType)) {
84             LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType,
85                     getNetconfSessionIdForReporting());
86         }
87
88         final NotificationListenerRegistration notificationListenerRegistration = notifications
89                 .registerNotificationListener(streamNameType, new NotificationSubscription(netconfSession, filter));
90         subscriptions.add(notificationListenerRegistration);
91
92         return document.createElement(XmlNetconfConstants.OK);
93     }
94
95     private static StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws DocumentedException {
96         final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
97         return stream.isPresent() ? new StreamNameType(stream.get().getTextContent())
98                 : NetconfNotificationManager.BASE_STREAM_NAME;
99     }
100
101     @Override
102     protected String getOperationName() {
103         return CREATE_SUBSCRIPTION;
104     }
105
106     @Override
107     protected String getOperationNamespace() {
108         return CreateSubscriptionInput.QNAME.getNamespace().toString();
109     }
110
111     @Override
112     public void setSession(final NetconfSession session) {
113         this.netconfSession = session;
114     }
115
116     @Override
117     public void close() {
118         netconfSession = null;
119         // Unregister from notification streams
120         for (final NotificationListenerRegistration subscription : subscriptions) {
121             subscription.close();
122         }
123     }
124
125     private static class NotificationSubscription implements NetconfNotificationListener {
126         private final NetconfSession currentSession;
127         private final Optional<XmlElement> filter;
128
129         NotificationSubscription(final NetconfSession currentSession, final Optional<XmlElement> filter) {
130             this.currentSession = currentSession;
131             this.filter = filter;
132         }
133
134         @Override
135         public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
136             if (filter.isPresent()) {
137                 try {
138                     final Optional<Document> filtered =
139                             SubtreeFilter.applySubtreeNotificationFilter(this.filter.get(), notification.getDocument());
140                     if (filtered.isPresent()) {
141                         final Date eventTime = notification.getEventTime();
142                         currentSession.sendMessage(new NetconfNotification(filtered.get(), eventTime));
143                     }
144                 } catch (DocumentedException e) {
145                     LOG.warn("Failed to process notification {}", notification, e);
146                     currentSession.sendMessage(notification);
147                 }
148             } else {
149                 currentSession.sendMessage(notification);
150             }
151         }
152     }
153 }