2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.messagebus.app.impl;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.concurrent.ConcurrentHashMap;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.controller.messagebus.app.util.Util;
21 import org.opendaylight.controller.messagebus.spi.EventSource;
22 import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
23 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
24 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
25 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
26 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
27 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
28 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutput;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
37 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
38 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
39 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
40 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
41 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
42 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
52 import org.opendaylight.yangtools.yang.binding.DataObject;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.common.RpcResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
59 public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
60 private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
62 private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
63 private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
64 private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
66 static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
67 InstanceIdentifier.create(NetworkTopology.class)
68 .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
70 private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
71 EVENT_SOURCE_TOPOLOGY_PATH
72 .child(TopologyTypes.class)
73 .augmentation(TopologyTypes1.class);
75 private final Map<TopicId,EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
76 private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
77 new ConcurrentHashMap<>();
79 private final DataBroker dataBroker;
80 private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
81 private final EventSourceService eventSourceService;
82 private final RpcProviderRegistry rpcRegistry;
84 public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
86 this.dataBroker = dataBroker;
87 this.rpcRegistry = rpcRegistry;
88 aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
89 eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
91 final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
92 final TopologyTypes1 topologyTypeAugment =
93 new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
94 putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
95 LOG.info("EventSourceRegistry has been initialized");
98 private <T extends DataObject> void putData(final LogicalDatastoreType store,
99 final InstanceIdentifier<T> path,
102 final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
103 tx.put(store, path, data, true);
104 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
106 public void onSuccess(final Void result) {
107 LOG.trace("Data has put into datastore {} {}", store, path);
111 public void onFailure(final Throwable ex) {
112 LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
114 }, MoreExecutors.directExecutor());
117 private <T extends DataObject> void deleteData(final LogicalDatastoreType store,
118 final InstanceIdentifier<T> path) {
119 final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
120 tx.delete(OPERATIONAL, path);
121 Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
123 public void onSuccess(final Void result) {
124 LOG.trace("Data has deleted from datastore {} {}", store, path);
128 public void onFailure(final Throwable ex) {
129 LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
131 }, MoreExecutors.directExecutor());
134 private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
135 final NodeKey nodeKey = sourcePath.getKey();
136 final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
137 final Node1 nodeAgument = new Node1Builder().setEventSourceNode(
138 new NodeId(nodeKey.getNodeId().getValue())).build();
139 putData(OPERATIONAL, augmentPath, nodeAgument);
142 private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
143 final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
144 deleteData(OPERATIONAL, augmentPath);
148 public ListenableFuture<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
149 LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
150 input.getNotificationPattern(),
151 input.getNodeIdPattern());
153 final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
154 //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
155 final String nodeIdPattern = input.getNodeIdPattern().getValue();
156 final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
158 eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
160 final CreateTopicOutput cto = new CreateTopicOutputBuilder()
161 .setTopicId(eventSourceTopic.getTopicId())
164 LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
165 input.getNotificationPattern(),
166 input.getNodeIdPattern());
168 return Util.resultRpcSuccessFor(cto);
172 public ListenableFuture<RpcResult<DestroyTopicOutput>> destroyTopic(final DestroyTopicInput input) {
173 final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
174 if (topicToDestroy != null) {
175 topicToDestroy.close();
177 return Util.resultRpcSuccessFor(new DestroyTopicOutputBuilder().build());
181 public void close() {
182 aggregatorRpcReg.close();
183 eventSourceTopicMap.values().forEach(EventSourceTopic::close);
186 public void register(final EventSource eventSource) {
188 final NodeKey nodeKey = eventSource.getSourceNodeKey();
189 final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
190 final RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(
191 EventSourceService.class, eventSource);
192 reg.registerPath(NodeContext.class, sourcePath);
193 routedRpcRegistrations.put(nodeKey,reg);
198 public void unRegister(final EventSource eventSource) {
199 final NodeKey nodeKey = eventSource.getSourceNodeKey();
200 final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
201 final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
202 if (removeRegistration != null) {
203 removeRegistration.close();
209 public <T extends EventSource> EventSourceRegistration<T> registerEventSource(final T eventSource) {
210 final EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
211 register(eventSource);
215 DataBroker getDataBroker() {
219 EventSourceService getEventSourceService() {
220 return eventSourceService;