Fix checkstyle violations in messagebus
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopic.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.messagebus.app.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.UUID;
20 import java.util.concurrent.CopyOnWriteArraySet;
21 import java.util.concurrent.ExecutionException;
22 import java.util.regex.Pattern;
23 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
25 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
26 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
27 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
42 import org.opendaylight.yangtools.yang.common.RpcError;
43 import org.opendaylight.yangtools.yang.common.RpcResult;
44 import org.slf4j.LoggerFactory;
45
46 public final class EventSourceTopic implements DataTreeChangeListener<Node>, AutoCloseable {
47     private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
48     private final NotificationPattern notificationPattern;
49     private final EventSourceService sourceService;
50     private final Pattern nodeIdPattern;
51     private final TopicId topicId;
52     private ListenerRegistration<?> listenerRegistration;
53     private final CopyOnWriteArraySet<InstanceIdentifier<?>> joinedEventSources = new CopyOnWriteArraySet<>();
54
55     public static EventSourceTopic create(final NotificationPattern notificationPattern,
56             final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology) {
57         final EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern,
58                 eventSourceTopology.getEventSourceService());
59         est.registerListner(eventSourceTopology);
60         est.notifyExistingNodes(eventSourceTopology);
61         return est;
62     }
63
64     private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern,
65             final EventSourceService sourceService) {
66         this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
67         this.sourceService = Preconditions.checkNotNull(sourceService);
68         this.nodeIdPattern = Pattern.compile(nodeIdRegexPattern);
69         this.topicId = new TopicId(getUUIDIdent());
70         this.listenerRegistration = null;
71         LOG.info("EventSourceTopic created - topicId {}", topicId.getValue());
72     }
73
74     public TopicId getTopicId() {
75         return topicId;
76     }
77
78     @Override
79     public void onDataTreeChanged(Collection<DataTreeModification<Node>> changes) {
80         for (DataTreeModification<Node> change: changes) {
81             final DataObjectModification<Node> rootNode = change.getRootNode();
82             switch (rootNode.getModificationType()) {
83                 case WRITE:
84                 case SUBTREE_MODIFIED:
85                     final Node node = rootNode.getDataAfter();
86                     if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) {
87                         notifyNode(change.getRootPath().getRootIdentifier());
88                     }
89                     break;
90                 default:
91                     break;
92             }
93         }
94     }
95
96     public void notifyNode(final InstanceIdentifier<?> nodeId) {
97         LOG.debug("Notify node: {}", nodeId);
98         try {
99             final RpcResult<JoinTopicOutput> rpcResultJoinTopic =
100                     sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
101             if (!rpcResultJoinTopic.isSuccessful()) {
102                 for (final RpcError err : rpcResultJoinTopic.getErrors()) {
103                     LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}", getTopicId().getValue(),
104                             nodeId.toString(), err.toString());
105                 }
106             } else {
107                 joinedEventSources.add(nodeId);
108             }
109         } catch (InterruptedException | ExecutionException e) {
110             LOG.error("Could not invoke join topic for node {}", nodeId);
111         }
112     }
113
114     private void notifyExistingNodes(final EventSourceTopology eventSourceTopology) {
115         LOG.debug("Notify existing nodes");
116         final Pattern nodeRegex = this.nodeIdPattern;
117
118         final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction();
119         final ListenableFuture<Optional<Topology>> future =
120                 tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
121
122         Futures.addCallback(future, new FutureCallback<Optional<Topology>>() {
123             @Override
124             public void onSuccess(final Optional<Topology> data) {
125                 if (data.isPresent()) {
126                     final List<Node> nodes = data.get().getNode();
127                     if (nodes != null) {
128                         for (final Node node : nodes) {
129                             if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) {
130                                 notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class,
131                                         node.getKey()));
132                             }
133                         }
134                     }
135                 }
136                 tx.close();
137             }
138
139             @Override
140             public void onFailure(final Throwable ex) {
141                 LOG.error("Can not notify existing nodes", ex);
142                 tx.close();
143             }
144         }, MoreExecutors.directExecutor());
145     }
146
147     private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier<?> path) {
148         final NodeRef nodeRef = new NodeRef(path);
149         final JoinTopicInput jti =
150                 new JoinTopicInputBuilder()
151                         .setNode(nodeRef.getValue())
152                         .setTopicId(topicId)
153                         .setNotificationPattern(notificationPattern)
154                         .build();
155         return jti;
156     }
157
158     public Pattern getNodeIdRegexPattern() {
159         return nodeIdPattern;
160     }
161
162     private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier<?> eventSourceNodeId) {
163         final NodeRef nodeRef = new NodeRef(eventSourceNodeId);
164         final DisJoinTopicInput dji = new DisJoinTopicInputBuilder()
165                 .setNode(nodeRef.getValue())
166                 .setTopicId(topicId)
167                 .build();
168         return dji;
169     }
170
171     private void registerListner(final EventSourceTopology eventSourceTopology) {
172         this.listenerRegistration =
173                 eventSourceTopology.getDataBroker().registerDataTreeChangeListener(new DataTreeIdentifier<>(
174                         LogicalDatastoreType.OPERATIONAL,
175                         EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class)),
176                         this);
177     }
178
179     @Override
180     public void close() {
181         if (this.listenerRegistration != null) {
182             this.listenerRegistration.close();
183         }
184         for (final InstanceIdentifier<?> eventSourceNodeId : joinedEventSources) {
185             try {
186                 final RpcResult<Void> result = sourceService
187                         .disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get();
188                 if (result.isSuccessful() == false) {
189                     for (final RpcError err : result.getErrors()) {
190                         LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}", getTopicId().getValue(),
191                                 eventSourceNodeId, err.toString());
192                     }
193                 }
194             } catch (InterruptedException | ExecutionException ex) {
195                 LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(),
196                         eventSourceNodeId, ex);
197             }
198         }
199         joinedEventSources.clear();
200     }
201
202     private static String getUUIDIdent() {
203         final UUID uuid = UUID.randomUUID();
204         return uuid.toString();
205     }
206 }