X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopology.java;h=3aa470b10ad9940729043fd918a9727ffbf5ab84;hp=b879eacda015b5d810ce535234dcbf1396f8d78c;hb=cce450550bec259d4f925389bafd007676f2186f;hpb=9fb1df14f2dc885fee1dce821b753cc99af6e54f diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index b879eacda0..3aa470b10a 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -8,18 +8,14 @@ package org.opendaylight.controller.messagebus.app.impl; -import java.util.List; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; -import java.util.regex.Pattern; - import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.messagebus.spi.EventSource; import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; @@ -32,6 +28,7 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder; @@ -48,7 +45,6 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; @@ -56,11 +52,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; - public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); @@ -69,7 +60,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID)); private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL; - private static final InstanceIdentifier EVENT_SOURCE_TOPOLOGY_PATH = + static final InstanceIdentifier EVENT_SOURCE_TOPOLOGY_PATH = InstanceIdentifier.create(NetworkTopology.class) .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY); @@ -78,10 +69,9 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR .child(TopologyTypes.class) .augmentation(TopologyTypes1.class); - private final Map> topicListenerRegistrations = - new ConcurrentHashMap<>(); + private final Map eventSourceTopicMap = new ConcurrentHashMap<>(); private final Map> routedRpcRegistrations = - new ConcurrentHashMap<>();; + new ConcurrentHashMap<>(); private final DataBroker dataBroker; private final RpcRegistration aggregatorRpcReg; @@ -105,16 +95,39 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR final InstanceIdentifier path, final T data){ - final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); tx.put(store, path, data, true); - tx.submit(); + Futures.addCallback( tx.submit(), new FutureCallback(){ + + @Override + public void onSuccess(final Void result) { + LOG.trace("Data has put into datastore {} {}", store, path); + } + + @Override + public void onFailure(final Throwable t) { + LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, t); + } + }); } private void deleteData(final LogicalDatastoreType store, final InstanceIdentifier path){ - final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); tx.delete(OPERATIONAL, path); - tx.submit(); + Futures.addCallback( tx.submit(), new FutureCallback(){ + + @Override + public void onSuccess(final Void result) { + LOG.trace("Data has deleted from datastore {} {}", store, path); + } + + @Override + public void onFailure(final Throwable t) { + LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, t); + } + + }); } private void insert(final KeyedInstanceIdentifier sourcePath) { @@ -129,88 +142,56 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR deleteData(OPERATIONAL, augmentPath); } - private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ - - final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); - - final CheckedFuture, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH); - - Futures.addCallback(future, new FutureCallback>(){ - - @Override - public void onSuccess(Optional data) { - if(data.isPresent()) { - final List nodes = data.get().getNode(); - for (final Node node : nodes) { - if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { - eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); - } - } - } - tx.close(); - } - - @Override - public void onFailure(Throwable t) { - LOG.error("Can not notify existing nodes {}", t); - tx.close(); - } - - }); - - } - @Override public Future> createTopic(final CreateTopicInput input) { - LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", + LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", input.getNotificationPattern(), input.getNodeIdPattern()); final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern()); + //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex final String nodeIdPattern = input.getNodeIdPattern().getValue(); - final Pattern nodeIdPatternRegex = Pattern.compile(nodeIdPattern); - final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService); - - registerTopic(eventSourceTopic); + final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this); - notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic); + eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic); final CreateTopicOutput cto = new CreateTopicOutputBuilder() .setTopicId(eventSourceTopic.getTopicId()) .build(); + LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}", + input.getNotificationPattern(), + input.getNodeIdPattern()); + return Util.resultRpcSuccessFor(cto); } @Override public Future> destroyTopic(final DestroyTopicInput input) { - return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented")); + final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId()); + if(topicToDestroy != null){ + topicToDestroy.close(); + } + return Util.resultRpcSuccessFor((Void) null); } @Override public void close() { aggregatorRpcReg.close(); - for(ListenerRegistration reg : topicListenerRegistrations.values()){ - reg.close(); + for(final EventSourceTopic est : eventSourceTopicMap.values()){ + est.close(); } } - private void registerTopic(final EventSourceTopic listener) { - final ListenerRegistration listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL, - EVENT_SOURCE_TOPOLOGY_PATH, - listener, - DataBroker.DataChangeScope.SUBTREE); - - topicListenerRegistrations.put(listener, listenerRegistration); - } - public void register(final EventSource eventSource){ - NodeKey nodeKey = eventSource.getSourceNodeKey(); + + final NodeKey nodeKey = eventSource.getSourceNodeKey(); final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); - RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource); + final RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource); reg.registerPath(NodeContext.class, sourcePath); routedRpcRegistrations.put(nodeKey,reg); insert(sourcePath); + } public void unRegister(final EventSource eventSource){ @@ -224,11 +205,18 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR } @Override - public EventSourceRegistration registerEventSource( - T eventSource) { - EventSourceRegistrationImpl esr = new EventSourceRegistrationImpl<>(eventSource, this); + public EventSourceRegistration registerEventSource(final T eventSource) { + final EventSourceRegistrationImpl esr = new EventSourceRegistrationImpl<>(eventSource, this); register(eventSource); return esr; } + + DataBroker getDataBroker() { + return dataBroker; + } + + EventSourceService getEventSourceService() { + return eventSourceService; + } }