/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.messagebus.app.impl;
11 import java.util.List;
13 import java.util.UUID;
14 import java.util.concurrent.CopyOnWriteArraySet;
15 import java.util.concurrent.ExecutionException;
16 import java.util.regex.Pattern;
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
20 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
24 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
25 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
26 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
27 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
35 import org.opendaylight.yangtools.concepts.ListenerRegistration;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.opendaylight.yangtools.yang.common.RpcError;
39 import org.opendaylight.yangtools.yang.common.RpcResult;
40 import org.slf4j.LoggerFactory;
42 import com.google.common.base.Optional;
43 import com.google.common.base.Preconditions;
44 import com.google.common.util.concurrent.CheckedFuture;
45 import com.google.common.util.concurrent.FutureCallback;
46 import com.google.common.util.concurrent.Futures;
48 public class EventSourceTopic implements DataChangeListener, AutoCloseable {
49 private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
50 private final NotificationPattern notificationPattern;
51 private final EventSourceService sourceService;
52 private final Pattern nodeIdRegexPattern;
53 private final TopicId topicId;
54 private ListenerRegistration<DataChangeListener> listenerRegistration;
55 private final CopyOnWriteArraySet<InstanceIdentifier<?>> joinedEventSources = new CopyOnWriteArraySet<>();
57 public static EventSourceTopic create(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology){
58 EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, eventSourceTopology.getEventSourceService());
59 est.registerListner(eventSourceTopology);
60 est.notifyExistingNodes(eventSourceTopology);
64 private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceService sourceService) {
65 this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
66 this.sourceService = Preconditions.checkNotNull(sourceService);
67 this.nodeIdRegexPattern = Pattern.compile(nodeIdRegexPattern);
68 this.topicId = new TopicId(getUUIDIdent());
69 this.listenerRegistration = null;
70 LOG.info("EventSourceTopic created - topicId {}", topicId.getValue());
73 public TopicId getTopicId() {
78 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
80 for (final Map.Entry<InstanceIdentifier<?>, DataObject> createdEntry : event.getCreatedData().entrySet()) {
81 if (createdEntry.getValue() instanceof Node) {
82 final Node node = (Node) createdEntry.getValue();
83 LOG.debug("Create node...");
84 if (this.nodeIdRegexPattern.matcher(node.getNodeId().getValue()).matches()) {
85 LOG.debug("Matched...");
86 notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
91 for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
92 if (changeEntry.getValue() instanceof Node) {
93 final Node node = (Node) changeEntry.getValue();
94 LOG.debug("Update node...");
95 if (this.nodeIdRegexPattern.matcher(node.getNodeId().getValue()).matches()) {
96 LOG.debug("Matched...");
97 notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
103 public void notifyNode(final InstanceIdentifier<?> nodeId) {
104 LOG.debug("Notify node: {}", nodeId);
106 RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
107 if(rpcResultJoinTopic.isSuccessful() == false){
108 for(RpcError err : rpcResultJoinTopic.getErrors()){
109 LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString());
112 joinedEventSources.add(nodeId);
114 } catch (final Exception e) {
115 LOG.error("Could not invoke join topic for node {}", nodeId);
119 private void notifyExistingNodes(final EventSourceTopology eventSourceTopology){
120 LOG.debug("Notify existing nodes");
121 final Pattern nodeRegex = this.nodeIdRegexPattern;
123 final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction();
124 final CheckedFuture<Optional<Topology>, ReadFailedException> future =
125 tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
127 Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
130 public void onSuccess(Optional<Topology> data) {
131 if(data.isPresent()) {
132 final List<Node> nodes = data.get().getNode();
134 for (final Node node : nodes) {
135 if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) {
136 notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
145 public void onFailure(Throwable t) {
146 LOG.error("Can not notify existing nodes", t);
154 private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier<?> path) {
155 final NodeRef nodeRef = new NodeRef(path);
156 final JoinTopicInput jti =
157 new JoinTopicInputBuilder()
158 .setNode(nodeRef.getValue())
160 .setNotificationPattern(notificationPattern)
165 private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier<?> eventSourceNodeId){
166 final NodeRef nodeRef = new NodeRef(eventSourceNodeId);
167 DisJoinTopicInput dji = new DisJoinTopicInputBuilder()
168 .setNode(nodeRef.getValue())
174 private void registerListner(final EventSourceTopology eventSourceTopology) {
175 this.listenerRegistration =
176 eventSourceTopology.getDataBroker().registerDataChangeListener(
177 LogicalDatastoreType.OPERATIONAL,
178 EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH,
180 DataBroker.DataChangeScope.SUBTREE);
184 public void close() {
185 if(this.listenerRegistration != null){
186 this.listenerRegistration.close();
188 for(InstanceIdentifier<?> eventSourceNodeId : joinedEventSources){
190 RpcResult<Void> result = sourceService.disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get();
191 if(result.isSuccessful() == false){
192 for(RpcError err : result.getErrors()){
193 LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),eventSourceNodeId,err.toString());
196 } catch (InterruptedException | ExecutionException ex) {
197 LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), eventSourceNodeId, ex);
200 joinedEventSources.clear();
203 private static String getUUIDIdent(){
204 UUID uuid = UUID.randomUUID();
205 return uuid.toString();