BUG 3121 - destroy topic implementation
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopic.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.messagebus.app.impl;
10
11 import java.util.List;
12 import java.util.Map;
13 import java.util.UUID;
14 import java.util.concurrent.CopyOnWriteArraySet;
15 import java.util.concurrent.ExecutionException;
16 import java.util.regex.Pattern;
17
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
20 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
24 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
25 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
26 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
27 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
35 import org.opendaylight.yangtools.concepts.ListenerRegistration;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.opendaylight.yangtools.yang.common.RpcError;
39 import org.opendaylight.yangtools.yang.common.RpcResult;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.common.base.Optional;
43 import com.google.common.base.Preconditions;
44 import com.google.common.util.concurrent.CheckedFuture;
45 import com.google.common.util.concurrent.FutureCallback;
46 import com.google.common.util.concurrent.Futures;
47
48 public class EventSourceTopic implements DataChangeListener, AutoCloseable {
49     private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
50     private final NotificationPattern notificationPattern;
51     private final EventSourceService sourceService;
52     private final Pattern nodeIdRegexPattern;
53     private final TopicId topicId;
54     private ListenerRegistration<DataChangeListener> listenerRegistration;
55     private final CopyOnWriteArraySet<InstanceIdentifier<?>> joinedEventSources = new CopyOnWriteArraySet<>();
56
57     public static EventSourceTopic create(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology){
58         EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, eventSourceTopology.getEventSourceService());
59         est.registerListner(eventSourceTopology);
60         est.notifyExistingNodes(eventSourceTopology);
61         return est;
62     }
63
64     private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceService sourceService) {
65         this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
66         this.sourceService = Preconditions.checkNotNull(sourceService);
67         this.nodeIdRegexPattern = Pattern.compile(nodeIdRegexPattern);
68         this.topicId = new TopicId(getUUIDIdent());
69         this.listenerRegistration = null;
70         LOG.info("EventSourceTopic created - topicId {}", topicId.getValue());
71     }
72
73     public TopicId getTopicId() {
74         return topicId;
75     }
76
77     @Override
78     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
79
80         for (final Map.Entry<InstanceIdentifier<?>, DataObject> createdEntry : event.getCreatedData().entrySet()) {
81             if (createdEntry.getValue() instanceof Node) {
82                 final Node node = (Node) createdEntry.getValue();
83                 LOG.debug("Create node...");
84                 if (this.nodeIdRegexPattern.matcher(node.getNodeId().getValue()).matches()) {
85                     LOG.debug("Matched...");
86                     notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
87                 }
88             }
89         }
90
91         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
92             if (changeEntry.getValue() instanceof Node) {
93                 final Node node = (Node) changeEntry.getValue();
94                 LOG.debug("Update node...");
95                 if (this.nodeIdRegexPattern.matcher(node.getNodeId().getValue()).matches()) {
96                     LOG.debug("Matched...");
97                     notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
98                 }
99             }
100         }
101     }
102
103     public void notifyNode(final InstanceIdentifier<?> nodeId) {
104         LOG.debug("Notify node: {}", nodeId);
105         try {
106             RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
107             if(rpcResultJoinTopic.isSuccessful() == false){
108                 for(RpcError err : rpcResultJoinTopic.getErrors()){
109                     LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString());
110                 }
111             } else {
112                 joinedEventSources.add(nodeId);
113             }
114         } catch (final Exception e) {
115             LOG.error("Could not invoke join topic for node {}", nodeId);
116         }
117     }
118
119     private void notifyExistingNodes(final EventSourceTopology eventSourceTopology){
120         LOG.debug("Notify existing nodes");
121         final Pattern nodeRegex = this.nodeIdRegexPattern;
122
123         final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction();
124         final CheckedFuture<Optional<Topology>, ReadFailedException> future =
125                 tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
126
127         Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
128
129             @Override
130             public void onSuccess(Optional<Topology> data) {
131                 if(data.isPresent()) {
132                      final List<Node> nodes = data.get().getNode();
133                      if(nodes != null){
134                         for (final Node node : nodes) {
135                              if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) {
136                                  notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
137                              }
138                          }
139                      }
140                 }
141                 tx.close();
142             }
143
144             @Override
145             public void onFailure(Throwable t) {
146                 LOG.error("Can not notify existing nodes", t);
147                 tx.close();
148             }
149
150         });
151
152     }
153
154     private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier<?> path) {
155         final NodeRef nodeRef = new NodeRef(path);
156         final JoinTopicInput jti =
157                 new JoinTopicInputBuilder()
158                         .setNode(nodeRef.getValue())
159                         .setTopicId(topicId)
160                         .setNotificationPattern(notificationPattern)
161                         .build();
162         return jti;
163     }
164
165     private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier<?> eventSourceNodeId){
166         final NodeRef nodeRef = new NodeRef(eventSourceNodeId);
167         DisJoinTopicInput dji = new DisJoinTopicInputBuilder()
168                 .setNode(nodeRef.getValue())
169                 .setTopicId(topicId)
170                 .build();
171         return dji;
172     }
173
174     private void registerListner(final EventSourceTopology eventSourceTopology) {
175         this.listenerRegistration =
176                 eventSourceTopology.getDataBroker().registerDataChangeListener(
177                         LogicalDatastoreType.OPERATIONAL,
178                         EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH,
179                         this,
180                         DataBroker.DataChangeScope.SUBTREE);
181     }
182
183     @Override
184     public void close() {
185         if(this.listenerRegistration != null){
186             this.listenerRegistration.close();
187         }
188         for(InstanceIdentifier<?> eventSourceNodeId : joinedEventSources){
189             try {
190                 RpcResult<Void> result = sourceService.disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get();
191                 if(result.isSuccessful() == false){
192                     for(RpcError err : result.getErrors()){
193                         LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),eventSourceNodeId,err.toString());
194                     }
195                 }
196             } catch (InterruptedException | ExecutionException ex) {
197                 LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), eventSourceNodeId, ex);
198             }
199         }
200         joinedEventSources.clear();
201     }
202
203     private static String getUUIDIdent(){
204         UUID uuid = UUID.randomUUID();
205         return uuid.toString();
206     }
207 }