2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.server.events.mdsal;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.Optional;
16 import org.eclipse.jdt.annotation.NonNull;
17 import org.opendaylight.netconf.api.DocumentedException;
18 import org.opendaylight.netconf.api.NetconfSession;
19 import org.opendaylight.netconf.api.messages.NotificationMessage;
20 import org.opendaylight.netconf.api.xml.XmlElement;
21 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
22 import org.opendaylight.netconf.server.api.notifications.NetconfNotificationListener;
23 import org.opendaylight.netconf.server.api.notifications.NetconfNotificationRegistry;
24 import org.opendaylight.netconf.server.api.operations.AbstractSingletonNetconfOperation;
25 import org.opendaylight.netconf.server.api.operations.SessionAwareNetconfOperation;
26 import org.opendaylight.netconf.server.spi.SubtreeFilter;
27 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
30 import org.opendaylight.yangtools.concepts.Registration;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.w3c.dom.Document;
34 import org.w3c.dom.Element;
37 * Create subscription listens for create subscription requests and registers notification listeners into notification
38 * registry. Received notifications are sent to the client right away.
40 final class CreateSubscription extends AbstractSingletonNetconfOperation
41 implements SessionAwareNetconfOperation, AutoCloseable {
42 private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
44 static final @NonNull String CREATE_SUBSCRIPTION = "create-subscription";
46 private final List<Registration> subscriptions = new ArrayList<>();
47 private final NetconfNotificationRegistry notifications;
49 private NetconfSession netconfSession;
51 CreateSubscription(final SessionIdType sessionId, final NetconfNotificationRegistry notifications) {
53 this.notifications = requireNonNull(notifications);
57 protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement)
58 throws DocumentedException {
59 operationElement.checkName(CREATE_SUBSCRIPTION);
60 operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
61 // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
62 // Binding doesn't support anyxml nodes yet, so filter could not be retrieved
63 // xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
65 final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
67 // Replay not supported
68 final Optional<XmlElement> startTime =
69 operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
70 checkArgument(startTime.isEmpty(), "StartTime element not yet supported");
72 // Stop time not supported
73 final Optional<XmlElement> stopTime =
74 operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
75 checkArgument(stopTime.isEmpty(), "StopTime element not yet supported");
77 final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
79 requireNonNull(netconfSession);
80 // Premature streams are allowed (meaning listener can register even if no provider is available yet)
81 if (!notifications.isStreamAvailable(streamNameType)) {
82 LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType,
83 sessionId().getValue());
86 subscriptions.add(notifications.registerNotificationListener(streamNameType,
87 new NotificationSubscription(netconfSession, filter)));
89 return document.createElement(XmlNetconfConstants.OK);
92 private static StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws DocumentedException {
93 final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
94 return stream.isPresent() ? new StreamNameType(stream.orElseThrow().getTextContent())
95 : NetconfNotificationManager.BASE_STREAM_NAME;
99 protected String getOperationName() {
100 return CREATE_SUBSCRIPTION;
104 protected String getOperationNamespace() {
105 return CreateSubscriptionInput.QNAME.getNamespace().toString();
109 public void setSession(final NetconfSession session) {
110 netconfSession = session;
114 public void close() {
115 netconfSession = null;
116 // Unregister from notification streams
117 subscriptions.forEach(Registration::close);
118 subscriptions.clear();
121 private static class NotificationSubscription implements NetconfNotificationListener {
122 private final NetconfSession currentSession;
123 private final XmlElement filter;
125 NotificationSubscription(final NetconfSession currentSession, final Optional<XmlElement> filter) {
126 this.currentSession = currentSession;
127 this.filter = filter.orElse(null);
131 public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
132 if (filter == null) {
133 currentSession.sendMessage(notification);
138 final Optional<Document> filtered =
139 SubtreeFilter.applySubtreeNotificationFilter(filter, notification.getDocument());
140 if (filtered.isPresent()) {
141 currentSession.sendMessage(new NotificationMessage(filtered.orElseThrow(),
142 notification.getEventTime()));
144 } catch (DocumentedException e) {
145 LOG.warn("Failed to process notification {}", notification, e);
146 currentSession.sendMessage(notification);