934056dcab81f2eeb81f387a6d6986c31ec5b629
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.messagebus.app.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
21 import org.opendaylight.controller.messagebus.app.util.Util;
22 import org.opendaylight.controller.messagebus.spi.EventSource;
23 import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
24 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
25 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
26 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
27 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
28 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutput;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
37 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
38 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
39 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
40 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
41 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
42 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
43 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
53 import org.opendaylight.yangtools.yang.binding.DataObject;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.RpcResult;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
61     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
62
63     private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
64     private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
65     private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
66
67     static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
68             InstanceIdentifier.create(NetworkTopology.class)
69                     .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
70
71     private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
72             EVENT_SOURCE_TOPOLOGY_PATH
73                     .child(TopologyTypes.class)
74                     .augmentation(TopologyTypes1.class);
75
76     private final Map<TopicId,EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
77     private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
78             new ConcurrentHashMap<>();
79
80     private final DataBroker dataBroker;
81     private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
82     private final EventSourceService eventSourceService;
83     private final RpcProviderRegistry rpcRegistry;
84
85     public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
86
87         this.dataBroker = dataBroker;
88         this.rpcRegistry = rpcRegistry;
89         aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
90         eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
91
92         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
93         final TopologyTypes1 topologyTypeAugment =
94                 new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
95         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
96         LOG.info("EventSourceRegistry has been initialized");
97     }
98
99     private <T extends DataObject>  void putData(final LogicalDatastoreType store,
100                                                  final InstanceIdentifier<T> path,
101                                                  final T data) {
102
103         final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
104         tx.put(store, path, data, true);
105         Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
106             @Override
107             public void onSuccess(final Void result) {
108                 LOG.trace("Data has put into datastore {} {}", store, path);
109             }
110
111             @Override
112             public void onFailure(final Throwable ex) {
113                 LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
114             }
115         }, MoreExecutors.directExecutor());
116     }
117
118     private <T extends DataObject>  void deleteData(final LogicalDatastoreType store,
119             final InstanceIdentifier<T> path) {
120         final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
121         tx.delete(OPERATIONAL, path);
122         Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
123             @Override
124             public void onSuccess(final Void result) {
125                 LOG.trace("Data has deleted from datastore {} {}", store, path);
126             }
127
128             @Override
129             public void onFailure(final Throwable ex) {
130                 LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
131             }
132         }, MoreExecutors.directExecutor());
133     }
134
135     private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
136         final NodeKey nodeKey = sourcePath.getKey();
137         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
138         final Node1 nodeAgument = new Node1Builder().setEventSourceNode(
139                 new NodeId(nodeKey.getNodeId().getValue())).build();
140         putData(OPERATIONAL, augmentPath, nodeAgument);
141     }
142
143     private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
144         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
145         deleteData(OPERATIONAL, augmentPath);
146     }
147
148     @Override
149     public ListenableFuture<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
150         LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
151                 input.getNotificationPattern(),
152                 input.getNodeIdPattern());
153
154         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
155         //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
156         final String nodeIdPattern = input.getNodeIdPattern().getValue();
157         final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
158
159         eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
160
161         final CreateTopicOutput cto = new CreateTopicOutputBuilder()
162                 .setTopicId(eventSourceTopic.getTopicId())
163                 .build();
164
165         LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
166                 input.getNotificationPattern(),
167                 input.getNodeIdPattern());
168
169         return Util.resultRpcSuccessFor(cto);
170     }
171
172     @Override
173     public ListenableFuture<RpcResult<DestroyTopicOutput>> destroyTopic(final DestroyTopicInput input) {
174         final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
175         if (topicToDestroy != null) {
176             topicToDestroy.close();
177         }
178         return Util.resultRpcSuccessFor(new DestroyTopicOutputBuilder().build());
179     }
180
181     @Override
182     public void close() {
183         aggregatorRpcReg.close();
184         eventSourceTopicMap.values().forEach(EventSourceTopic::close);
185     }
186
187     public void register(final EventSource eventSource) {
188
189         final NodeKey nodeKey = eventSource.getSourceNodeKey();
190         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
191         final RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(
192                 EventSourceService.class, eventSource);
193         reg.registerPath(NodeContext.class, sourcePath);
194         routedRpcRegistrations.put(nodeKey,reg);
195         insert(sourcePath);
196
197     }
198
199     public void unRegister(final EventSource eventSource) {
200         final NodeKey nodeKey = eventSource.getSourceNodeKey();
201         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
202         final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
203         if (removeRegistration != null) {
204             removeRegistration.close();
205             remove(sourcePath);
206         }
207     }
208
209     @Override
210     public <T extends EventSource> EventSourceRegistration<T> registerEventSource(final T eventSource) {
211         final EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
212         register(eventSource);
213         return esr;
214     }
215
216     DataBroker getDataBroker() {
217         return dataBroker;
218     }
219
220     EventSourceService getEventSourceService() {
221         return eventSourceService;
222     }
223
224     @VisibleForTesting
225     Map<NodeKey, RoutedRpcRegistration<EventSourceService>> getRoutedRpcRegistrations() {
226         return routedRpcRegistrations;
227     }
228
229     @VisibleForTesting
230     Map<TopicId, EventSourceTopic> getEventSourceTopicMap() {
231         return eventSourceTopicMap;
232     }
233 }