BUG 3030 - reconnect netconf event source
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.messagebus.app.impl;
10
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.ConcurrentHashMap;
14 import java.util.concurrent.Future;
15 import java.util.regex.Pattern;
16
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
19 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
20 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
23 import org.opendaylight.controller.messagebus.spi.EventSource;
24 import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
25 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
26 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
27 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
28 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
37 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
38 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
39 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
40 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
41 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
51 import org.opendaylight.yangtools.concepts.ListenerRegistration;
52 import org.opendaylight.yangtools.yang.binding.DataObject;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.common.RpcResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 import com.google.common.base.Optional;
60 import com.google.common.util.concurrent.CheckedFuture;
61 import com.google.common.util.concurrent.FutureCallback;
62 import com.google.common.util.concurrent.Futures;
63
64
65 public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
66     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
67
68     private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
69     private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
70     private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
71
72     private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
73             InstanceIdentifier.create(NetworkTopology.class)
74                     .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
75
76     private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
77             EVENT_SOURCE_TOPOLOGY_PATH
78                     .child(TopologyTypes.class)
79                     .augmentation(TopologyTypes1.class);
80
81     private final Map<EventSourceTopic, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
82             new ConcurrentHashMap<>();
83     private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
84             new ConcurrentHashMap<>();
85
86     private final DataBroker dataBroker;
87     private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
88     private final EventSourceService eventSourceService;
89     private final RpcProviderRegistry rpcRegistry;
90
91     public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
92
93         this.dataBroker = dataBroker;
94         this.rpcRegistry = rpcRegistry;
95         aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
96         eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
97
98         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
99         final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
100         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
101         LOG.info("EventSourceRegistry has been initialized");
102     }
103
104     private <T extends DataObject>  void putData(final LogicalDatastoreType store,
105                                                  final InstanceIdentifier<T> path,
106                                                  final T data){
107
108         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
109         tx.put(store, path, data, true);
110         tx.submit();
111
112     }
113
114     private <T extends DataObject>  void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
115         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
116         tx.delete(OPERATIONAL, path);
117         tx.submit();
118     }
119
120     private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
121         final NodeKey nodeKey = sourcePath.getKey();
122         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
123         final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
124         putData(OPERATIONAL, augmentPath, nodeAgument);
125     }
126
127     private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath){
128         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
129         deleteData(OPERATIONAL, augmentPath);
130     }
131
132     private void notifyExistingNodes(final EventSourceTopic eventSourceTopic){
133
134         final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
135
136         final CheckedFuture<Optional<Topology>, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
137
138         Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
139
140             @Override
141             public void onSuccess(Optional<Topology> data) {
142                 if(data.isPresent()) {
143                     LOG.info("Topology data are present...");
144                      final List<Node> nodes = data.get().getNode();
145                      if(nodes != null){
146                          LOG.info("List of nodes is not null...");
147                          final Pattern nodeIdPatternRegex = eventSourceTopic.getNodeIdRegexPattern();
148                      for (final Node node : nodes) {
149                          if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
150                              eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
151                          }
152                      }
153                      } else {
154                          LOG.info("List of nodes is NULL...");
155                      }
156                 }
157                 tx.close();
158             }
159
160             @Override
161             public void onFailure(Throwable t) {
162                 LOG.error("Can not notify existing nodes {}", t);
163                 tx.close();
164             }
165
166         });
167
168     }
169
170     @Override
171     public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
172         LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
173                 input.getNotificationPattern(),
174                 input.getNodeIdPattern());
175
176         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
177         final String nodeIdPattern = input.getNodeIdPattern().getValue();
178         final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
179
180         registerTopic(eventSourceTopic);
181
182         notifyExistingNodes(eventSourceTopic);
183
184         final CreateTopicOutput cto = new CreateTopicOutputBuilder()
185                 .setTopicId(eventSourceTopic.getTopicId())
186                 .build();
187
188         return Util.resultRpcSuccessFor(cto);
189     }
190
191     @Override
192     public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
193         return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
194     }
195
196     @Override
197     public void close() {
198         aggregatorRpcReg.close();
199         for(ListenerRegistration<DataChangeListener> reg : topicListenerRegistrations.values()){
200             reg.close();
201         }
202     }
203
204     private void registerTopic(final EventSourceTopic listener) {
205         final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
206                 EVENT_SOURCE_TOPOLOGY_PATH,
207                 listener,
208                 DataBroker.DataChangeScope.SUBTREE);
209
210         topicListenerRegistrations.put(listener, listenerRegistration);
211     }
212
213     public void register(final EventSource eventSource){
214         NodeKey nodeKey = eventSource.getSourceNodeKey();
215         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
216         RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
217         reg.registerPath(NodeContext.class, sourcePath);
218         routedRpcRegistrations.put(nodeKey,reg);
219         insert(sourcePath);
220
221         for(EventSourceTopic est : topicListenerRegistrations.keySet()){
222             if(est.getNodeIdRegexPattern().matcher(nodeKey.getNodeId().getValue()).matches()){
223                 est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
224             }
225         }
226     }
227
228     public void unRegister(final EventSource eventSource){
229         final NodeKey nodeKey = eventSource.getSourceNodeKey();
230         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
231         final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
232         if(removeRegistration != null){
233             removeRegistration.close();
234         remove(sourcePath);
235         }
236     }
237
238     @Override
239     public <T extends EventSource> EventSourceRegistration<T> registerEventSource(
240             T eventSource) {
241         EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
242         register(eventSource);
243         return esr;
244     }
245 }
246