/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.netconf.notifications.impl.ops;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.Date;
15 import java.util.List;
16 import org.opendaylight.netconf.api.DocumentedException;
17 import org.opendaylight.netconf.api.NetconfSession;
18 import org.opendaylight.netconf.api.xml.XmlElement;
19 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
20 import org.opendaylight.netconf.api.xml.XmlUtil;
21 import org.opendaylight.netconf.mapping.api.SessionAwareNetconfOperation;
22 import org.opendaylight.netconf.notifications.NetconfNotification;
23 import org.opendaylight.netconf.notifications.NetconfNotificationListener;
24 import org.opendaylight.netconf.notifications.NetconfNotificationRegistry;
25 import org.opendaylight.netconf.notifications.NotificationListenerRegistration;
26 import org.opendaylight.netconf.notifications.impl.NetconfNotificationManager;
27 import org.opendaylight.netconf.util.mapping.AbstractSingletonNetconfOperation;
28 import org.opendaylight.netconf.util.messages.SubtreeFilter;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
30 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.w3c.dom.Document;
34 import org.w3c.dom.Element;
37 * Create subscription listens for create subscription requests
38 * and registers notification listeners into notification registry.
39 * Received notifications are sent to the client right away
41 public class CreateSubscription extends AbstractSingletonNetconfOperation
42 implements SessionAwareNetconfOperation, AutoCloseable {
44 private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
46 static final String CREATE_SUBSCRIPTION = "create-subscription";
48 private final NetconfNotificationRegistry notifications;
49 private final List<NotificationListenerRegistration> subscriptions = Lists.newArrayList();
50 private NetconfSession netconfSession;
52 public CreateSubscription(final String netconfSessionIdForReporting,
53 final NetconfNotificationRegistry notifications) {
54 super(netconfSessionIdForReporting);
55 this.notifications = notifications;
59 protected Element handleWithNoSubsequentOperations(final Document document,
60 final XmlElement operationElement) throws DocumentedException {
61 operationElement.checkName(CREATE_SUBSCRIPTION);
62 operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
63 // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
64 // Binding doesn't support anyxml nodes yet, so filter could not be retrieved
65 // xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
67 final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
69 // Replay not supported
70 final Optional<XmlElement> startTime =
71 operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
72 Preconditions.checkArgument(!startTime.isPresent(), "StartTime element not yet supported");
74 // Stop time not supported
75 final Optional<XmlElement> stopTime =
76 operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
77 Preconditions.checkArgument(!stopTime.isPresent(), "StopTime element not yet supported");
79 final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
81 Preconditions.checkNotNull(netconfSession);
82 // Premature streams are allowed (meaning listener can register even if no provider is available yet)
83 if (!notifications.isStreamAvailable(streamNameType)) {
84 LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType,
85 getNetconfSessionIdForReporting());
88 final NotificationListenerRegistration notificationListenerRegistration = notifications
89 .registerNotificationListener(streamNameType, new NotificationSubscription(netconfSession, filter));
90 subscriptions.add(notificationListenerRegistration);
92 return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.absent());
95 private static StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws DocumentedException {
96 final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
97 return stream.isPresent() ? new StreamNameType(stream.get().getTextContent())
98 : NetconfNotificationManager.BASE_STREAM_NAME;
102 protected String getOperationName() {
103 return CREATE_SUBSCRIPTION;
107 protected String getOperationNamespace() {
108 return CreateSubscriptionInput.QNAME.getNamespace().toString();
112 public void setSession(final NetconfSession session) {
113 this.netconfSession = session;
117 public void close() {
118 netconfSession = null;
119 // Unregister from notification streams
120 for (final NotificationListenerRegistration subscription : subscriptions) {
121 subscription.close();
125 private static class NotificationSubscription implements NetconfNotificationListener {
126 private final NetconfSession currentSession;
127 private final Optional<XmlElement> filter;
129 NotificationSubscription(final NetconfSession currentSession, final Optional<XmlElement> filter) {
130 this.currentSession = currentSession;
131 this.filter = filter;
135 public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
136 if (filter.isPresent()) {
138 final Optional<Document> filtered =
139 SubtreeFilter.applySubtreeNotificationFilter(this.filter.get(), notification.getDocument());
140 if (filtered.isPresent()) {
141 final Date eventTime = notification.getEventTime();
142 currentSession.sendMessage(new NetconfNotification(filtered.get(), eventTime));
144 } catch (DocumentedException e) {
145 LOG.warn(e.toString());
146 currentSession.sendMessage(notification);
149 currentSession.sendMessage(notification);