/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.messagebus.app.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.Collection;
16 import java.util.Optional;
17 import java.util.UUID;
18 import java.util.concurrent.CopyOnWriteArraySet;
19 import java.util.concurrent.ExecutionException;
20 import java.util.regex.Pattern;
21 import org.opendaylight.mdsal.binding.api.DataObjectModification;
22 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
23 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
24 import org.opendaylight.mdsal.binding.api.DataTreeModification;
25 import org.opendaylight.mdsal.binding.api.ReadTransaction;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
28 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicOutput;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
39 import org.opendaylight.yangtools.concepts.ListenerRegistration;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.opendaylight.yangtools.yang.common.RpcError;
42 import org.opendaylight.yangtools.yang.common.RpcResult;
43 import org.slf4j.LoggerFactory;
45 public final class EventSourceTopic implements DataTreeChangeListener<Node>, AutoCloseable {
46 private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
47 private final NotificationPattern notificationPattern;
48 private final EventSourceService sourceService;
49 private final Pattern nodeIdPattern;
50 private final TopicId topicId;
51 private ListenerRegistration<?> listenerRegistration;
52 private final CopyOnWriteArraySet<InstanceIdentifier<?>> joinedEventSources = new CopyOnWriteArraySet<>();
54 public static EventSourceTopic create(final NotificationPattern notificationPattern,
55 final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology) {
56 final EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern,
57 eventSourceTopology.getEventSourceService());
58 est.registerListner(eventSourceTopology);
59 est.notifyExistingNodes(eventSourceTopology);
63 private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern,
64 final EventSourceService sourceService) {
65 this.notificationPattern = requireNonNull(notificationPattern);
66 this.sourceService = requireNonNull(sourceService);
67 this.nodeIdPattern = Pattern.compile(nodeIdRegexPattern);
68 this.topicId = new TopicId(getUUIDIdent());
69 this.listenerRegistration = null;
70 LOG.info("EventSourceTopic created - topicId {}", topicId.getValue());
73 public TopicId getTopicId() {
78 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
79 for (DataTreeModification<Node> change: changes) {
80 final DataObjectModification<Node> rootNode = change.getRootNode();
81 switch (rootNode.getModificationType()) {
83 case SUBTREE_MODIFIED:
84 final Node node = rootNode.getDataAfter();
85 if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) {
86 notifyNode(change.getRootPath().getRootIdentifier());
95 public void notifyNode(final InstanceIdentifier<?> nodeId) {
96 LOG.debug("Notify node: {}", nodeId);
98 final RpcResult<JoinTopicOutput> rpcResultJoinTopic =
99 sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
100 if (!rpcResultJoinTopic.isSuccessful()) {
101 for (final RpcError err : rpcResultJoinTopic.getErrors()) {
102 LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}", getTopicId().getValue(),
103 nodeId.toString(), err.toString());
106 joinedEventSources.add(nodeId);
108 } catch (InterruptedException | ExecutionException e) {
109 LOG.error("Could not invoke join topic for node {}", nodeId);
113 private void notifyExistingNodes(final EventSourceTopology eventSourceTopology) {
114 LOG.debug("Notify existing nodes");
115 final Pattern nodeRegex = this.nodeIdPattern;
117 final FluentFuture<Optional<Topology>> future;
118 try (ReadTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction()) {
119 future = tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
122 future.addCallback(new FutureCallback<Optional<Topology>>() {
124 public void onSuccess(final Optional<Topology> data) {
125 if (data.isPresent()) {
126 for (final Node node : data.get().nonnullNode().values()) {
127 if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) {
128 notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.key()));
135 public void onFailure(final Throwable ex) {
136 LOG.error("Can not notify existing nodes", ex);
138 }, MoreExecutors.directExecutor());
141 private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier<?> path) {
142 final NodeRef nodeRef = new NodeRef(path);
143 final JoinTopicInput jti =
144 new JoinTopicInputBuilder()
145 .setNode(nodeRef.getValue())
147 .setNotificationPattern(notificationPattern)
152 public Pattern getNodeIdRegexPattern() {
153 return nodeIdPattern;
156 private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier<?> eventSourceNodeId) {
157 final NodeRef nodeRef = new NodeRef(eventSourceNodeId);
158 final DisJoinTopicInput dji = new DisJoinTopicInputBuilder()
159 .setNode(nodeRef.getValue())
165 private void registerListner(final EventSourceTopology eventSourceTopology) {
166 this.listenerRegistration = eventSourceTopology.getDataBroker().registerDataTreeChangeListener(
167 DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
168 EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class)), this);
172 public void close() {
173 if (this.listenerRegistration != null) {
174 this.listenerRegistration.close();
176 for (final InstanceIdentifier<?> eventSourceNodeId : joinedEventSources) {
178 final RpcResult<DisJoinTopicOutput> result = sourceService
179 .disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get();
180 if (result.isSuccessful() == false) {
181 for (final RpcError err : result.getErrors()) {
182 LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}", getTopicId().getValue(),
183 eventSourceNodeId, err.toString());
186 } catch (InterruptedException | ExecutionException ex) {
187 LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(),
188 eventSourceNodeId, ex);
191 joinedEventSources.clear();
194 private static String getUUIDIdent() {
195 final UUID uuid = UUID.randomUUID();
196 return uuid.toString();