b22eee233e612679da6b766126792f299e426fba
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.messagebus.app.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.Collections;
15 import java.util.Map;
16 import java.util.concurrent.ConcurrentHashMap;
17 import org.opendaylight.controller.messagebus.app.util.Util;
18 import org.opendaylight.controller.messagebus.spi.EventSource;
19 import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
20 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
21 import org.opendaylight.mdsal.binding.api.DataBroker;
22 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
23 import org.opendaylight.mdsal.binding.api.RpcProviderService;
24 import org.opendaylight.mdsal.binding.api.WriteTransaction;
25 import org.opendaylight.mdsal.common.api.CommitInfo;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
28 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutput;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
37 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
38 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
39 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
40 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
41 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
42 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
51 import org.opendaylight.yangtools.concepts.ObjectRegistration;
52 import org.opendaylight.yangtools.concepts.Registration;
53 import org.opendaylight.yangtools.yang.binding.DataObject;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.RpcResult;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 @Deprecated(forRemoval = true)
61 public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
62     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
63
64     private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
65     private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
66     private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
67
68     static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
69             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
70
71     private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH = EVENT_SOURCE_TOPOLOGY_PATH
72             .child(TopologyTypes.class).augmentation(TopologyTypes1.class);
73
74     private final Map<TopicId, EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
75     private final Map<NodeKey, Registration> routedRpcRegistrations = new ConcurrentHashMap<>();
76
77     private final DataBroker dataBroker;
78     private final ObjectRegistration<EventSourceTopology> aggregatorRpcReg;
79     private final EventSourceService eventSourceService;
80     private final RpcProviderService rpcRegistry;
81
82     public EventSourceTopology(final DataBroker dataBroker, final RpcProviderService providerService,
83             final RpcConsumerRegistry rpcService) {
84
85         this.dataBroker = dataBroker;
86         this.rpcRegistry = providerService;
87         aggregatorRpcReg = providerService.registerRpcImplementation(EventAggregatorService.class, this);
88         eventSourceService = rpcService.getRpcService(EventSourceService.class);
89
90         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
91         final TopologyTypes1 topologyTypeAugment =
92                 new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
93         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
94         LOG.info("EventSourceRegistry has been initialized");
95     }
96
97     private <T extends DataObject> void putData(final LogicalDatastoreType store,
98                                                  final InstanceIdentifier<T> path,
99                                                  final T data) {
100
101         final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
102         tx.mergeParentStructurePut(store, path, data);
103         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
104             @Override
105             public void onSuccess(final CommitInfo result) {
106                 LOG.trace("Data has put into datastore {} {}", store, path);
107             }
108
109             @Override
110             public void onFailure(final Throwable ex) {
111                 LOG.error("Can not put data into datastore [store: {}] [path: {}]", store, path, ex);
112             }
113         }, MoreExecutors.directExecutor());
114     }
115
116     private <T extends DataObject>  void deleteData(final LogicalDatastoreType store,
117             final InstanceIdentifier<T> path) {
118         final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
119         tx.delete(OPERATIONAL, path);
120         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
121             @Override
122             public void onSuccess(final CommitInfo result) {
123                 LOG.trace("Data has deleted from datastore {} {}", store, path);
124             }
125
126             @Override
127             public void onFailure(final Throwable ex) {
128                 LOG.error("Can not delete data from datastore [store: {}] [path: {}]", store, path, ex);
129             }
130         }, MoreExecutors.directExecutor());
131     }
132
133     private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
134         final NodeKey nodeKey = sourcePath.getKey();
135         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
136         final Node1 nodeAgument = new Node1Builder().setEventSourceNode(
137                 new NodeId(nodeKey.getNodeId().getValue())).build();
138         putData(OPERATIONAL, augmentPath, nodeAgument);
139     }
140
141     private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
142         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
143         deleteData(OPERATIONAL, augmentPath);
144     }
145
146     @Override
147     public ListenableFuture<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
148         LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
149                 input.getNotificationPattern(),
150                 input.getNodeIdPattern());
151
152         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
153         //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
154         final String nodeIdPattern = input.getNodeIdPattern().getValue();
155         final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
156
157         eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
158
159         final CreateTopicOutput cto = new CreateTopicOutputBuilder()
160                 .setTopicId(eventSourceTopic.getTopicId())
161                 .build();
162
163         LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
164                 input.getNotificationPattern(),
165                 input.getNodeIdPattern());
166
167         return Util.resultRpcSuccessFor(cto);
168     }
169
170     @Override
171     public ListenableFuture<RpcResult<DestroyTopicOutput>> destroyTopic(final DestroyTopicInput input) {
172         final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
173         if (topicToDestroy != null) {
174             topicToDestroy.close();
175         }
176         return Util.resultRpcSuccessFor(new DestroyTopicOutputBuilder().build());
177     }
178
179     @Override
180     public void close() {
181         aggregatorRpcReg.close();
182         eventSourceTopicMap.values().forEach(EventSourceTopic::close);
183     }
184
185     public void register(final EventSource eventSource) {
186         final NodeKey nodeKey = eventSource.getSourceNodeKey();
187         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
188         final Registration reg = rpcRegistry.registerRpcImplementation(EventSourceService.class, eventSource,
189             Collections.singleton(sourcePath));
190         routedRpcRegistrations.put(nodeKey, reg);
191         insert(sourcePath);
192     }
193
194     public void unRegister(final EventSource eventSource) {
195         final NodeKey nodeKey = eventSource.getSourceNodeKey();
196         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
197         final Registration removeRegistration = routedRpcRegistrations.remove(nodeKey);
198         if (removeRegistration != null) {
199             removeRegistration.close();
200             remove(sourcePath);
201         }
202     }
203
204     @Override
205     public <T extends EventSource> EventSourceRegistration<T> registerEventSource(final T eventSource) {
206         final EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
207         register(eventSource);
208         return esr;
209     }
210
211     DataBroker getDataBroker() {
212         return dataBroker;
213     }
214
215     EventSourceService getEventSourceService() {
216         return eventSourceService;
217     }
218
219     @VisibleForTesting
220     Map<NodeKey, Registration> getRoutedRpcRegistrations() {
221         return routedRpcRegistrations;
222     }
223
224     @VisibleForTesting
225     Map<TopicId, EventSourceTopic> getEventSourceTopicMap() {
226         return eventSourceTopicMap;
227     }
228 }