X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=inline;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopology.java;h=b22eee233e612679da6b766126792f299e426fba;hb=5f587c3e2bfabc09fec49463d04a6fbeba414e9c;hp=603f34bac994d9ff44c8e37ed9f99bca11b17c47;hpb=f3473ee42d45f1524dcafa6cc37e19e0393e9693;p=controller.git diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 603f34bac9..b22eee233e 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -5,34 +5,34 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.Futures; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.controller.messagebus.app.util.Util; +import org.opendaylight.controller.messagebus.spi.EventSource; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder; @@ -40,7 +40,6 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; @@ -49,7 +48,8 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes; -import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; @@ -57,152 +57,172 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.regex.Pattern; - -public class EventSourceTopology implements EventAggregatorService, AutoCloseable { +@Deprecated(forRemoval = true) +public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ; private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID)); private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL; - private static final InstanceIdentifier EVENT_SOURCE_TOPOLOGY_PATH = - InstanceIdentifier.create(NetworkTopology.class) - .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY); + static final InstanceIdentifier EVENT_SOURCE_TOPOLOGY_PATH = + InstanceIdentifier.create(NetworkTopology.class).child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY); - private static final InstanceIdentifier TOPOLOGY_TYPE_PATH = - EVENT_SOURCE_TOPOLOGY_PATH - .child(TopologyTypes.class) - .augmentation(TopologyTypes1.class); + private static final InstanceIdentifier TOPOLOGY_TYPE_PATH = EVENT_SOURCE_TOPOLOGY_PATH + .child(TopologyTypes.class).augmentation(TopologyTypes1.class); - private final Map> registrations = - new ConcurrentHashMap<>(); + private final Map eventSourceTopicMap = new ConcurrentHashMap<>(); + private final Map routedRpcRegistrations = new ConcurrentHashMap<>(); private final DataBroker dataBroker; - private final RpcRegistration aggregatorRpcReg; + private final ObjectRegistration aggregatorRpcReg; private final EventSourceService eventSourceService; - private final RpcProviderRegistry rpcRegistry; - private final ExecutorService executorService; + private final RpcProviderService rpcRegistry; + + public EventSourceTopology(final DataBroker dataBroker, final RpcProviderService providerService, + final RpcConsumerRegistry rpcService) { - public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) { this.dataBroker = dataBroker; - this.executorService = Executors.newCachedThreadPool(); - this.rpcRegistry = rpcRegistry; - aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this); - eventSourceService = rpcRegistry.getRpcService(EventSourceService.class); + this.rpcRegistry = providerService; + aggregatorRpcReg = providerService.registerRpcImplementation(EventAggregatorService.class, this); + eventSourceService = rpcService.getRpcService(EventSourceService.class); final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); - final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); + final TopologyTypes1 topologyTypeAugment = + new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); + LOG.info("EventSourceRegistry has been initialized"); } - private void putData(final LogicalDatastoreType store, - final InstanceIdentifier path, final T data) { + private void putData(final LogicalDatastoreType store, + final InstanceIdentifier path, + final T data) { - final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); - tx.put(store, path, data, true); - tx.submit(); + final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); + tx.mergeParentStructurePut(store, path, data); + tx.commit().addCallback(new FutureCallback() { + @Override + public void onSuccess(final CommitInfo result) { + LOG.trace("Data has put into datastore {} {}", store, path); + } + + @Override + public void onFailure(final Throwable ex) { + LOG.error("Can not put data into datastore [store: {}] [path: {}]", store, path, ex); + } + }, MoreExecutors.directExecutor()); } - private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) { - final NodeKey nodeKey = node.getKey(); + private void deleteData(final LogicalDatastoreType store, + final InstanceIdentifier path) { + final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); + tx.delete(OPERATIONAL, path); + tx.commit().addCallback(new FutureCallback() { + @Override + public void onSuccess(final CommitInfo result) { + LOG.trace("Data has deleted from datastore {} {}", store, path); + } + + @Override + public void onFailure(final Throwable ex) { + LOG.error("Can not delete data from datastore [store: {}] [path: {}]", store, path, ex); + } + }, MoreExecutors.directExecutor()); + } + + private void insert(final KeyedInstanceIdentifier sourcePath) { + final NodeKey nodeKey = sourcePath.getKey(); final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); - final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build(); + final Node1 nodeAgument = new Node1Builder().setEventSourceNode( + new NodeId(nodeKey.getNodeId().getValue())).build(); putData(OPERATIONAL, augmentPath, nodeAgument); } - private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ - executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic)); + private void remove(final KeyedInstanceIdentifier sourcePath) { + final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); + deleteData(OPERATIONAL, augmentPath); } @Override - public Future> createTopic(final CreateTopicInput input) { - LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", + public ListenableFuture> createTopic(final CreateTopicInput input) { + LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", input.getNotificationPattern(), input.getNodeIdPattern()); final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern()); + //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex final String nodeIdPattern = input.getNodeIdPattern().getValue(); - final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern)); - final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService); - - registerTopic(eventSourceTopic); + final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this); - notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic); + eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic); final CreateTopicOutput cto = new CreateTopicOutputBuilder() .setTopicId(eventSourceTopic.getTopicId()) .build(); - return Util.resultFor(cto); + LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}", + input.getNotificationPattern(), + input.getNodeIdPattern()); + + return Util.resultRpcSuccessFor(cto); } @Override - public Future> destroyTopic(final DestroyTopicInput input) { - return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented")); + public ListenableFuture> destroyTopic(final DestroyTopicInput input) { + final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId()); + if (topicToDestroy != null) { + topicToDestroy.close(); + } + return Util.resultRpcSuccessFor(new DestroyTopicOutputBuilder().build()); } @Override public void close() { aggregatorRpcReg.close(); + eventSourceTopicMap.values().forEach(EventSourceTopic::close); } - public void registerTopic(final EventSourceTopic listener) { - final ListenerRegistration listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL, - EVENT_SOURCE_TOPOLOGY_PATH, - listener, - DataBroker.DataChangeScope.SUBTREE); - - registrations.put(listener, listenerRegistration); + public void register(final EventSource eventSource) { + final NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + final Registration reg = rpcRegistry.registerRpcImplementation(EventSourceService.class, eventSource, + Collections.singleton(sourcePath)); + routedRpcRegistrations.put(nodeKey, reg); + insert(sourcePath); } - public void register(final Node node, final NetconfEventSource netconfEventSource) { - final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()); - rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource) - .registerPath(NodeContext.class, sourcePath); - insert(sourcePath,node); - // FIXME: Return registration object. + public void unRegister(final EventSource eventSource) { + final NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + final Registration removeRegistration = routedRpcRegistrations.remove(nodeKey); + if (removeRegistration != null) { + removeRegistration.close(); + remove(sourcePath); + } } - private class NotifyAllNodeExecutor implements Runnable { + @Override + public EventSourceRegistration registerEventSource(final T eventSource) { + final EventSourceRegistrationImpl esr = new EventSourceRegistrationImpl<>(eventSource, this); + register(eventSource); + return esr; + } - private final EventSourceTopic topic; - private final DataBroker dataBroker; - private final Pattern nodeIdPatternRegex; + DataBroker getDataBroker() { + return dataBroker; + } - public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) { - this.topic = topic; - this.dataBroker = dataBroker; - this.nodeIdPatternRegex = nodeIdPatternRegex; - } + EventSourceService getEventSourceService() { + return eventSourceService; + } - @Override - public void run() { - //# Code reader note: Context of Node type is NetworkTopology - final List nodes = snapshot(); - for (final Node node : nodes) { - if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { - topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); - } - } - } + @VisibleForTesting + Map getRoutedRpcRegistrations() { + return routedRpcRegistrations; + } - private List snapshot() { - try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) { - - final Optional data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet(); - - if(data.isPresent()) { - final List nodeList = data.get().getNode(); - if(nodeList != null) { - return nodeList; - } - } - return Collections.emptyList(); - } catch (final ReadFailedException e) { - LOG.error("Unable to retrieve node list.", e); - return Collections.emptyList(); - } - } + @VisibleForTesting + Map getEventSourceTopicMap() { + return eventSourceTopicMap; } }