X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopology.java;h=6140a78ba586a7beaa89a92ae0ab9cc5fb109016;hp=c0700971ddb52fe53f4bd33f5fa5e21546835fc4;hb=1447e0132075bbd3013aa41b98384a373bd82d1a;hpb=3997099eb61b0f2adc47f7a85952c324e9de223f diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index c0700971dd..6140a78ba5 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -11,96 +11,228 @@ package org.opendaylight.controller.messagebus.app.impl; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.regex.Pattern; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.mdsal.DataStore; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.messagebus.spi.EventSource; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; -public class EventSourceTopology { - private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceTopology.class); - private static final String topologyId = "EVENT-SOURCE-TOPOLOGY" ; - private static final TopologyKey topologyKey = new TopologyKey(new TopologyId(topologyId)); - private static final LogicalDatastoreType datastoreType = LogicalDatastoreType.OPERATIONAL; +public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry { + private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); - private static final InstanceIdentifier topologyInstanceIdentifier = + private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ; + private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID)); + private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL; + + private static final InstanceIdentifier EVENT_SOURCE_TOPOLOGY_PATH = InstanceIdentifier.create(NetworkTopology.class) - .child(Topology.class, topologyKey); + .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY); - private static final InstanceIdentifier topologyTypeInstanceIdentifier = - topologyInstanceIdentifier + private static final InstanceIdentifier TOPOLOGY_TYPE_PATH = + EVENT_SOURCE_TOPOLOGY_PATH .child(TopologyTypes.class) .augmentation(TopologyTypes1.class); - private static final InstanceIdentifier eventSourceTopologyPath = - InstanceIdentifier.create(NetworkTopology.class) - .child(Topology.class) - .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang - .network.topology.rev131021.network.topology.topology.Node.class); - - private final Map> registrations = + private final Map> topicListenerRegistrations = new ConcurrentHashMap<>(); + private final Map> routedRpcRegistrations = + new ConcurrentHashMap<>(); + + private final DataBroker dataBroker; + private final RpcRegistration aggregatorRpcReg; + private final EventSourceService eventSourceService; + private final RpcProviderRegistry rpcRegistry; + + public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) { + + this.dataBroker = dataBroker; + this.rpcRegistry = rpcRegistry; + aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this); + eventSourceService = rpcRegistry.getRpcService(EventSourceService.class); + + final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); + final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); + putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); + LOG.info("EventSourceRegistry has been initialized"); + } + + private void putData(final LogicalDatastoreType store, + final InstanceIdentifier path, + final T data){ + + final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + tx.put(store, path, data, true); + tx.submit(); + + } + + private void deleteData(final LogicalDatastoreType store, final InstanceIdentifier path){ + final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + tx.delete(OPERATIONAL, path); + tx.submit(); + } + + private void insert(final KeyedInstanceIdentifier sourcePath) { + final NodeKey nodeKey = sourcePath.getKey(); + final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); + final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build(); + putData(OPERATIONAL, augmentPath, nodeAgument); + } + + private void remove(final KeyedInstanceIdentifier sourcePath){ + final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); + deleteData(OPERATIONAL, augmentPath); + } + + private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ + + final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); + + final CheckedFuture, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH); + + Futures.addCallback(future, new FutureCallback>(){ - private final DataStore dataStore; + @Override + public void onSuccess(Optional data) { + if(data.isPresent()) { + final List nodes = data.get().getNode(); + for (final Node node : nodes) { + if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { + eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); + } + } + } + tx.close(); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Can not notify existing nodes {}", t); + tx.close(); + } + + }); - public EventSourceTopology(DataStore dataStore) { - this.dataStore = dataStore; } - public void mdsalReady() { - TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); - TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); + @Override + public Future> createTopic(final CreateTopicInput input) { + LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", + input.getNotificationPattern(), + input.getNodeIdPattern()); + + final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern()); + final String nodeIdPattern = input.getNodeIdPattern().getValue(); + final Pattern nodeIdPatternRegex = Pattern.compile(nodeIdPattern); + final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService); + + registerTopic(eventSourceTopic); - dataStore.asyncPUT(datastoreType, topologyTypeInstanceIdentifier, topologyTypeAugment); + notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic); + + final CreateTopicOutput cto = new CreateTopicOutputBuilder() + .setTopicId(eventSourceTopic.getTopicId()) + .build(); + + return Util.resultRpcSuccessFor(cto); } - public void insert(Node node) { - String nodeId = node.getKey().getId().getValue(); - NodeKey nodeKey = new NodeKey(new NodeId(nodeId)); - InstanceIdentifier topologyNodeAugment - = topologyInstanceIdentifier - .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang - .network.topology.rev131021.network.topology.topology.Node.class, nodeKey) - .augmentation(Node1.class); - - Node1 nodeAgument = new Node1Builder().setEventSourceNode(node.getId()).build(); - dataStore.asyncPUT(datastoreType, topologyNodeAugment, nodeAgument); + @Override + public Future> destroyTopic(final DestroyTopicInput input) { + return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented")); } - // TODO: Should we expose this functioanlity over RPC? - public List snapshot() { - Topology topology = dataStore.read(datastoreType, topologyInstanceIdentifier); - return topology.getNode(); + @Override + public void close() { + aggregatorRpcReg.close(); + for(ListenerRegistration reg : topicListenerRegistrations.values()){ + reg.close(); + } } - public void registerDataChangeListener(DataChangeListener listener) { - ListenerRegistration listenerRegistration = dataStore.registerDataChangeListener(datastoreType, - eventSourceTopologyPath, + private void registerTopic(final EventSourceTopic listener) { + final ListenerRegistration listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL, + EVENT_SOURCE_TOPOLOGY_PATH, listener, DataBroker.DataChangeScope.SUBTREE); - registrations.put(listener, listenerRegistration); + topicListenerRegistrations.put(listener, listenerRegistration); + } + + public void register(final EventSource eventSource){ + NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource); + reg.registerPath(NodeContext.class, sourcePath); + routedRpcRegistrations.put(nodeKey,reg); + insert(sourcePath); + + for(EventSourceTopic est : topicListenerRegistrations.keySet()){ + est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey)); + } + } + + public void unRegister(final EventSource eventSource){ + final NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + final RoutedRpcRegistration removeRegistration = routedRpcRegistrations.remove(nodeKey); + if(removeRegistration != null){ + removeRegistration.close(); + remove(sourcePath); + } + } + + @Override + public EventSourceRegistration registerEventSource( + T eventSource) { + EventSourceRegistrationImpl esr = new EventSourceRegistrationImpl<>(eventSource, this); + register(eventSource); + return esr; } } +