X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopic.java;h=ea04d9995109c8be4fd567612151e33c706102ec;hp=13e50b5ce57030b9a1a97864c02b2fe1613c33bd;hb=30535bcc4c2770cb8500469fe40bdfb37d8ade4d;hpb=c31a6fcf9fb070d4419ca4c32d8b531fdcb5030d diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java index 13e50b5ce5..ea04d99951 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java @@ -8,43 +8,64 @@ package org.opendaylight.controller.messagebus.app.impl; -import java.util.Map; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; - -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; +import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener; +import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; +import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - -public class EventSourceTopic implements DataChangeListener { +public class EventSourceTopic implements DataTreeChangeListener, AutoCloseable { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class); private final NotificationPattern notificationPattern; private final EventSourceService sourceService; private final Pattern nodeIdPattern; private final TopicId topicId; + private ListenerRegistration listenerRegistration; + private final CopyOnWriteArraySet> joinedEventSources = new CopyOnWriteArraySet<>(); - public EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdPattern, final EventSourceService eventSource) { - this.notificationPattern = Preconditions.checkNotNull(notificationPattern); - this.sourceService = eventSource; - - // FIXME: regex should be the language of nodeIdPattern - final String regex = Util.wildcardToRegex(nodeIdPattern); - this.nodeIdPattern = Pattern.compile(regex); + public static EventSourceTopic create(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology){ + final EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, eventSourceTopology.getEventSourceService()); + est.registerListner(eventSourceTopology); + est.notifyExistingNodes(eventSourceTopology); + return est; + } - this.topicId = new TopicId(Util.getUUIDIdent()); + private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceService sourceService) { + this.notificationPattern = Preconditions.checkNotNull(notificationPattern); + this.sourceService = Preconditions.checkNotNull(sourceService); + this.nodeIdPattern = Pattern.compile(nodeIdRegexPattern); + this.topicId = new TopicId(getUUIDIdent()); + this.listenerRegistration = null; + LOG.info("EventSourceTopic created - topicId {}", topicId.getValue()); } public TopicId getTopicId() { @@ -52,31 +73,74 @@ public class EventSourceTopic implements DataChangeListener { } @Override - public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { - for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { - if (changeEntry.getValue() instanceof Node) { - final Node node = (Node) changeEntry.getValue(); - if (nodeIdPattern.matcher(node.getId().getValue()).matches()) { - notifyNode(changeEntry.getKey()); - } + public void onDataTreeChanged(Collection> changes) { + for (DataTreeModification change: changes) { + final DataObjectModification rootNode = change.getRootNode(); + switch (rootNode.getModificationType()) { + case WRITE: + case SUBTREE_MODIFIED: + final Node node = rootNode.getDataAfter(); + if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) { + notifyNode(change.getRootPath().getRootIdentifier()); + } + break; + default: + break; } } } public void notifyNode(final InstanceIdentifier nodeId) { - + LOG.debug("Notify node: {}", nodeId); try { - RpcResult rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get(); + final RpcResult rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get(); if(rpcResultJoinTopic.isSuccessful() == false){ - for(RpcError err : rpcResultJoinTopic.getErrors()){ + for(final RpcError err : rpcResultJoinTopic.getErrors()){ LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString()); } + } else { + joinedEventSources.add(nodeId); } } catch (final Exception e) { LOG.error("Could not invoke join topic for node {}", nodeId); } } + private void notifyExistingNodes(final EventSourceTopology eventSourceTopology){ + LOG.debug("Notify existing nodes"); + final Pattern nodeRegex = this.nodeIdPattern; + + final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction(); + final CheckedFuture, ReadFailedException> future = + tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH); + + Futures.addCallback(future, new FutureCallback>(){ + + @Override + public void onSuccess(final Optional data) { + if(data.isPresent()) { + final List nodes = data.get().getNode(); + if(nodes != null){ + for (final Node node : nodes) { + if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) { + notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); + } + } + } + } + tx.close(); + } + + @Override + public void onFailure(final Throwable t) { + LOG.error("Can not notify existing nodes", t); + tx.close(); + } + + }); + + } + private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier path) { final NodeRef nodeRef = new NodeRef(path); final JoinTopicInput jti = @@ -88,4 +152,49 @@ public class EventSourceTopic implements DataChangeListener { return jti; } + public Pattern getNodeIdRegexPattern() { + return nodeIdPattern; + } + + private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier eventSourceNodeId){ + final NodeRef nodeRef = new NodeRef(eventSourceNodeId); + final DisJoinTopicInput dji = new DisJoinTopicInputBuilder() + .setNode(nodeRef.getValue()) + .setTopicId(topicId) + .build(); + return dji; + } + + private void registerListner(final EventSourceTopology eventSourceTopology) { + this.listenerRegistration = + eventSourceTopology.getDataBroker().registerDataTreeChangeListener(new DataTreeIdentifier<>( + LogicalDatastoreType.OPERATIONAL, + EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class)), + this); + } + + @Override + public void close() { + if(this.listenerRegistration != null){ + this.listenerRegistration.close(); + } + for(final InstanceIdentifier eventSourceNodeId : joinedEventSources){ + try { + final RpcResult result = sourceService.disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get(); + if(result.isSuccessful() == false){ + for(final RpcError err : result.getErrors()){ + LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),eventSourceNodeId,err.toString()); + } + } + } catch (InterruptedException | ExecutionException ex) { + LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), eventSourceNodeId, ex); + } + } + joinedEventSources.clear(); + } + + private static String getUUIDIdent(){ + final UUID uuid = UUID.randomUUID(); + return uuid.toString(); + } }