* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
-import com.google.common.base.Optional;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
.build();
private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
-
- private final EventSourceTopology eventSourceTopology;
private final Map<String, String> streamMap;
-
- private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSource> netconfSources = new ConcurrentHashMap<>();
- private final ListenerRegistration<DataChangeListener> listenerReg;
+ private final ConcurrentHashMap<InstanceIdentifier<?>, EventSourceRegistration<NetconfEventSource>> eventSourceRegistration = new ConcurrentHashMap<>();
private final DOMNotificationPublishService publishService;
private final DOMMountPointService domMounts;
private final MountPointService bindingMounts;
+ private ListenerRegistration<DataChangeListener> listenerRegistration;
+ private final EventSourceRegistry eventSourceRegistry;
+
+ public static NetconfEventSourceManager create(final DataBroker dataBroker,
+ final DOMNotificationPublishService domPublish,
+ final DOMMountPointService domMount,
+ final MountPointService bindingMount,
+ final EventSourceRegistry eventSourceRegistry,
+ final List<NamespaceToStream> namespaceMapping){
+
+ final NetconfEventSourceManager eventSourceManager =
+ new NetconfEventSourceManager(domPublish, domMount, bindingMount, eventSourceRegistry, namespaceMapping);
- public NetconfEventSourceManager(final DataBroker dataStore,
- final DOMNotificationPublishService domPublish,
+ eventSourceManager.initialize(dataBroker);
+
+ return eventSourceManager;
+
+ }
+
+ private NetconfEventSourceManager(final DOMNotificationPublishService domPublish,
final DOMMountPointService domMount,
final MountPointService bindingMount,
- final EventSourceTopology eventSourceTopology,
+ final EventSourceRegistry eventSourceRegistry,
final List<NamespaceToStream> namespaceMapping) {
- listenerReg = dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
- this.eventSourceTopology = eventSourceTopology;
+ Preconditions.checkNotNull(domPublish);
+ Preconditions.checkNotNull(domMount);
+ Preconditions.checkNotNull(bindingMount);
+ Preconditions.checkNotNull(eventSourceRegistry);
+ Preconditions.checkNotNull(namespaceMapping);
this.streamMap = namespaceToStreamMapping(namespaceMapping);
this.domMounts = domMount;
this.bindingMounts = bindingMount;
this.publishService = domPublish;
- LOGGER.info("EventSourceManager initialized.");
+ this.eventSourceRegistry = eventSourceRegistry;
+ }
+
+ private void initialize(final DataBroker dataBroker){
+ Preconditions.checkNotNull(dataBroker);
+ listenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
+ LOGGER.info("NetconfEventSourceManager initialized.");
}
private Map<String,String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
@Override
public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
- //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
+
LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
if (changeEntry.getValue() instanceof Node) {
}
}
-
for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
if (changeEntry.getValue() instanceof Node) {
nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
}
}
-
}
private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
// we listen on node tree, therefore we should rather throw IllegalStateException when node is null
if ( node == null ) {
- LOGGER.debug("OnDataChanged Event. Node is null.");
- return;
+ throw new IllegalStateException("Node is null");
}
if ( isNetconfNode(node) == false ) {
LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
return;
}
- if(!netconfSources.containsKey(key)) {
+ if(!eventSourceRegistration.containsKey(key)) {
createEventSource(key,node);
}
}
final Optional<MountPoint> bindingMount = bindingMounts.getMountPoint(key);
if(netconfMount.isPresent() && bindingMount.isPresent()) {
- final String nodeId = node.getNodeId().getValue();
- final NetconfEventSource netconfEventSource = new NetconfEventSource(nodeId, streamMap, netconfMount.get(), publishService, bindingMount.get());
- eventSourceTopology.register(node,netconfEventSource);
- netconfSources.putIfAbsent(key, netconfEventSource);
+
+ final NetconfEventSource netconfEventSource =
+ new NetconfEventSource(node, streamMap, netconfMount.get(), publishService, bindingMount.get());
+ final EventSourceRegistration<NetconfEventSource> registration = eventSourceRegistry.registerEventSource(netconfEventSource);
+ eventSourceRegistration.putIfAbsent(key, registration);
+
}
}
return node.getAugmentation(NetconfNode.class) != null ;
}
- public boolean isEventSource(final Node node) {
- final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+ private boolean isEventSource(final Node node) {
+ final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
return isEventSource(netconfNode);
+
}
private boolean isEventSource(final NetconfNode node) {
+ if (node.getAvailableCapabilities() == null) {
+ return false;
+ }
+ final List<String> capabilities = node.getAvailableCapabilities().getAvailableCapability();
+ if(capabilities == null) {
+ return false;
+ }
for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) {
if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) {
return true;
@Override
public void close() {
- listenerReg.close();
+ for(final EventSourceRegistration<NetconfEventSource> reg : eventSourceRegistration.values()){
+ reg.close();
+ }
+ listenerRegistration.close();
}
+
}
\ No newline at end of file