Merge "BUG 2799: Migration of Message Bus from deprecated Helium MD-SAL APIs to Lithi...
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.messagebus.app.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.Futures;
13
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Future;
21
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
24 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
25 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
29 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
37 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
38 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
39 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
40 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
41 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
42 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.binding.DataObject;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.RpcResult;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 import java.util.regex.Pattern;
61
62 public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
63     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
64
65     private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
66     private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
67     private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
68
69     private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
70             InstanceIdentifier.create(NetworkTopology.class)
71                     .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
72
73     private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
74             EVENT_SOURCE_TOPOLOGY_PATH
75                     .child(TopologyTypes.class)
76                     .augmentation(TopologyTypes1.class);
77
78     private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
79             new ConcurrentHashMap<>();
80
81     private final DataBroker dataBroker;
82     private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
83     private final EventSourceService eventSourceService;
84     private final RpcProviderRegistry rpcRegistry;
85     private final ExecutorService executorService;
86
87     public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
88         this.dataBroker = dataBroker;
89         this.executorService = Executors.newCachedThreadPool();
90         this.rpcRegistry = rpcRegistry;
91         aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
92         eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
93
94         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
95         final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
96         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
97     }
98
99     private <T extends DataObject>  void putData(final LogicalDatastoreType store,
100             final InstanceIdentifier<T> path, final T data) {
101
102         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
103         tx.put(store, path, data, true);
104         tx.submit();
105     }
106
107     private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
108         final NodeKey nodeKey = node.getKey();
109         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
110         final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
111         putData(OPERATIONAL, augmentPath, nodeAgument);
112     }
113
114     private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
115         executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
116     }
117
118     @Override
119     public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
120         LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
121                 input.getNotificationPattern(),
122                 input.getNodeIdPattern());
123
124         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
125         final String nodeIdPattern = input.getNodeIdPattern().getValue();
126         final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
127         final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
128
129         registerTopic(eventSourceTopic);
130
131         notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic);
132
133         final CreateTopicOutput cto = new CreateTopicOutputBuilder()
134                 .setTopicId(eventSourceTopic.getTopicId())
135                 .build();
136
137         return Util.resultFor(cto);
138     }
139
140     @Override
141     public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
142         return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
143     }
144
145     @Override
146     public void close() {
147         aggregatorRpcReg.close();
148     }
149
150     public void registerTopic(final EventSourceTopic listener) {
151         final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
152                 EVENT_SOURCE_TOPOLOGY_PATH,
153                 listener,
154                 DataBroker.DataChangeScope.SUBTREE);
155
156         registrations.put(listener, listenerRegistration);
157     }
158
159     public void register(final Node node, final NetconfEventSource netconfEventSource) {
160         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
161         rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
162             .registerPath(NodeContext.class, sourcePath);
163         insert(sourcePath,node);
164         // FIXME: Return registration object.
165     }
166
167     private class NotifyAllNodeExecutor implements Runnable {
168
169         private final EventSourceTopic topic;
170         private final DataBroker dataBroker;
171         private final Pattern nodeIdPatternRegex;
172
173         public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
174             this.topic = topic;
175             this.dataBroker = dataBroker;
176             this.nodeIdPatternRegex = nodeIdPatternRegex;
177         }
178
179         @Override
180         public void run() {
181             //# Code reader note: Context of Node type is NetworkTopology
182             final List<Node> nodes = snapshot();
183             for (final Node node : nodes) {
184                 if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
185                     topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
186                 }
187             }
188         }
189
190         private List<Node> snapshot() {
191             try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
192
193                 final Optional<Topology> data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
194
195                 if(data.isPresent()) {
196                     final List<Node> nodeList = data.get().getNode();
197                     if(nodeList != null) {
198                         return nodeList;
199                     }
200                 }
201                 return Collections.emptyList();
202             } catch (final ReadFailedException e) {
203                 LOG.error("Unable to retrieve node list.", e);
204                 return Collections.emptyList();
205             }
206         }
207     }
208 }