* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.messagebus.app.util.Util;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.regex.Pattern;
-
-public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
+@Deprecated(forRemoval = true)
+public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
- private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
- InstanceIdentifier.create(NetworkTopology.class)
- .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
+ static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
+ InstanceIdentifier.create(NetworkTopology.class).child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
- private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
- EVENT_SOURCE_TOPOLOGY_PATH
- .child(TopologyTypes.class)
- .augmentation(TopologyTypes1.class);
+ private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH = EVENT_SOURCE_TOPOLOGY_PATH
+ .child(TopologyTypes.class).augmentation(TopologyTypes1.class);
- private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
- new ConcurrentHashMap<>();
+ private final Map<TopicId, EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
+ private final Map<NodeKey, Registration> routedRpcRegistrations = new ConcurrentHashMap<>();
private final DataBroker dataBroker;
- private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
+ private final ObjectRegistration<EventSourceTopology> aggregatorRpcReg;
private final EventSourceService eventSourceService;
- private final RpcProviderRegistry rpcRegistry;
- private final ExecutorService executorService;
+ private final RpcProviderService rpcRegistry;
+
+ public EventSourceTopology(final DataBroker dataBroker, final RpcProviderService providerService,
+ final RpcConsumerRegistry rpcService) {
- public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
this.dataBroker = dataBroker;
- this.executorService = Executors.newCachedThreadPool();
- this.rpcRegistry = rpcRegistry;
- aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
- eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
+ this.rpcRegistry = providerService;
+ aggregatorRpcReg = providerService.registerRpcImplementation(EventAggregatorService.class, this);
+ eventSourceService = rpcService.getRpcService(EventSourceService.class);
final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
- final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+ final TopologyTypes1 topologyTypeAugment =
+ new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+ LOG.info("EventSourceRegistry has been initialized");
}
- private <T extends DataObject> void putData(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path, final T data) {
+ private <T extends DataObject> void putData(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path,
+ final T data) {
- final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
- tx.put(store, path, data, true);
- tx.submit();
+ final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
+ tx.mergeParentStructurePut(store, path, data);
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.trace("Data has put into datastore {} {}", store, path);
+ }
+
+ @Override
+ public void onFailure(final Throwable ex) {
+ LOG.error("Can not put data into datastore [store: {}] [path: {}]", store, path, ex);
+ }
+ }, MoreExecutors.directExecutor());
}
- private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
- final NodeKey nodeKey = node.getKey();
+ private <T extends DataObject> void deleteData(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path) {
+ final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
+ tx.delete(OPERATIONAL, path);
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.trace("Data has deleted from datastore {} {}", store, path);
+ }
+
+ @Override
+ public void onFailure(final Throwable ex) {
+ LOG.error("Can not delete data from datastore [store: {}] [path: {}]", store, path, ex);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+ final NodeKey nodeKey = sourcePath.getKey();
final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
- final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
+ final Node1 nodeAgument = new Node1Builder().setEventSourceNode(
+ new NodeId(nodeKey.getNodeId().getValue())).build();
putData(OPERATIONAL, augmentPath, nodeAgument);
}
- private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
- executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
+ private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+ final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+ deleteData(OPERATIONAL, augmentPath);
}
@Override
- public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
- LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+ public ListenableFuture<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+ LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
input.getNotificationPattern(),
input.getNodeIdPattern());
final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
+ //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
final String nodeIdPattern = input.getNodeIdPattern().getValue();
- final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
- final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
-
- registerTopic(eventSourceTopic);
+ final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
- notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic);
+ eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
final CreateTopicOutput cto = new CreateTopicOutputBuilder()
.setTopicId(eventSourceTopic.getTopicId())
.build();
- return Util.resultFor(cto);
+ LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
+ input.getNotificationPattern(),
+ input.getNodeIdPattern());
+
+ return Util.resultRpcSuccessFor(cto);
}
@Override
- public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
- return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
+ public ListenableFuture<RpcResult<DestroyTopicOutput>> destroyTopic(final DestroyTopicInput input) {
+ final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
+ if (topicToDestroy != null) {
+ topicToDestroy.close();
+ }
+ return Util.resultRpcSuccessFor(new DestroyTopicOutputBuilder().build());
}
@Override
public void close() {
aggregatorRpcReg.close();
+ eventSourceTopicMap.values().forEach(EventSourceTopic::close);
}
- public void registerTopic(final EventSourceTopic listener) {
- final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
- EVENT_SOURCE_TOPOLOGY_PATH,
- listener,
- DataBroker.DataChangeScope.SUBTREE);
-
- registrations.put(listener, listenerRegistration);
+ public void register(final EventSource eventSource) {
+ final NodeKey nodeKey = eventSource.getSourceNodeKey();
+ final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+ final Registration reg = rpcRegistry.registerRpcImplementation(EventSourceService.class, eventSource,
+ Collections.singleton(sourcePath));
+ routedRpcRegistrations.put(nodeKey, reg);
+ insert(sourcePath);
}
- public void register(final Node node, final NetconfEventSource netconfEventSource) {
- final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
- rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
- .registerPath(NodeContext.class, sourcePath);
- insert(sourcePath,node);
- // FIXME: Return registration object.
+ public void unRegister(final EventSource eventSource) {
+ final NodeKey nodeKey = eventSource.getSourceNodeKey();
+ final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+ final Registration removeRegistration = routedRpcRegistrations.remove(nodeKey);
+ if (removeRegistration != null) {
+ removeRegistration.close();
+ remove(sourcePath);
+ }
}
- private class NotifyAllNodeExecutor implements Runnable {
+ @Override
+ public <T extends EventSource> EventSourceRegistration<T> registerEventSource(final T eventSource) {
+ final EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+ register(eventSource);
+ return esr;
+ }
- private final EventSourceTopic topic;
- private final DataBroker dataBroker;
- private final Pattern nodeIdPatternRegex;
+ DataBroker getDataBroker() {
+ return dataBroker;
+ }
- public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
- this.topic = topic;
- this.dataBroker = dataBroker;
- this.nodeIdPatternRegex = nodeIdPatternRegex;
- }
+ EventSourceService getEventSourceService() {
+ return eventSourceService;
+ }
- @Override
- public void run() {
- //# Code reader note: Context of Node type is NetworkTopology
- final List<Node> nodes = snapshot();
- for (final Node node : nodes) {
- if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
- topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
- }
- }
- }
+ @VisibleForTesting
+ Map<NodeKey, Registration> getRoutedRpcRegistrations() {
+ return routedRpcRegistrations;
+ }
- private List<Node> snapshot() {
- try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
-
- final Optional<Topology> data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
-
- if(data.isPresent()) {
- final List<Node> nodeList = data.get().getNode();
- if(nodeList != null) {
- return nodeList;
- }
- }
- return Collections.emptyList();
- } catch (final ReadFailedException e) {
- LOG.error("Unable to retrieve node list.", e);
- return Collections.emptyList();
- }
- }
+ @VisibleForTesting
+ Map<TopicId, EventSourceTopic> getEventSourceTopicMap() {
+ return eventSourceTopicMap;
}
}