From: Robert Varga Date: Sat, 7 Jun 2014 10:31:55 +0000 (+0200) Subject: Topology manager: get rid of sychronized sections X-Git-Tag: release/helium~678 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1776863db26c84cddb3e79bc0d881193f67444de Topology manager: get rid of sychronized sections Instead of executing changes one by one, we aggregate them such that ordering is preserved and we have one outstanding transaction at any given moment. This is done by placing a queue between the notification and data store, dispatched by a background thread. This should improve scalability by reducing number of transactions and improving notification thread concurrency. Change-Id: Ica70971e4540a060491ac4c0b89a134984947fca Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java index cf53c9770e..6dbfd7225b 100644 --- a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java @@ -15,12 +15,8 @@ import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMap import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode; import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId; -import java.util.concurrent.Future; - import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader; -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; -import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryListener; @@ -36,124 +32,102 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRem import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.base.Preconditions; class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener { - protected final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class); - public static final TopologyKey TOPOLOGY = new TopologyKey(new TopologyId("flow:1")); - private static final InstanceIdentifier TOPOLOGY_PATH = - InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, TOPOLOGY).build(); - - // FIXME: Flow capable topology exporter should use transaction chaining API - private DataProviderService dataService; - - public DataProviderService getDataService() { - return dataService; - } - - public void setDataService(final DataProviderService dataService) { - this.dataService = dataService; - } + private final InstanceIdentifier topology; + private final OperationProcessor processor; - public void start() { - TopologyBuilder tb = new TopologyBuilder().setKey(TOPOLOGY); - DataModificationTransaction tx = dataService.beginTransaction(); - tx.putOperationalData(TOPOLOGY_PATH, tb.build()); - listenOnTransactionState(tx.getIdentifier(),tx.commit()); + FlowCapableTopologyExporter(final OperationProcessor processor, final InstanceIdentifier topology) { + this.processor = Preconditions.checkNotNull(processor); + this.topology = Preconditions.checkNotNull(topology); } @Override public void onNodeRemoved(final NodeRemoved notification) { - NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId()); - InstanceIdentifier nodeInstance = toNodeIdentifier(notification.getNodeRef()); - - synchronized (this) { - DataModificationTransaction tx = dataService.beginTransaction(); - tx.removeOperationalData(nodeInstance); - removeAffectedLinks(tx, nodeId); - listenOnTransactionState(tx.getIdentifier(),tx.commit()); - } + processor.enqueueOperation(new TopologyOperation() { + @Override + public void applyOperation(final DataModificationTransaction transaction) { + NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId()); + InstanceIdentifier nodeInstance = toNodeIdentifier(notification.getNodeRef()); + transaction.removeOperationalData(nodeInstance); + removeAffectedLinks(transaction, nodeId); + } + }); } @Override public void onNodeUpdated(final NodeUpdated notification) { FlowCapableNodeUpdated fcnu = notification.getAugmentation(FlowCapableNodeUpdated.class); if (fcnu != null) { - Node node = toTopologyNode(toTopologyNodeId(notification.getId()), notification.getNodeRef()); - InstanceIdentifier path = getNodePath(toTopologyNodeId(notification.getId())); - - synchronized (this) { - DataModificationTransaction tx = dataService.beginTransaction(); - tx.putOperationalData(path, node); - listenOnTransactionState(tx.getIdentifier(),tx.commit()); - } + processor.enqueueOperation(new TopologyOperation() { + @Override + public void applyOperation(final DataModificationTransaction transaction) { + Node node = toTopologyNode(toTopologyNodeId(notification.getId()), notification.getNodeRef()); + InstanceIdentifier path = getNodePath(toTopologyNodeId(notification.getId())); + transaction.putOperationalData(path, node); + } + }); } } @Override public void onNodeConnectorRemoved(final NodeConnectorRemoved notification) { - InstanceIdentifier tpInstance = toTerminationPointIdentifier(notification - .getNodeConnectorRef()); - TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId()); - - synchronized (this) { - DataModificationTransaction tx = dataService.beginTransaction(); - tx.removeOperationalData(tpInstance); - removeAffectedLinks(tx, tpId); - listenOnTransactionState(tx.getIdentifier(),tx.commit()); - } + processor.enqueueOperation(new TopologyOperation() { + @Override + public void applyOperation(final DataModificationTransaction transaction) { + InstanceIdentifier tpInstance = toTerminationPointIdentifier(notification + .getNodeConnectorRef()); + TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId()); + + transaction.removeOperationalData(tpInstance); + removeAffectedLinks(transaction, tpId); + } + }); } @Override public void onNodeConnectorUpdated(final NodeConnectorUpdated notification) { - FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class); + final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class); if (fcncu != null) { - NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeConnectorRef()).getId()); - TerminationPoint point = toTerminationPoint(toTerminationPointId(notification.getId()), - notification.getNodeConnectorRef()); - InstanceIdentifier path = tpPath(nodeId, point.getKey().getTpId()); - - synchronized (this) { - DataModificationTransaction tx = dataService.beginTransaction(); - tx.putOperationalData(path, point); - if ((fcncu.getState() != null && fcncu.getState().isLinkDown()) - || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) { - removeAffectedLinks(tx, point.getTpId()); + processor.enqueueOperation(new TopologyOperation() { + @Override + public void applyOperation(final DataModificationTransaction transaction) { + NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeConnectorRef()).getId()); + TerminationPoint point = toTerminationPoint(toTerminationPointId(notification.getId()), + notification.getNodeConnectorRef()); + InstanceIdentifier path = tpPath(nodeId, point.getKey().getTpId()); + + transaction.putOperationalData(path, point); + if ((fcncu.getState() != null && fcncu.getState().isLinkDown()) + || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) { + removeAffectedLinks(transaction, point.getTpId()); + } } - listenOnTransactionState(tx.getIdentifier(),tx.commit()); - } + }); } } @Override public void onLinkDiscovered(final LinkDiscovered notification) { - Link link = toTopologyLink(notification); - InstanceIdentifier path = linkPath(link); - - synchronized (this) { - DataModificationTransaction tx = dataService.beginTransaction(); - tx.putOperationalData(path, link); - listenOnTransactionState(tx.getIdentifier(),tx.commit()); - } + processor.enqueueOperation(new TopologyOperation() { + @Override + public void applyOperation(final DataModificationTransaction transaction) { + Link link = toTopologyLink(notification); + InstanceIdentifier path = linkPath(link); + transaction.putOperationalData(path, link); + } + }); } @Override @@ -163,13 +137,12 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open @Override public void onLinkRemoved(final LinkRemoved notification) { - InstanceIdentifier path = linkPath(toTopologyLink(notification)); - - synchronized (this) { - DataModificationTransaction tx = dataService.beginTransaction(); - tx.removeOperationalData(path); - listenOnTransactionState(tx.getIdentifier(),tx.commit()); - } + processor.enqueueOperation(new TopologyOperation() { + @Override + public void applyOperation(final DataModificationTransaction transaction) { + transaction.removeOperationalData(linkPath(toTopologyLink(notification))); + } + }); } @Override @@ -177,13 +150,13 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open // NOOP } - private static InstanceIdentifier toNodeIdentifier(final NodeRef ref) { + private InstanceIdentifier toNodeIdentifier(final NodeRef ref) { org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref); NodeKey nodeKey = new NodeKey(toTopologyNodeId(invNodeKey.getId())); - return TOPOLOGY_PATH.child(Node.class, nodeKey); + return topology.child(Node.class, nodeKey); } - private static InstanceIdentifier toTerminationPointIdentifier(final NodeConnectorRef ref) { + private InstanceIdentifier toTerminationPointIdentifier(final NodeConnectorRef ref) { org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref); NodeConnectorKey invNodeConnectorKey = getNodeConnectorKey(ref); return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId())); @@ -191,62 +164,39 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open private void removeAffectedLinks(final DataModificationTransaction transaction, final NodeId id) { TypeSafeDataReader reader = TypeSafeDataReader.forReader(transaction); - - Topology topologyData = reader.readOperationalData(TOPOLOGY_PATH); - if (topologyData == null) { - return; - } - for (Link link : topologyData.getLink()) { - if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) { - transaction.removeOperationalData(linkPath(link)); + Topology topologyData = reader.readOperationalData(topology); + if (topologyData != null) { + for (Link link : topologyData.getLink()) { + if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) { + transaction.removeOperationalData(linkPath(link)); + } } } } private void removeAffectedLinks(final DataModificationTransaction transaction, final TpId id) { TypeSafeDataReader reader = TypeSafeDataReader.forReader(transaction); - Topology topologyData = reader.readOperationalData(TOPOLOGY_PATH); - if (topologyData == null) { - return; - } - for (Link link : topologyData.getLink()) { - if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) { - transaction.removeOperationalData(linkPath(link)); + Topology topologyData = reader.readOperationalData(topology); + if (topologyData != null) { + for (Link link : topologyData.getLink()) { + if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) { + transaction.removeOperationalData(linkPath(link)); + } } } } - private static InstanceIdentifier getNodePath(final NodeId nodeId) { - return TOPOLOGY_PATH.child(Node.class, new NodeKey(nodeId)); + private InstanceIdentifier getNodePath(final NodeId nodeId) { + return topology.child(Node.class, new NodeKey(nodeId)); } - private static InstanceIdentifier tpPath(final NodeId nodeId, final TpId tpId) { + private InstanceIdentifier tpPath(final NodeId nodeId, final TpId tpId) { NodeKey nodeKey = new NodeKey(nodeId); TerminationPointKey tpKey = new TerminationPointKey(tpId); - return TOPOLOGY_PATH.child(Node.class, nodeKey).child(TerminationPoint.class, tpKey); - } - - private static InstanceIdentifier linkPath(final Link link) { - return TOPOLOGY_PATH.child(Link.class, link.getKey()); + return topology.child(Node.class, nodeKey).child(TerminationPoint.class, tpKey); } - /** - * @param txId transaction identificator - * @param future transaction result - */ - private static void listenOnTransactionState(final Object txId, final Future> future) { - Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback>() { - @Override - public void onFailure(final Throwable t) { - LOG.error("Topology export failed for Tx:{}", txId, t); - } - - @Override - public void onSuccess(final RpcResult result) { - if(!result.isSuccessful()) { - LOG.error("Topology export failed for Tx:{}", txId); - } - } - }); + private InstanceIdentifier linkPath(final Link link) { + return topology.child(Link.class, link.getKey()); } } diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyProvider.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyProvider.java index e77ba8769c..d656bda932 100644 --- a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyProvider.java +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyProvider.java @@ -7,11 +7,20 @@ */ package org.opendaylight.md.controller.topology.manager; +import java.util.concurrent.ExecutionException; + import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.NotificationListener; import org.osgi.framework.BundleContext; import org.slf4j.Logger; @@ -19,58 +28,60 @@ import org.slf4j.LoggerFactory; public class FlowCapableTopologyProvider extends AbstractBindingAwareProvider implements AutoCloseable { private final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyProvider.class); + private Registration listenerRegistration; + private Thread thread; - private DataProviderService dataService; - - public DataProviderService getDataService() { - return this.dataService; - } + /** + * Gets called on start of a bundle. + * + * @param session + */ + @Override + public synchronized void onSessionInitiated(final ProviderContext session) { + final DataProviderService dataService = session.getSALService(DataProviderService.class); + final NotificationProviderService notificationService = session.getSALService(NotificationProviderService.class); - public void setDataService(final DataProviderService dataService) { - this.dataService = dataService; - } + final String name = "flow:1"; + final TopologyKey key = new TopologyKey(new TopologyId(name)); + final InstanceIdentifier path = InstanceIdentifier + .builder(NetworkTopology.class) + .child(Topology.class, key) + .build(); - private NotificationProviderService notificationService; + final OperationProcessor processor = new OperationProcessor(dataService); + final FlowCapableTopologyExporter listener = new FlowCapableTopologyExporter(processor, path); + this.listenerRegistration = notificationService.registerNotificationListener(listener); - public NotificationProviderService getNotificationService() { - return this.notificationService; - } + final DataModificationTransaction tx = dataService.beginTransaction(); + tx.putOperationalData(path, new TopologyBuilder().setKey(key).build()); + try { + tx.commit().get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Initial topology export failed, continuing anyway", e); + } - public void setNotificationService(final NotificationProviderService notificationService) { - this.notificationService = notificationService; + thread = new Thread(processor); + thread.setDaemon(true); + thread.setName("FlowCapableTopologyExporter-" + name); + thread.start(); } - private final FlowCapableTopologyExporter exporter = new FlowCapableTopologyExporter(); - private Registration listenerRegistration; - @Override - public void close() { - - FlowCapableTopologyProvider.LOG.info("FlowCapableTopologyProvider stopped."); - dataService = null; - notificationService = null; + public synchronized void close() throws InterruptedException { + LOG.info("FlowCapableTopologyProvider stopped."); if (this.listenerRegistration != null) { try { this.listenerRegistration.close(); } catch (Exception e) { - throw new IllegalStateException("Exception during close of listener registration.",e); + LOG.error("Failed to close listener registration", e); } + listenerRegistration = null; + } + if (thread != null) { + thread.interrupt(); + thread.join(); + thread = null; } - } - - /** - * Gets called on start of a bundle. - * - * @param session - */ - @Override - public void onSessionInitiated(final ProviderContext session) { - dataService = session.getSALService(DataProviderService.class); - notificationService = session.getSALService(NotificationProviderService.class); - this.exporter.setDataService(dataService); - this.exporter.start(); - this.listenerRegistration = notificationService.registerNotificationListener(this.exporter); - ; } /** @@ -81,6 +92,10 @@ public class FlowCapableTopologyProvider extends AbstractBindingAwareProvider im */ @Override public void stopImpl(final BundleContext context) { - this.close(); + try { + this.close(); + } catch (InterruptedException e) { + LOG.error("Failed to stop provider", e); + } } } diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java new file mode 100644 index 0000000000..d60c88032d --- /dev/null +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.md.controller.topology.manager; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; +import org.opendaylight.controller.sal.binding.api.data.DataProviderService; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +final class OperationProcessor implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class); + private static final int MAX_TRANSACTION_OPERATIONS = 100; + private static final int OPERATION_QUEUE_DEPTH = 500; + + private final BlockingQueue queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH); + // FIXME: Flow capable topology exporter should use transaction chaining API + private final DataProviderService dataService; + + OperationProcessor(final DataProviderService dataService) { + this.dataService = Preconditions.checkNotNull(dataService); + } + + void enqueueOperation(final TopologyOperation task) { + try { + queue.put(task); + } catch (InterruptedException e) { + LOG.warn("Interrupted while submitting task {}", task, e); + } + } + + @Override + public void run() { + try { + for (;;) { + TopologyOperation op = queue.take(); + + LOG.debug("New operations available, starting transaction"); + final DataModificationTransaction tx = dataService.beginTransaction(); + + int ops = 0; + do { + op.applyOperation(tx); + + ops++; + if (ops < MAX_TRANSACTION_OPERATIONS) { + op = queue.poll(); + } else { + op = null; + } + } while (op != null); + + LOG.debug("Processed {} operations, submitting transaction", ops); + + try { + final RpcResult s = tx.commit().get(); + if (!s.isSuccessful()) { + LOG.error("Topology export failed for Tx:{}", tx.getIdentifier()); + } + } catch (ExecutionException e) { + LOG.error("Topology export transaction {} failed", tx.getIdentifier(), e.getCause()); + } + } + } catch (InterruptedException e) { + LOG.info("Interrupted processing, terminating", e); + } + + // Drain all events, making sure any blocked threads are unblocked + while (!queue.isEmpty()) { + queue.poll(); + } + } +} diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/TopologyOperation.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/TopologyOperation.java new file mode 100644 index 0000000000..29d06beade --- /dev/null +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/TopologyOperation.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.md.controller.topology.manager; + +import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; + +/** + * Internal interface for submitted operations. Implementations of this + * interface are enqueued and batched into data store transactions. + */ +interface TopologyOperation { + /** + * Execute the operation on top of the transaction. + * + * @param transaction Datastore transaction + */ + void applyOperation(DataModificationTransaction transaction); +} \ No newline at end of file