2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.server.events.mdsal;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.Optional;
16 import org.eclipse.jdt.annotation.NonNull;
17 import org.opendaylight.netconf.api.DocumentedException;
18 import org.opendaylight.netconf.api.NetconfSession;
19 import org.opendaylight.netconf.api.messages.NotificationMessage;
20 import org.opendaylight.netconf.api.xml.XmlElement;
21 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
22 import org.opendaylight.netconf.mapping.api.SessionAwareNetconfOperation;
23 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
24 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
25 import org.opendaylight.netconf.util.mapping.AbstractSingletonNetconfOperation;
26 import org.opendaylight.netconf.util.messages.SubtreeFilter;
27 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
29 import org.opendaylight.yangtools.concepts.Registration;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.w3c.dom.Document;
33 import org.w3c.dom.Element;
36 * Create subscription listens for create subscription requests and registers notification listeners into notification
37 * registry. Received notifications are sent to the client right away.
39 final class CreateSubscription extends AbstractSingletonNetconfOperation
40 implements SessionAwareNetconfOperation, AutoCloseable {
41 private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
43 static final @NonNull String CREATE_SUBSCRIPTION = "create-subscription";
45 private final List<Registration> subscriptions = new ArrayList<>();
46 private final NetconfNotificationRegistry notifications;
48 private NetconfSession netconfSession;
50 CreateSubscription(final String netconfSessionIdForReporting, final NetconfNotificationRegistry notifications) {
51 super(netconfSessionIdForReporting);
52 this.notifications = requireNonNull(notifications);
56 protected Element handleWithNoSubsequentOperations(final Document document,
57 final XmlElement operationElement) throws DocumentedException {
58 operationElement.checkName(CREATE_SUBSCRIPTION);
59 operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
60 // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
61 // Binding doesn't support anyxml nodes yet, so filter could not be retrieved
62 // xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
64 final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
66 // Replay not supported
67 final Optional<XmlElement> startTime =
68 operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
69 checkArgument(startTime.isEmpty(), "StartTime element not yet supported");
71 // Stop time not supported
72 final Optional<XmlElement> stopTime =
73 operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
74 checkArgument(stopTime.isEmpty(), "StopTime element not yet supported");
76 final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
78 requireNonNull(netconfSession);
79 // Premature streams are allowed (meaning listener can register even if no provider is available yet)
80 if (!notifications.isStreamAvailable(streamNameType)) {
81 LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType,
82 getNetconfSessionIdForReporting());
85 subscriptions.add(notifications.registerNotificationListener(streamNameType,
86 new NotificationSubscription(netconfSession, filter)));
88 return document.createElement(XmlNetconfConstants.OK);
91 private static StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws DocumentedException {
92 final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
93 return stream.isPresent() ? new StreamNameType(stream.orElseThrow().getTextContent())
94 : NetconfNotificationManager.BASE_STREAM_NAME;
98 protected String getOperationName() {
99 return CREATE_SUBSCRIPTION;
103 protected String getOperationNamespace() {
104 return CreateSubscriptionInput.QNAME.getNamespace().toString();
108 public void setSession(final NetconfSession session) {
109 netconfSession = session;
113 public void close() {
114 netconfSession = null;
115 // Unregister from notification streams
116 subscriptions.forEach(Registration::close);
117 subscriptions.clear();
120 private static class NotificationSubscription implements NetconfNotificationListener {
121 private final NetconfSession currentSession;
122 private final XmlElement filter;
124 NotificationSubscription(final NetconfSession currentSession, final Optional<XmlElement> filter) {
125 this.currentSession = currentSession;
126 this.filter = filter.orElse(null);
130 public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
131 if (filter == null) {
132 currentSession.sendMessage(notification);
137 final Optional<Document> filtered =
138 SubtreeFilter.applySubtreeNotificationFilter(filter, notification.getDocument());
139 if (filtered.isPresent()) {
140 currentSession.sendMessage(new NotificationMessage(filtered.orElseThrow(),
141 notification.getEventTime()));
143 } catch (DocumentedException e) {
144 LOG.warn("Failed to process notification {}", notification, e);
145 currentSession.sendMessage(notification);