import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import java.util.Date;
import java.util.List;
import org.opendaylight.controller.config.util.xml.DocumentedException;
import org.opendaylight.controller.config.util.xml.XmlElement;
import org.opendaylight.netconf.notifications.NotificationListenerRegistration;
import org.opendaylight.netconf.notifications.impl.NetconfNotificationManager;
import org.opendaylight.netconf.util.mapping.AbstractSingletonNetconfOperation;
+import org.opendaylight.netconf.util.messages.SubtreeFilter;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
import org.slf4j.Logger;
import org.w3c.dom.Element;
/**
- * Create subscription listens for create subscription requests and registers notification listeners into notification registry.
+ * Create subscription listens for create subscription requests
+ * and registers notification listeners into notification registry.
* Received notifications are sent to the client right away
*/
-public class CreateSubscription extends AbstractSingletonNetconfOperation implements SessionAwareNetconfOperation, AutoCloseable {
+public class CreateSubscription extends AbstractSingletonNetconfOperation
+ implements SessionAwareNetconfOperation, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
private final List<NotificationListenerRegistration> subscriptions = Lists.newArrayList();
private NetconfSession netconfSession;
- public CreateSubscription(final String netconfSessionIdForReporting, final NetconfNotificationRegistry notifications) {
+ public CreateSubscription(final String netconfSessionIdForReporting,
+ final NetconfNotificationRegistry notifications) {
super(netconfSessionIdForReporting);
this.notifications = notifications;
}
@Override
- protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws DocumentedException {
+ protected Element handleWithNoSubsequentOperations(final Document document,
+ final XmlElement operationElement) throws DocumentedException {
operationElement.checkName(CREATE_SUBSCRIPTION);
operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
// FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
// Binding doesn't support anyxml nodes yet, so filter could not be retrieved
// xml -> normalized node -> CreateSubscriptionInput conversion could be slower than current approach
- // FIXME filter could be supported same way as netconf server filters get and get-config results
final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
- Preconditions.checkArgument(filter.isPresent() == false, "Filter element not yet supported");
// Replay not supported
- final Optional<XmlElement> startTime = operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
- Preconditions.checkArgument(startTime.isPresent() == false, "StartTime element not yet supported");
+ final Optional<XmlElement> startTime =
+ operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
+ Preconditions.checkArgument(!startTime.isPresent(), "StartTime element not yet supported");
// Stop time not supported
- final Optional<XmlElement> stopTime = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
- Preconditions.checkArgument(stopTime.isPresent() == false, "StopTime element not yet supported");
+ final Optional<XmlElement> stopTime =
+ operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
+ Preconditions.checkArgument(!stopTime.isPresent(), "StopTime element not yet supported");
final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
Preconditions.checkNotNull(netconfSession);
// Premature streams are allowed (meaning listener can register even if no provider is available yet)
- if(notifications.isStreamAvailable(streamNameType) == false) {
- LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType, getNetconfSessionIdForReporting());
+ if (!notifications.isStreamAvailable(streamNameType)) {
+ LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType,
+ getNetconfSessionIdForReporting());
}
- final NotificationListenerRegistration notificationListenerRegistration =
- notifications.registerNotificationListener(streamNameType, new NotificationSubscription(netconfSession));
+ final NotificationListenerRegistration notificationListenerRegistration = notifications
+ .registerNotificationListener(streamNameType, new NotificationSubscription(netconfSession, filter));
subscriptions.add(notificationListenerRegistration);
return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.<String>absent());
private static StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws DocumentedException {
final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
- return stream.isPresent() ? new StreamNameType(stream.get().getTextContent()) : NetconfNotificationManager.BASE_STREAM_NAME;
+ return stream.isPresent() ? new StreamNameType(stream.get().getTextContent())
+ : NetconfNotificationManager.BASE_STREAM_NAME;
}
@Override
private static class NotificationSubscription implements NetconfNotificationListener {
private final NetconfSession currentSession;
+ private final Optional<XmlElement> filter;
- public NotificationSubscription(final NetconfSession currentSession) {
+ NotificationSubscription(final NetconfSession currentSession, final Optional<XmlElement> filter) {
this.currentSession = currentSession;
+ this.filter = filter;
}
@Override
public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
- currentSession.sendMessage(notification);
+ if (filter.isPresent()) {
+ try {
+ final Optional<Document> filtered =
+ SubtreeFilter.applySubtreeNotificationFilter(this.filter.get(), notification.getDocument());
+ if (filtered.isPresent()) {
+ final Date eventTime = notification.getEventTime();
+ currentSession.sendMessage(new NetconfNotification(filtered.get(), eventTime));
+ }
+ } catch (DocumentedException e) {
+ LOG.warn(e.toString());
+ currentSession.sendMessage(notification);
+ }
+ } else {
+ currentSession.sendMessage(notification);
+ }
}
}
}