Topology manager: get rid of sychronized sections 05/7805/4
authorRobert Varga <rovarga@cisco.com>
Sat, 7 Jun 2014 10:31:55 +0000 (12:31 +0200)
committerRobert Varga <rovarga@cisco.com>
Mon, 9 Jun 2014 15:46:12 +0000 (17:46 +0200)
Instead of executing changes one by one, we aggregate them such that
ordering is preserved and we have one outstanding transaction at any
given moment. This is done by placing a queue between the notification
and data store, dispatched by a background thread. This should improve
scalability by reducing number of transactions and improving
notification thread concurrency.

Change-Id: Ica70971e4540a060491ac4c0b89a134984947fca
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyProvider.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java [new file with mode: 0644]
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/TopologyOperation.java [new file with mode: 0644]

index cf53c97..6dbfd72 100644 (file)
@@ -15,12 +15,8 @@ import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMap
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
-import java.util.concurrent.Future;
-
 import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryListener;
@@ -36,124 +32,102 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRem
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.base.Preconditions;
 
 class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
-    protected final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
-    public static final TopologyKey TOPOLOGY = new TopologyKey(new TopologyId("flow:1"));
-    private static final InstanceIdentifier<Topology> TOPOLOGY_PATH =
-            InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, TOPOLOGY).build();
-
-    // FIXME: Flow capable topology exporter should use transaction chaining API
-    private DataProviderService dataService;
-
-    public DataProviderService getDataService() {
-        return dataService;
-    }
-
-    public void setDataService(final DataProviderService dataService) {
-        this.dataService = dataService;
-    }
+    private final InstanceIdentifier<Topology> topology;
+    private final OperationProcessor processor;
 
-    public void start() {
-        TopologyBuilder tb = new TopologyBuilder().setKey(TOPOLOGY);
-        DataModificationTransaction tx = dataService.beginTransaction();
-        tx.putOperationalData(TOPOLOGY_PATH, tb.build());
-        listenOnTransactionState(tx.getIdentifier(),tx.commit());
+    FlowCapableTopologyExporter(final OperationProcessor processor, final InstanceIdentifier<Topology> topology) {
+        this.processor = Preconditions.checkNotNull(processor);
+        this.topology = Preconditions.checkNotNull(topology);
     }
 
     @Override
     public void onNodeRemoved(final NodeRemoved notification) {
-        NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
-        InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
-
-        synchronized (this) {
-            DataModificationTransaction tx = dataService.beginTransaction();
-            tx.removeOperationalData(nodeInstance);
-            removeAffectedLinks(tx, nodeId);
-            listenOnTransactionState(tx.getIdentifier(),tx.commit());
-        }
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final DataModificationTransaction transaction) {
+                NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
+                InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
+                transaction.removeOperationalData(nodeInstance);
+                removeAffectedLinks(transaction, nodeId);
+            }
+        });
     }
 
     @Override
     public void onNodeUpdated(final NodeUpdated notification) {
         FlowCapableNodeUpdated fcnu = notification.getAugmentation(FlowCapableNodeUpdated.class);
         if (fcnu != null) {
-            Node node = toTopologyNode(toTopologyNodeId(notification.getId()), notification.getNodeRef());
-            InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
-
-            synchronized (this) {
-                DataModificationTransaction tx = dataService.beginTransaction();
-                tx.putOperationalData(path, node);
-                listenOnTransactionState(tx.getIdentifier(),tx.commit());
-            }
+            processor.enqueueOperation(new TopologyOperation() {
+                @Override
+                public void applyOperation(final DataModificationTransaction transaction) {
+                    Node node = toTopologyNode(toTopologyNodeId(notification.getId()), notification.getNodeRef());
+                    InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
+                    transaction.putOperationalData(path, node);
+                }
+            });
         }
     }
 
     @Override
     public void onNodeConnectorRemoved(final NodeConnectorRemoved notification) {
-        InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(notification
-                .getNodeConnectorRef());
-        TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId());
-
-        synchronized (this) {
-            DataModificationTransaction tx = dataService.beginTransaction();
-            tx.removeOperationalData(tpInstance);
-            removeAffectedLinks(tx, tpId);
-            listenOnTransactionState(tx.getIdentifier(),tx.commit());
-        }
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final DataModificationTransaction transaction) {
+                InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(notification
+                        .getNodeConnectorRef());
+                TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId());
+
+                transaction.removeOperationalData(tpInstance);
+                removeAffectedLinks(transaction, tpId);
+            }
+        });
     }
 
     @Override
     public void onNodeConnectorUpdated(final NodeConnectorUpdated notification) {
-        FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class);
+        final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class);
         if (fcncu != null) {
-            NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeConnectorRef()).getId());
-            TerminationPoint point = toTerminationPoint(toTerminationPointId(notification.getId()),
-                    notification.getNodeConnectorRef());
-            InstanceIdentifier<TerminationPoint> path = tpPath(nodeId, point.getKey().getTpId());
-
-            synchronized (this) {
-                DataModificationTransaction tx = dataService.beginTransaction();
-                tx.putOperationalData(path, point);
-                if ((fcncu.getState() != null && fcncu.getState().isLinkDown())
-                        || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
-                    removeAffectedLinks(tx, point.getTpId());
+            processor.enqueueOperation(new TopologyOperation() {
+                @Override
+                public void applyOperation(final DataModificationTransaction transaction) {
+                    NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeConnectorRef()).getId());
+                    TerminationPoint point = toTerminationPoint(toTerminationPointId(notification.getId()),
+                            notification.getNodeConnectorRef());
+                    InstanceIdentifier<TerminationPoint> path = tpPath(nodeId, point.getKey().getTpId());
+
+                    transaction.putOperationalData(path, point);
+                    if ((fcncu.getState() != null && fcncu.getState().isLinkDown())
+                            || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
+                        removeAffectedLinks(transaction, point.getTpId());
+                    }
                 }
-                listenOnTransactionState(tx.getIdentifier(),tx.commit());
-            }
+            });
         }
     }
 
     @Override
     public void onLinkDiscovered(final LinkDiscovered notification) {
-        Link link = toTopologyLink(notification);
-        InstanceIdentifier<Link> path = linkPath(link);
-
-        synchronized (this) {
-            DataModificationTransaction tx = dataService.beginTransaction();
-            tx.putOperationalData(path, link);
-            listenOnTransactionState(tx.getIdentifier(),tx.commit());
-        }
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final DataModificationTransaction transaction) {
+                Link link = toTopologyLink(notification);
+                InstanceIdentifier<Link> path = linkPath(link);
+                transaction.putOperationalData(path, link);
+            }
+        });
     }
 
     @Override
@@ -163,13 +137,12 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
 
     @Override
     public void onLinkRemoved(final LinkRemoved notification) {
-        InstanceIdentifier<Link> path = linkPath(toTopologyLink(notification));
-
-        synchronized (this) {
-            DataModificationTransaction tx = dataService.beginTransaction();
-            tx.removeOperationalData(path);
-            listenOnTransactionState(tx.getIdentifier(),tx.commit());
-        }
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final DataModificationTransaction transaction) {
+                transaction.removeOperationalData(linkPath(toTopologyLink(notification)));
+            }
+        });
     }
 
     @Override
@@ -177,13 +150,13 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         // NOOP
     }
 
-    private static InstanceIdentifier<Node> toNodeIdentifier(final NodeRef ref) {
+    private InstanceIdentifier<Node> toNodeIdentifier(final NodeRef ref) {
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref);
         NodeKey nodeKey = new NodeKey(toTopologyNodeId(invNodeKey.getId()));
-        return TOPOLOGY_PATH.child(Node.class, nodeKey);
+        return topology.child(Node.class, nodeKey);
     }
 
-    private static InstanceIdentifier<TerminationPoint> toTerminationPointIdentifier(final NodeConnectorRef ref) {
+    private InstanceIdentifier<TerminationPoint> toTerminationPointIdentifier(final NodeConnectorRef ref) {
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref);
         NodeConnectorKey invNodeConnectorKey = getNodeConnectorKey(ref);
         return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId()));
@@ -191,62 +164,39 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
 
     private void removeAffectedLinks(final DataModificationTransaction transaction, final NodeId id) {
         TypeSafeDataReader reader = TypeSafeDataReader.forReader(transaction);
-
-        Topology topologyData = reader.readOperationalData(TOPOLOGY_PATH);
-        if (topologyData == null) {
-            return;
-        }
-        for (Link link : topologyData.getLink()) {
-            if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) {
-                transaction.removeOperationalData(linkPath(link));
+        Topology topologyData = reader.readOperationalData(topology);
+        if (topologyData != null) {
+            for (Link link : topologyData.getLink()) {
+                if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) {
+                    transaction.removeOperationalData(linkPath(link));
+                }
             }
         }
     }
 
     private void removeAffectedLinks(final DataModificationTransaction transaction, final TpId id) {
         TypeSafeDataReader reader = TypeSafeDataReader.forReader(transaction);
-        Topology topologyData = reader.readOperationalData(TOPOLOGY_PATH);
-        if (topologyData == null) {
-            return;
-        }
-        for (Link link : topologyData.getLink()) {
-            if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) {
-                transaction.removeOperationalData(linkPath(link));
+        Topology topologyData = reader.readOperationalData(topology);
+        if (topologyData != null) {
+            for (Link link : topologyData.getLink()) {
+                if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) {
+                    transaction.removeOperationalData(linkPath(link));
+                }
             }
         }
     }
 
-    private static InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
-        return TOPOLOGY_PATH.child(Node.class, new NodeKey(nodeId));
+    private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
+        return topology.child(Node.class, new NodeKey(nodeId));
     }
 
-    private static InstanceIdentifier<TerminationPoint> tpPath(final NodeId nodeId, final TpId tpId) {
+    private InstanceIdentifier<TerminationPoint> tpPath(final NodeId nodeId, final TpId tpId) {
         NodeKey nodeKey = new NodeKey(nodeId);
         TerminationPointKey tpKey = new TerminationPointKey(tpId);
-        return TOPOLOGY_PATH.child(Node.class, nodeKey).child(TerminationPoint.class, tpKey);
-    }
-
-    private static InstanceIdentifier<Link> linkPath(final Link link) {
-        return TOPOLOGY_PATH.child(Link.class, link.getKey());
+        return topology.child(Node.class, nodeKey).child(TerminationPoint.class, tpKey);
     }
 
-    /**
-     * @param txId transaction identificator
-     * @param future transaction result
-     */
-    private static void listenOnTransactionState(final Object txId, final Future<RpcResult<TransactionStatus>> future) {
-        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback<RpcResult<TransactionStatus>>() {
-            @Override
-            public void onFailure(final Throwable t) {
-                LOG.error("Topology export failed for Tx:{}", txId, t);
-            }
-
-            @Override
-            public void onSuccess(final RpcResult<TransactionStatus> result) {
-                if(!result.isSuccessful()) {
-                    LOG.error("Topology export failed for Tx:{}", txId);
-                }
-            }
-        });
+    private InstanceIdentifier<Link> linkPath(final Link link) {
+        return topology.child(Link.class, link.getKey());
     }
 }
index e77ba87..d656bda 100644 (file)
@@ -7,11 +7,20 @@
  */
 package org.opendaylight.md.controller.topology.manager;
 
+import java.util.concurrent.ExecutionException;
+
 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
@@ -19,58 +28,60 @@ import org.slf4j.LoggerFactory;
 
 public class FlowCapableTopologyProvider extends AbstractBindingAwareProvider implements AutoCloseable {
     private final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyProvider.class);
+    private Registration<NotificationListener> listenerRegistration;
+    private Thread thread;
 
-    private DataProviderService dataService;
-
-    public DataProviderService getDataService() {
-        return this.dataService;
-    }
+    /**
+     * Gets called on start of a bundle.
+     *
+     * @param session
+     */
+    @Override
+    public synchronized void onSessionInitiated(final ProviderContext session) {
+        final DataProviderService dataService = session.getSALService(DataProviderService.class);
+        final NotificationProviderService notificationService = session.getSALService(NotificationProviderService.class);
 
-    public void setDataService(final DataProviderService dataService) {
-        this.dataService = dataService;
-    }
+        final String name = "flow:1";
+        final TopologyKey key = new TopologyKey(new TopologyId(name));
+        final InstanceIdentifier<Topology> path = InstanceIdentifier
+                .builder(NetworkTopology.class)
+                .child(Topology.class, key)
+                .build();
 
-    private NotificationProviderService notificationService;
+        final OperationProcessor processor = new OperationProcessor(dataService);
+        final FlowCapableTopologyExporter listener = new FlowCapableTopologyExporter(processor, path);
+        this.listenerRegistration = notificationService.registerNotificationListener(listener);
 
-    public NotificationProviderService getNotificationService() {
-        return this.notificationService;
-    }
+        final DataModificationTransaction tx = dataService.beginTransaction();
+        tx.putOperationalData(path, new TopologyBuilder().setKey(key).build());
+        try {
+            tx.commit().get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.warn("Initial topology export failed, continuing anyway", e);
+        }
 
-    public void setNotificationService(final NotificationProviderService notificationService) {
-        this.notificationService = notificationService;
+        thread = new Thread(processor);
+        thread.setDaemon(true);
+        thread.setName("FlowCapableTopologyExporter-" + name);
+        thread.start();
     }
 
-    private final FlowCapableTopologyExporter exporter = new FlowCapableTopologyExporter();
-    private Registration<NotificationListener> listenerRegistration;
-
     @Override
-    public void close() {
-
-        FlowCapableTopologyProvider.LOG.info("FlowCapableTopologyProvider stopped.");
-        dataService = null;
-        notificationService = null;
+    public synchronized void close() throws InterruptedException {
+        LOG.info("FlowCapableTopologyProvider stopped.");
         if (this.listenerRegistration != null) {
             try {
                 this.listenerRegistration.close();
             } catch (Exception e) {
-                throw new IllegalStateException("Exception during close of listener registration.",e);
+                LOG.error("Failed to close listener registration", e);
             }
+            listenerRegistration = null;
+        }
+        if (thread != null) {
+            thread.interrupt();
+            thread.join();
+            thread = null;
         }
-    }
-
-    /**
-     * Gets called on start of a bundle.
-     *
-     * @param session
-     */
-    @Override
-    public void onSessionInitiated(final ProviderContext session) {
-        dataService = session.getSALService(DataProviderService.class);
-        notificationService = session.getSALService(NotificationProviderService.class);
-        this.exporter.setDataService(dataService);
-        this.exporter.start();
-        this.listenerRegistration = notificationService.registerNotificationListener(this.exporter);
-        ;
     }
 
     /**
@@ -81,6 +92,10 @@ public class FlowCapableTopologyProvider extends AbstractBindingAwareProvider im
      */
     @Override
     public void stopImpl(final BundleContext context) {
-        this.close();
+        try {
+            this.close();
+        } catch (InterruptedException e) {
+            LOG.error("Failed to stop provider", e);
+        }
     }
 }
diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java
new file mode 100644 (file)
index 0000000..d60c880
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.md.controller.topology.manager;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+final class OperationProcessor implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
+    private static final int MAX_TRANSACTION_OPERATIONS = 100;
+    private static final int OPERATION_QUEUE_DEPTH = 500;
+
+    private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
+    // FIXME: Flow capable topology exporter should use transaction chaining API
+    private final DataProviderService dataService;
+
+    OperationProcessor(final DataProviderService dataService) {
+        this.dataService = Preconditions.checkNotNull(dataService);
+    }
+
+    void enqueueOperation(final TopologyOperation task) {
+        try {
+            queue.put(task);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while submitting task {}", task, e);
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            for (;;) {
+                TopologyOperation op = queue.take();
+
+                LOG.debug("New operations available, starting transaction");
+                final DataModificationTransaction tx = dataService.beginTransaction();
+
+                int ops = 0;
+                do {
+                    op.applyOperation(tx);
+
+                    ops++;
+                    if (ops < MAX_TRANSACTION_OPERATIONS) {
+                        op = queue.poll();
+                    } else {
+                        op = null;
+                    }
+                } while (op != null);
+
+                LOG.debug("Processed {} operations, submitting transaction", ops);
+
+                try {
+                    final RpcResult<TransactionStatus> s = tx.commit().get();
+                    if (!s.isSuccessful()) {
+                        LOG.error("Topology export failed for Tx:{}", tx.getIdentifier());
+                    }
+                } catch (ExecutionException e) {
+                    LOG.error("Topology export transaction {} failed", tx.getIdentifier(), e.getCause());
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.info("Interrupted processing, terminating", e);
+        }
+
+        // Drain all events, making sure any blocked threads are unblocked
+        while (!queue.isEmpty()) {
+            queue.poll();
+        }
+    }
+}
diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/TopologyOperation.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/TopologyOperation.java
new file mode 100644 (file)
index 0000000..29d06be
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.md.controller.topology.manager;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+
+/**
+ * Internal interface for submitted operations. Implementations of this
+ * interface are enqueued and batched into data store transactions.
+ */
+interface TopologyOperation {
+    /**
+     * Execute the operation on top of the transaction.
+     *
+     * @param transaction Datastore transaction
+     */
+    void applyOperation(DataModificationTransaction transaction);
+}
\ No newline at end of file