2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.messagebus.app.impl;
11 import java.util.List;
13 import java.util.concurrent.ConcurrentHashMap;
14 import java.util.concurrent.Future;
15 import java.util.regex.Pattern;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
19 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
20 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
23 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
24 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
25 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
26 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
27 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
29 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
30 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
31 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
32 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
33 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
34 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
35 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
37 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 import com.google.common.base.Optional;
56 import com.google.common.util.concurrent.CheckedFuture;
57 import com.google.common.util.concurrent.FutureCallback;
58 import com.google.common.util.concurrent.Futures;
60 public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
61 private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
63 private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
64 private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
65 private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
67 private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
68 InstanceIdentifier.create(NetworkTopology.class)
69 .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
71 private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
72 EVENT_SOURCE_TOPOLOGY_PATH
73 .child(TopologyTypes.class)
74 .augmentation(TopologyTypes1.class);
76 private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
77 new ConcurrentHashMap<>();
79 private final DataBroker dataBroker;
80 private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
81 private final EventSourceService eventSourceService;
82 private final RpcProviderRegistry rpcRegistry;
84 public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
86 this.dataBroker = dataBroker;
87 this.rpcRegistry = rpcRegistry;
88 aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
89 eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
91 final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
92 final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
93 putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
97 private <T extends DataObject> void putData(final LogicalDatastoreType store,
98 final InstanceIdentifier<T> path,
101 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
102 tx.put(store, path, data, true);
107 private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
108 final NodeKey nodeKey = node.getKey();
109 final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
110 final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
111 putData(OPERATIONAL, augmentPath, nodeAgument);
114 private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
116 final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
118 final CheckedFuture<Optional<Topology>, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
120 Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
123 public void onSuccess(Optional<Topology> data) {
124 if(data.isPresent()) {
125 final List<Node> nodes = data.get().getNode();
126 for (final Node node : nodes) {
127 if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
128 eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
136 public void onFailure(Throwable t) {
137 LOG.error("Can not notify existing nodes {}", t);
146 public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
147 LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
148 input.getNotificationPattern(),
149 input.getNodeIdPattern());
151 final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
152 final String nodeIdPattern = input.getNodeIdPattern().getValue();
153 final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
154 final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
156 registerTopic(eventSourceTopic);
158 notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic);
160 final CreateTopicOutput cto = new CreateTopicOutputBuilder()
161 .setTopicId(eventSourceTopic.getTopicId())
164 return Util.resultFor(cto);
168 public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
169 return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
173 public void close() {
174 aggregatorRpcReg.close();
177 public void registerTopic(final EventSourceTopic listener) {
178 final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
179 EVENT_SOURCE_TOPOLOGY_PATH,
181 DataBroker.DataChangeScope.SUBTREE);
183 registrations.put(listener, listenerRegistration);
186 public void register(final Node node, final NetconfEventSource netconfEventSource) {
187 final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
188 rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
189 .registerPath(NodeContext.class, sourcePath);
190 insert(sourcePath,node);
191 // FIXME: Return registration object.