d6132beb8c4e8f2ae81c7df9c5554479af18d9eb
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopic.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.messagebus.app.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.UUID;
19 import java.util.concurrent.CopyOnWriteArraySet;
20 import java.util.concurrent.ExecutionException;
21 import java.util.regex.Pattern;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
24 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
39 import org.opendaylight.yangtools.concepts.ListenerRegistration;
40 import org.opendaylight.yangtools.yang.binding.DataObject;
41 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
42 import org.opendaylight.yangtools.yang.common.RpcError;
43 import org.opendaylight.yangtools.yang.common.RpcResult;
44 import org.slf4j.LoggerFactory;
45
46 public class EventSourceTopic implements DataChangeListener, AutoCloseable {
47     private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
48     private final NotificationPattern notificationPattern;
49     private final EventSourceService sourceService;
50     private final Pattern nodeIdPattern;
51     private final TopicId topicId;
52     private ListenerRegistration<DataChangeListener> listenerRegistration;
53     private final CopyOnWriteArraySet<InstanceIdentifier<?>> joinedEventSources = new CopyOnWriteArraySet<>();
54
55     public static EventSourceTopic create(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology){
56         final EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, eventSourceTopology.getEventSourceService());
57         est.registerListner(eventSourceTopology);
58         est.notifyExistingNodes(eventSourceTopology);
59         return est;
60     }
61
62     private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceService sourceService) {
63         this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
64         this.sourceService = Preconditions.checkNotNull(sourceService);
65         this.nodeIdPattern = Pattern.compile(nodeIdRegexPattern);
66         this.topicId = new TopicId(getUUIDIdent());
67         this.listenerRegistration = null;
68         LOG.info("EventSourceTopic created - topicId {}", topicId.getValue());
69     }
70
71     public TopicId getTopicId() {
72         return topicId;
73     }
74
75     @Override
76     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
77
78         for (final Map.Entry<InstanceIdentifier<?>, DataObject> createdEntry : event.getCreatedData().entrySet()) {
79             if (createdEntry.getValue() instanceof Node) {
80                 final Node node = (Node) createdEntry.getValue();
81                 LOG.debug("Create node...");
82                 if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) {
83                     LOG.debug("Matched...");
84                     notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
85                 }
86             }
87         }
88
89         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
90             if (changeEntry.getValue() instanceof Node) {
91                 final Node node = (Node) changeEntry.getValue();
92                 if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) {
93                     notifyNode(changeEntry.getKey());
94                 }
95             }
96         }
97     }
98
99     public void notifyNode(final InstanceIdentifier<?> nodeId) {
100         LOG.debug("Notify node: {}", nodeId);
101         try {
102             final RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
103             if(rpcResultJoinTopic.isSuccessful() == false){
104                 for(final RpcError err : rpcResultJoinTopic.getErrors()){
105                     LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString());
106                 }
107             } else {
108                 joinedEventSources.add(nodeId);
109             }
110         } catch (final Exception e) {
111             LOG.error("Could not invoke join topic for node {}", nodeId);
112         }
113     }
114
115     private void notifyExistingNodes(final EventSourceTopology eventSourceTopology){
116         LOG.debug("Notify existing nodes");
117         final Pattern nodeRegex = this.nodeIdPattern;
118
119         final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction();
120         final CheckedFuture<Optional<Topology>, ReadFailedException> future =
121                 tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
122
123         Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
124
125             @Override
126             public void onSuccess(final Optional<Topology> data) {
127                 if(data.isPresent()) {
128                      final List<Node> nodes = data.get().getNode();
129                      if(nodes != null){
130                         for (final Node node : nodes) {
131                              if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) {
132                                  notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
133                              }
134                          }
135                      }
136                 }
137                 tx.close();
138             }
139
140             @Override
141             public void onFailure(final Throwable t) {
142                 LOG.error("Can not notify existing nodes", t);
143                 tx.close();
144             }
145
146         });
147
148     }
149
150     private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier<?> path) {
151         final NodeRef nodeRef = new NodeRef(path);
152         final JoinTopicInput jti =
153                 new JoinTopicInputBuilder()
154                         .setNode(nodeRef.getValue())
155                         .setTopicId(topicId)
156                         .setNotificationPattern(notificationPattern)
157                         .build();
158         return jti;
159     }
160
161     public Pattern getNodeIdRegexPattern() {
162         return nodeIdPattern;
163     }
164
165     private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier<?> eventSourceNodeId){
166         final NodeRef nodeRef = new NodeRef(eventSourceNodeId);
167         final DisJoinTopicInput dji = new DisJoinTopicInputBuilder()
168                 .setNode(nodeRef.getValue())
169                 .setTopicId(topicId)
170                 .build();
171         return dji;
172     }
173
174     private void registerListner(final EventSourceTopology eventSourceTopology) {
175         this.listenerRegistration =
176                 eventSourceTopology.getDataBroker().registerDataChangeListener(
177                         LogicalDatastoreType.OPERATIONAL,
178                         EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH,
179                         this,
180                         DataBroker.DataChangeScope.SUBTREE);
181     }
182
183     @Override
184     public void close() {
185         if(this.listenerRegistration != null){
186             this.listenerRegistration.close();
187         }
188         for(final InstanceIdentifier<?> eventSourceNodeId : joinedEventSources){
189             try {
190                 final RpcResult<Void> result = sourceService.disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get();
191                 if(result.isSuccessful() == false){
192                     for(final RpcError err : result.getErrors()){
193                         LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),eventSourceNodeId,err.toString());
194                     }
195                 }
196             } catch (InterruptedException | ExecutionException ex) {
197                 LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), eventSourceNodeId, ex);
198             }
199         }
200         joinedEventSources.clear();
201     }
202
203     private static String getUUIDIdent(){
204         final UUID uuid = UUID.randomUUID();
205         return uuid.toString();
206     }
207 }