BUG 3121 - destroy topic implementation
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.messagebus.app.impl;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import java.util.Map;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.Future;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
19 import org.opendaylight.controller.messagebus.spi.EventSource;
20 import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
21 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
23 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
24 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
25 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
26 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
27 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
37 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
38 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55
56 public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
57     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
58
59     private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
60     private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
61     private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
62
63     static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
64             InstanceIdentifier.create(NetworkTopology.class)
65                     .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
66
67     private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
68             EVENT_SOURCE_TOPOLOGY_PATH
69                     .child(TopologyTypes.class)
70                     .augmentation(TopologyTypes1.class);
71
72     private final Map<TopicId,EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
73     private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
74             new ConcurrentHashMap<>();
75
76     private final DataBroker dataBroker;
77     private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
78     private final EventSourceService eventSourceService;
79     private final RpcProviderRegistry rpcRegistry;
80
81     public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
82
83         this.dataBroker = dataBroker;
84         this.rpcRegistry = rpcRegistry;
85         aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
86         eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
87
88         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
89         final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
90         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
91         LOG.info("EventSourceRegistry has been initialized");
92     }
93
94     private <T extends DataObject>  void putData(final LogicalDatastoreType store,
95                                                  final InstanceIdentifier<T> path,
96                                                  final T data){
97
98         final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
99         tx.put(store, path, data, true);
100         Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
101
102             @Override
103             public void onSuccess(final Void result) {
104                 LOG.trace("Data has put into datastore {} {}", store, path);
105             }
106
107             @Override
108             public void onFailure(final Throwable t) {
109                 LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
110             }
111         });
112
113     }
114
115     private <T extends DataObject>  void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
116         final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
117         tx.delete(OPERATIONAL, path);
118         Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
119
120             @Override
121             public void onSuccess(final Void result) {
122                 LOG.trace("Data has deleted from datastore {} {}", store, path);
123             }
124
125             @Override
126             public void onFailure(final Throwable t) {
127                 LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
128             }
129
130         });
131     }
132
133     private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
134         final NodeKey nodeKey = sourcePath.getKey();
135         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
136         final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
137         putData(OPERATIONAL, augmentPath, nodeAgument);
138     }
139
140     private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath){
141         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
142         deleteData(OPERATIONAL, augmentPath);
143     }
144
145     @Override
146     public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
147         LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
148                 input.getNotificationPattern(),
149                 input.getNodeIdPattern());
150
151         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
152         //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
153         final String nodeIdPattern = input.getNodeIdPattern().getValue();
154         final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
155
156         eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
157
158         final CreateTopicOutput cto = new CreateTopicOutputBuilder()
159                 .setTopicId(eventSourceTopic.getTopicId())
160                 .build();
161
162         LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
163                 input.getNotificationPattern(),
164                 input.getNodeIdPattern());
165
166         return Util.resultRpcSuccessFor(cto);
167     }
168
169     @Override
170     public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
171         final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
172         if(topicToDestroy != null){
173             topicToDestroy.close();
174         }
175         return Util.resultRpcSuccessFor((Void) null);
176     }
177
178     @Override
179     public void close() {
180         aggregatorRpcReg.close();
181         for(final EventSourceTopic est : eventSourceTopicMap.values()){
182             est.close();
183         }
184     }
185
186     public void register(final EventSource eventSource){
187
188         final NodeKey nodeKey = eventSource.getSourceNodeKey();
189         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
190         final RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
191         reg.registerPath(NodeContext.class, sourcePath);
192         routedRpcRegistrations.put(nodeKey,reg);
193         insert(sourcePath);
194
195     }
196
197     public void unRegister(final EventSource eventSource){
198         final NodeKey nodeKey = eventSource.getSourceNodeKey();
199         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
200         final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
201         if(removeRegistration != null){
202             removeRegistration.close();
203         remove(sourcePath);
204         }
205     }
206
207     @Override
208     public <T extends EventSource> EventSourceRegistration<T> registerEventSource(final T eventSource) {
209         final EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
210         register(eventSource);
211         return esr;
212     }
213
214     DataBroker getDataBroker() {
215         return dataBroker;
216     }
217
218     EventSourceService getEventSourceService() {
219         return eventSourceService;
220     }
221 }
222