X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopic.java;h=a84f260005fc80e2329e4a4962ff61a46c822d47;hb=f9814cf027886294b74fb6c8748f4a3e0a545e86;hp=d6132beb8c4e8f2ae81c7df9c5554479af18d9eb;hpb=cce450550bec259d4f925389bafd007676f2186f;p=controller.git diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java index d6132beb8c..a84f260005 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java @@ -10,25 +10,28 @@ package org.opendaylight.controller.messagebus.app.impl; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; +import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener; +import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; +import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicOutput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder; @@ -37,29 +40,31 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.LoggerFactory; -public class EventSourceTopic implements DataChangeListener, AutoCloseable { +public final class EventSourceTopic implements DataTreeChangeListener, AutoCloseable { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class); private final NotificationPattern notificationPattern; private final EventSourceService sourceService; private final Pattern nodeIdPattern; private final TopicId topicId; - private ListenerRegistration listenerRegistration; + private ListenerRegistration listenerRegistration; private final CopyOnWriteArraySet> joinedEventSources = new CopyOnWriteArraySet<>(); - public static EventSourceTopic create(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology){ - final EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, eventSourceTopology.getEventSourceService()); + public static EventSourceTopic create(final NotificationPattern notificationPattern, + final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology) { + final EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, + eventSourceTopology.getEventSourceService()); est.registerListner(eventSourceTopology); est.notifyExistingNodes(eventSourceTopology); return est; } - private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceService sourceService) { + private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, + final EventSourceService sourceService) { this.notificationPattern = Preconditions.checkNotNull(notificationPattern); this.sourceService = Preconditions.checkNotNull(sourceService); this.nodeIdPattern = Pattern.compile(nodeIdRegexPattern); @@ -73,25 +78,19 @@ public class EventSourceTopic implements DataChangeListener, AutoCloseable { } @Override - public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { - - for (final Map.Entry, DataObject> createdEntry : event.getCreatedData().entrySet()) { - if (createdEntry.getValue() instanceof Node) { - final Node node = (Node) createdEntry.getValue(); - LOG.debug("Create node..."); - if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) { - LOG.debug("Matched..."); - notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); - } - } - } - - for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { - if (changeEntry.getValue() instanceof Node) { - final Node node = (Node) changeEntry.getValue(); - if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) { - notifyNode(changeEntry.getKey()); - } + public void onDataTreeChanged(final Collection> changes) { + for (DataTreeModification change: changes) { + final DataObjectModification rootNode = change.getRootNode(); + switch (rootNode.getModificationType()) { + case WRITE: + case SUBTREE_MODIFIED: + final Node node = rootNode.getDataAfter(); + if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) { + notifyNode(change.getRootPath().getRootIdentifier()); + } + break; + default: + break; } } } @@ -99,52 +98,52 @@ public class EventSourceTopic implements DataChangeListener, AutoCloseable { public void notifyNode(final InstanceIdentifier nodeId) { LOG.debug("Notify node: {}", nodeId); try { - final RpcResult rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get(); - if(rpcResultJoinTopic.isSuccessful() == false){ - for(final RpcError err : rpcResultJoinTopic.getErrors()){ - LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString()); + final RpcResult rpcResultJoinTopic = + sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get(); + if (!rpcResultJoinTopic.isSuccessful()) { + for (final RpcError err : rpcResultJoinTopic.getErrors()) { + LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}", getTopicId().getValue(), + nodeId.toString(), err.toString()); } } else { joinedEventSources.add(nodeId); } - } catch (final Exception e) { + } catch (InterruptedException | ExecutionException e) { LOG.error("Could not invoke join topic for node {}", nodeId); } } - private void notifyExistingNodes(final EventSourceTopology eventSourceTopology){ + private void notifyExistingNodes(final EventSourceTopology eventSourceTopology) { LOG.debug("Notify existing nodes"); final Pattern nodeRegex = this.nodeIdPattern; final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction(); - final CheckedFuture, ReadFailedException> future = + final ListenableFuture> future = tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH); - Futures.addCallback(future, new FutureCallback>(){ - + Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(final Optional data) { - if(data.isPresent()) { - final List nodes = data.get().getNode(); - if(nodes != null){ + public void onSuccess(@Nonnull final Optional data) { + if (data.isPresent()) { + final List nodes = data.get().getNode(); + if (nodes != null) { for (final Node node : nodes) { - if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) { - notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); - } - } - } + if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) { + notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, + node.key())); + } + } + } } tx.close(); } @Override - public void onFailure(final Throwable t) { - LOG.error("Can not notify existing nodes", t); + public void onFailure(final Throwable ex) { + LOG.error("Can not notify existing nodes", ex); tx.close(); } - - }); - + }, MoreExecutors.directExecutor()); } private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier path) { @@ -162,7 +161,7 @@ public class EventSourceTopic implements DataChangeListener, AutoCloseable { return nodeIdPattern; } - private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier eventSourceNodeId){ + private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier eventSourceNodeId) { final NodeRef nodeRef = new NodeRef(eventSourceNodeId); final DisJoinTopicInput dji = new DisJoinTopicInputBuilder() .setNode(nodeRef.getValue()) @@ -173,34 +172,36 @@ public class EventSourceTopic implements DataChangeListener, AutoCloseable { private void registerListner(final EventSourceTopology eventSourceTopology) { this.listenerRegistration = - eventSourceTopology.getDataBroker().registerDataChangeListener( + eventSourceTopology.getDataBroker().registerDataTreeChangeListener(new DataTreeIdentifier<>( LogicalDatastoreType.OPERATIONAL, - EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH, - this, - DataBroker.DataChangeScope.SUBTREE); + EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class)), + this); } @Override public void close() { - if(this.listenerRegistration != null){ + if (this.listenerRegistration != null) { this.listenerRegistration.close(); } - for(final InstanceIdentifier eventSourceNodeId : joinedEventSources){ + for (final InstanceIdentifier eventSourceNodeId : joinedEventSources) { try { - final RpcResult result = sourceService.disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get(); - if(result.isSuccessful() == false){ - for(final RpcError err : result.getErrors()){ - LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),eventSourceNodeId,err.toString()); + final RpcResult result = sourceService + .disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get(); + if (result.isSuccessful() == false) { + for (final RpcError err : result.getErrors()) { + LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}", getTopicId().getValue(), + eventSourceNodeId, err.toString()); } } } catch (InterruptedException | ExecutionException ex) { - LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), eventSourceNodeId, ex); + LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), + eventSourceNodeId, ex); } } joinedEventSources.clear(); } - private static String getUUIDIdent(){ + private static String getUUIDIdent() { final UUID uuid = UUID.randomUUID(); return uuid.toString(); }