X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopology.java;h=6140a78ba586a7beaa89a92ae0ab9cc5fb109016;hb=e1fc2efacc0b2da9075b6cd4e5ae149a141c486d;hp=603f34bac994d9ff44c8e37ed9f99bca11b17c47;hpb=753515e8868a1a15982d3f2697439f522f273db5;p=controller.git diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 603f34bac9..6140a78ba5 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -8,16 +8,11 @@ package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.Futures; - -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.regex.Pattern; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; @@ -25,6 +20,10 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.messagebus.spi.EventSource; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; @@ -57,9 +56,13 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.regex.Pattern; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; + -public class EventSourceTopology implements EventAggregatorService, AutoCloseable { +public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ; @@ -75,18 +78,19 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl .child(TopologyTypes.class) .augmentation(TopologyTypes1.class); - private final Map> registrations = + private final Map> topicListenerRegistrations = + new ConcurrentHashMap<>(); + private final Map> routedRpcRegistrations = new ConcurrentHashMap<>(); private final DataBroker dataBroker; private final RpcRegistration aggregatorRpcReg; private final EventSourceService eventSourceService; private final RpcProviderRegistry rpcRegistry; - private final ExecutorService executorService; public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) { + this.dataBroker = dataBroker; - this.executorService = Executors.newCachedThreadPool(); this.rpcRegistry = rpcRegistry; aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this); eventSourceService = rpcRegistry.getRpcService(EventSourceService.class); @@ -94,25 +98,66 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); + LOG.info("EventSourceRegistry has been initialized"); } private void putData(final LogicalDatastoreType store, - final InstanceIdentifier path, final T data) { + final InstanceIdentifier path, + final T data){ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); tx.put(store, path, data, true); tx.submit(); + } - private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) { - final NodeKey nodeKey = node.getKey(); + private void deleteData(final LogicalDatastoreType store, final InstanceIdentifier path){ + final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + tx.delete(OPERATIONAL, path); + tx.submit(); + } + + private void insert(final KeyedInstanceIdentifier sourcePath) { + final NodeKey nodeKey = sourcePath.getKey(); final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build(); putData(OPERATIONAL, augmentPath, nodeAgument); } + private void remove(final KeyedInstanceIdentifier sourcePath){ + final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); + deleteData(OPERATIONAL, augmentPath); + } + private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ - executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic)); + + final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); + + final CheckedFuture, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH); + + Futures.addCallback(future, new FutureCallback>(){ + + @Override + public void onSuccess(Optional data) { + if(data.isPresent()) { + final List nodes = data.get().getNode(); + for (final Node node : nodes) { + if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { + eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); + } + } + } + tx.close(); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Can not notify existing nodes {}", t); + tx.close(); + } + + }); + } @Override @@ -123,8 +168,8 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern()); final String nodeIdPattern = input.getNodeIdPattern().getValue(); - final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern)); - final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService); + final Pattern nodeIdPatternRegex = Pattern.compile(nodeIdPattern); + final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService); registerTopic(eventSourceTopic); @@ -134,7 +179,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl .setTopicId(eventSourceTopic.getTopicId()) .build(); - return Util.resultFor(cto); + return Util.resultRpcSuccessFor(cto); } @Override @@ -145,64 +190,49 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl @Override public void close() { aggregatorRpcReg.close(); + for(ListenerRegistration reg : topicListenerRegistrations.values()){ + reg.close(); + } } - public void registerTopic(final EventSourceTopic listener) { + private void registerTopic(final EventSourceTopic listener) { final ListenerRegistration listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH, listener, DataBroker.DataChangeScope.SUBTREE); - registrations.put(listener, listenerRegistration); - } - - public void register(final Node node, final NetconfEventSource netconfEventSource) { - final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()); - rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource) - .registerPath(NodeContext.class, sourcePath); - insert(sourcePath,node); - // FIXME: Return registration object. + topicListenerRegistrations.put(listener, listenerRegistration); } - private class NotifyAllNodeExecutor implements Runnable { + public void register(final EventSource eventSource){ + NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource); + reg.registerPath(NodeContext.class, sourcePath); + routedRpcRegistrations.put(nodeKey,reg); + insert(sourcePath); - private final EventSourceTopic topic; - private final DataBroker dataBroker; - private final Pattern nodeIdPatternRegex; - - public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) { - this.topic = topic; - this.dataBroker = dataBroker; - this.nodeIdPatternRegex = nodeIdPatternRegex; + for(EventSourceTopic est : topicListenerRegistrations.keySet()){ + est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey)); } + } - @Override - public void run() { - //# Code reader note: Context of Node type is NetworkTopology - final List nodes = snapshot(); - for (final Node node : nodes) { - if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { - topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); - } - } + public void unRegister(final EventSource eventSource){ + final NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + final RoutedRpcRegistration removeRegistration = routedRpcRegistrations.remove(nodeKey); + if(removeRegistration != null){ + removeRegistration.close(); + remove(sourcePath); } + } - private List snapshot() { - try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) { - - final Optional data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet(); - - if(data.isPresent()) { - final List nodeList = data.get().getNode(); - if(nodeList != null) { - return nodeList; - } - } - return Collections.emptyList(); - } catch (final ReadFailedException e) { - LOG.error("Unable to retrieve node list.", e); - return Collections.emptyList(); - } - } + @Override + public EventSourceRegistration registerEventSource( + T eventSource) { + EventSourceRegistrationImpl esr = new EventSourceRegistrationImpl<>(eventSource, this); + register(eventSource); + return esr; } } +