X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Feventsources%2Fnetconf%2FStreamNotificationTopicRegistration.java;h=64ddb31d932dc83976faacc873fc9279f4b4527b;hp=2e654d0b8b3cb49db462c22f72fc63d1e5fc3e9e;hb=23fe9ca678ada6263fec5dd996f4025e4a32fcf5;hpb=071a641d7c12c0e6112d5ce0afe806b54f116ed2 diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java index 2e654d0b8b..64ddb31d93 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java @@ -1,204 +1,204 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.messagebus.eventsources.netconf; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; -import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; - -public class StreamNotificationTopicRegistration extends NotificationTopicRegistration { - - private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class); - private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); - private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); - private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime")); - - final private DOMMountPoint domMountPoint; - final private String nodeId; - final private NetconfEventSource netconfEventSource; - final private Stream stream; - private Date lastEventTime; - - private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); - private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); - - public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) { - super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix); - this.domMountPoint = netconfEventSource.getDOMMountPoint(); - this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString(); - this.netconfEventSource = netconfEventSource; - this.stream = stream; - this.lastEventTime= null; - setReplaySupported(this.stream.isReplaySupport()); - setActive(false); - LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName()); - } - - void activateNotificationSource() { - if(isActive() == false){ - LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId); - final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())) - .build(); - CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); - try { - csFuture.checkedGet(); - setActive(true); - } catch (DOMRpcException e) { - LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId); - setActive(false); - return; - } - } else { - LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId); - } - } - - void reActivateNotificationSource(){ - if(isActive()){ - LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId); - DataContainerNodeAttrBuilder inputBuilder = - Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())); - if(isReplaySupported() && this.getLastEventTime() != null){ - inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime())); - } - final ContainerNode input = inputBuilder.build(); - CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); - try { - csFuture.checkedGet(); - setActive(true); - } catch (DOMRpcException e) { - LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId); - setActive(false); - return; - } - } - } - - @Override - void deActivateNotificationSource() { - // no operations need - } - - private void closeStream() { - if(isActive()){ - for(ListenerRegistration reg : notificationRegistrationMap.values()){ - reg.close(); - } - notificationRegistrationMap.clear(); - notificationTopicMap.clear(); - setActive(false); - } - } - - private String getStreamName() { - return getSourceName(); - } - - @Override - ArrayList getNotificationTopicIds(SchemaPath notificationPath){ - return notificationTopicMap.get(notificationPath); - } - - @Override - boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){ - - if(checkNotificationPath(notificationPath) == false){ - LOG.debug("Bad SchemaPath for notification try to register"); - return false; - } - - final Optional notifyService = domMountPoint.getService(DOMNotificationService.class); - if(notifyService.isPresent() == false){ - LOG.debug("DOMNotificationService is not present"); - return false; - } - - activateNotificationSource(); - if(isActive() == false){ - LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString()); - return false; - } - - ListenerRegistration registration = - notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath); - notificationRegistrationMap.put(notificationPath, registration); - ArrayList topicIds = getNotificationTopicIds(notificationPath); - if(topicIds == null){ - topicIds = new ArrayList<>(); - topicIds.add(topicId); - } else { - if(topicIds.contains(topicId) == false){ - topicIds.add(topicId); - } - } - - notificationTopicMap.put(notificationPath, topicIds); - return true; - } - - @Override - synchronized void unRegisterNotificationTopic(TopicId topicId) { - List notificationPathToRemove = new ArrayList<>(); - for(SchemaPath notifKey : notificationTopicMap.keySet()){ - ArrayList topicList = notificationTopicMap.get(notifKey); - if(topicList != null){ - topicList.remove(topicId); - if(topicList.isEmpty()){ - notificationPathToRemove.add(notifKey); - } - } - } - for(SchemaPath notifKey : notificationPathToRemove){ - notificationTopicMap.remove(notifKey); - ListenerRegistration reg = notificationRegistrationMap.remove(notifKey); - if(reg != null){ - reg.close(); - } - } - } - - Optional getLastEventTime() { - return Optional.fromNullable(lastEventTime); - } - - - void setLastEventTime(Date lastEventTime) { - this.lastEventTime = lastEventTime; - } - - @Override - public void close() throws Exception { - closeStream(); - } - -} +///* +// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. +// * +// * This program and the accompanying materials are made available under the +// * terms of the Eclipse Public License v1.0 which accompanies this distribution, +// * and is available at http://www.eclipse.org/legal/epl-v10.html +// */ +//package org.opendaylight.controller.messagebus.eventsources.netconf; +// +//import java.util.ArrayList; +//import java.util.Date; +//import java.util.List; +//import java.util.concurrent.ConcurrentHashMap; +// +//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +//import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +//import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +//import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +//import org.opendaylight.yangtools.concepts.ListenerRegistration; +//import org.opendaylight.yangtools.yang.common.QName; +//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +//import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +//import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +//import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; +//import org.opendaylight.yangtools.yang.model.api.SchemaPath; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import com.google.common.base.Optional; +//import com.google.common.util.concurrent.CheckedFuture; +// +//public class StreamNotificationTopicRegistration extends NotificationTopicRegistration { +// +// private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class); +// private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); +// private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); +// private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime")); +// +// final private DOMMountPoint domMountPoint; +// final private String nodeId; +// final private NetconfEventSource netconfEventSource; +// final private Stream stream; +// private Date lastEventTime; +// +// private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); +// private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); +// +// public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) { +// super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix); +// this.domMountPoint = netconfEventSource.getDOMMountPoint(); +// this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString(); +// this.netconfEventSource = netconfEventSource; +// this.stream = stream; +// this.lastEventTime= null; +// setReplaySupported(this.stream.isReplaySupport()); +// setActive(false); +// LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName()); +// } +// +// void activateNotificationSource() { +// if(isActive() == false){ +// LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId); +// final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) +// .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())) +// .build(); +// CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); +// try { +// csFuture.checkedGet(); +// setActive(true); +// } catch (DOMRpcException e) { +// LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId); +// setActive(false); +// return; +// } +// } else { +// LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId); +// } +// } +// +// void reActivateNotificationSource(){ +// if(isActive()){ +// LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId); +// DataContainerNodeAttrBuilder inputBuilder = +// Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) +// .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())); +// if(isReplaySupported() && this.getLastEventTime() != null){ +// inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime())); +// } +// final ContainerNode input = inputBuilder.build(); +// CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); +// try { +// csFuture.checkedGet(); +// setActive(true); +// } catch (DOMRpcException e) { +// LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId); +// setActive(false); +// return; +// } +// } +// } +// +// @Override +// void deActivateNotificationSource() { +// // no operations need +// } +// +// private void closeStream() { +// if(isActive()){ +// for(ListenerRegistration reg : notificationRegistrationMap.values()){ +// reg.close(); +// } +// notificationRegistrationMap.clear(); +// notificationTopicMap.clear(); +// setActive(false); +// } +// } +// +// private String getStreamName() { +// return getSourceName(); +// } +// +// @Override +// ArrayList getNotificationTopicIds(SchemaPath notificationPath){ +// return notificationTopicMap.get(notificationPath); +// } +// +// @Override +// boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){ +// +// if(checkNotificationPath(notificationPath) == false){ +// LOG.debug("Bad SchemaPath for notification try to register"); +// return false; +// } +// +// final Optional notifyService = domMountPoint.getService(DOMNotificationService.class); +// if(notifyService.isPresent() == false){ +// LOG.debug("DOMNotificationService is not present"); +// return false; +// } +// +// activateNotificationSource(); +// if(isActive() == false){ +// LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString()); +// return false; +// } +// +// ListenerRegistration registration = +// notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath); +// notificationRegistrationMap.put(notificationPath, registration); +// ArrayList topicIds = getNotificationTopicIds(notificationPath); +// if(topicIds == null){ +// topicIds = new ArrayList<>(); +// topicIds.add(topicId); +// } else { +// if(topicIds.contains(topicId) == false){ +// topicIds.add(topicId); +// } +// } +// +// notificationTopicMap.put(notificationPath, topicIds); +// return true; +// } +// +// @Override +// synchronized void unRegisterNotificationTopic(TopicId topicId) { +// List notificationPathToRemove = new ArrayList<>(); +// for(SchemaPath notifKey : notificationTopicMap.keySet()){ +// ArrayList topicList = notificationTopicMap.get(notifKey); +// if(topicList != null){ +// topicList.remove(topicId); +// if(topicList.isEmpty()){ +// notificationPathToRemove.add(notifKey); +// } +// } +// } +// for(SchemaPath notifKey : notificationPathToRemove){ +// notificationTopicMap.remove(notifKey); +// ListenerRegistration reg = notificationRegistrationMap.remove(notifKey); +// if(reg != null){ +// reg.close(); +// } +// } +// } +// +// Optional getLastEventTime() { +// return Optional.fromNullable(lastEventTime); +// } +// +// +// void setLastEventTime(Date lastEventTime) { +// this.lastEventTime = lastEventTime; +// } +// +// @Override +// public void close() throws Exception { +// closeStream(); +// } +// +//}