From: Tony Tkacik Date: Wed, 25 Mar 2015 09:18:32 +0000 (+0000) Subject: Merge "BUG 2799: Migration of Message Bus from deprecated Helium MD-SAL APIs to Lithi... X-Git-Tag: release/lithium~356 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=edfebf2ecb683a0b16286977c43efa5a67b6744a;hp=00cc355c0c58e999ffebd531bca3a507e150e441 Merge "BUG 2799: Migration of Message Bus from deprecated Helium MD-SAL APIs to Lithium API" --- diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 603f34bac9..076d1b2fc7 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -8,16 +8,11 @@ package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.Futures; - -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.regex.Pattern; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; @@ -57,7 +52,10 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.regex.Pattern; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; public class EventSourceTopology implements EventAggregatorService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); @@ -82,11 +80,10 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl private final RpcRegistration aggregatorRpcReg; private final EventSourceService eventSourceService; private final RpcProviderRegistry rpcRegistry; - private final ExecutorService executorService; public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) { + this.dataBroker = dataBroker; - this.executorService = Executors.newCachedThreadPool(); this.rpcRegistry = rpcRegistry; aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this); eventSourceService = rpcRegistry.getRpcService(EventSourceService.class); @@ -94,14 +91,17 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); + } private void putData(final LogicalDatastoreType store, - final InstanceIdentifier path, final T data) { + final InstanceIdentifier path, + final T data){ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); tx.put(store, path, data, true); tx.submit(); + } private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) { @@ -112,7 +112,34 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl } private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ - executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic)); + + final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); + + final CheckedFuture, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH); + + Futures.addCallback(future, new FutureCallback>(){ + + @Override + public void onSuccess(Optional data) { + if(data.isPresent()) { + final List nodes = data.get().getNode(); + for (final Node node : nodes) { + if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { + eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); + } + } + } + tx.close(); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Can not notify existing nodes {}", t); + tx.close(); + } + + }); + } @Override @@ -164,45 +191,4 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl // FIXME: Return registration object. } - private class NotifyAllNodeExecutor implements Runnable { - - private final EventSourceTopic topic; - private final DataBroker dataBroker; - private final Pattern nodeIdPatternRegex; - - public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) { - this.topic = topic; - this.dataBroker = dataBroker; - this.nodeIdPatternRegex = nodeIdPatternRegex; - } - - @Override - public void run() { - //# Code reader note: Context of Node type is NetworkTopology - final List nodes = snapshot(); - for (final Node node : nodes) { - if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { - topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); - } - } - } - - private List snapshot() { - try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) { - - final Optional data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet(); - - if(data.isPresent()) { - final List nodeList = data.get().getNode(); - if(nodeList != null) { - return nodeList; - } - } - return Collections.emptyList(); - } catch (final ReadFailedException e) { - LOG.error("Unable to retrieve node list.", e); - return Collections.emptyList(); - } - } - } }