X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopic.java;h=6de407f58be5d999961e82e03d572162ce2900d6;hp=98e168eee9678cdcc6db391f3eb56f70d58d8c5b;hb=refs%2Fchanges%2F98%2F21798%2F2;hpb=35128aa4927b06a97e3d1f505a6852105dc81fed diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java index 98e168eee9..6de407f58b 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java @@ -8,9 +8,9 @@ package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.base.Preconditions; import java.util.Map; import java.util.regex.Pattern; + import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; @@ -18,12 +18,17 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + public class EventSourceTopic implements DataChangeListener { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class); private final NotificationPattern notificationPattern; @@ -34,15 +39,9 @@ public class EventSourceTopic implements DataChangeListener { public EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdPattern, final EventSourceService eventSource) { this.notificationPattern = Preconditions.checkNotNull(notificationPattern); this.sourceService = eventSource; + this.nodeIdPattern = Pattern.compile(nodeIdPattern); - // FIXME: regex should be the language of nodeIdPattern - final String regex = Util.wildcardToRegex(nodeIdPattern); - this.nodeIdPattern = Pattern.compile(regex); - - - // FIXME: We need to perform some salting in order to make - // the topic IDs less predictable. - this.topicId = new TopicId(Util.md5String(notificationPattern + nodeIdPattern)); + this.topicId = new TopicId(Util.getUUIDIdent()); } public TopicId getTopicId() { @@ -54,7 +53,7 @@ public class EventSourceTopic implements DataChangeListener { for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { if (changeEntry.getValue() instanceof Node) { final Node node = (Node) changeEntry.getValue(); - if (nodeIdPattern.matcher(node.getId().getValue()).matches()) { + if (getNodeIdRegexPattern().matcher(node.getId().getValue()).matches()) { notifyNode(changeEntry.getKey()); } } @@ -62,8 +61,14 @@ public class EventSourceTopic implements DataChangeListener { } public void notifyNode(final InstanceIdentifier nodeId) { + try { - sourceService.joinTopic(getJoinTopicInputArgument(nodeId)); + RpcResult rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get(); + if(rpcResultJoinTopic.isSuccessful() == false){ + for(RpcError err : rpcResultJoinTopic.getErrors()){ + LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString()); + } + } } catch (final Exception e) { LOG.error("Could not invoke join topic for node {}", nodeId); } @@ -80,5 +85,8 @@ public class EventSourceTopic implements DataChangeListener { return jti; } + public Pattern getNodeIdRegexPattern() { + return nodeIdPattern; + } }