package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.regex.Pattern;
+
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
public class EventSourceTopic implements DataChangeListener {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
private final NotificationPattern notificationPattern;
final String regex = Util.wildcardToRegex(nodeIdPattern);
this.nodeIdPattern = Pattern.compile(regex);
-
- // FIXME: We need to perform some salting in order to make
- // the topic IDs less predictable.
- this.topicId = new TopicId(Util.md5String(notificationPattern + nodeIdPattern));
+ this.topicId = new TopicId(Util.getUUIDIdent());
}
public TopicId getTopicId() {
}
public void notifyNode(final InstanceIdentifier<?> nodeId) {
+
try {
- sourceService.joinTopic(getJoinTopicInputArgument(nodeId));
+ RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
+ if(rpcResultJoinTopic.isSuccessful() == false){
+ for(RpcError err : rpcResultJoinTopic.getErrors()){
+ LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString());
+ }
+ }
} catch (final Exception e) {
LOG.error("Could not invoke join topic for node {}", nodeId);
}
return jti;
}
-
}