Inventory manager: get rid of synchronized blocks 28/7828/6
authorRobert Varga <rovarga@cisco.com>
Mon, 9 Jun 2014 15:46:24 +0000 (17:46 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Mon, 23 Jun 2014 09:09:22 +0000 (09:09 +0000)
This patch reworks the inventory manager to operate on batches, such
that it does not block notification threads and lowers the pressure on
the datastore.

Change-Id: I1953ce22446853b99a201381ff4d7b64a3cfcee7
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryActivator.java
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryOperation.java [new file with mode: 0644]
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java

index 7e4190f..6ed61e3 100644 (file)
  */
 package org.opendaylight.controller.md.inventory.manager;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.binding.NotificationListener;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FlowCapableInventoryProvider implements AutoCloseable {
+import com.google.common.base.Preconditions;
+
+class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
+    private static final int QUEUE_DEPTH = 500;
+    private static final int MAX_BATCH = 100;
+
+    private final BlockingQueue<InventoryOperation> queue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
+    private final NotificationProviderService notificationService;
+    private final DataProviderService dataService;
+    private Registration<?> listenerRegistration;
+    private Thread thread;
 
-    private final static Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
+    FlowCapableInventoryProvider(final DataProviderService dataService, final NotificationProviderService notificationService) {
+        this.dataService = Preconditions.checkNotNull(dataService);
+        this.notificationService = Preconditions.checkNotNull(notificationService);
+    }
+
+    void start() {
+        final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
+        this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
 
-    private DataProviderService dataService;
-    private NotificationProviderService notificationService;
-    private Registration<NotificationListener> listenerRegistration;
-    private final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
+        thread = new Thread(this);
+        thread.setDaemon(true);
+        thread.setName("FlowCapableInventoryProvider");
+        thread.start();
 
-    public void start() {
-        this.listenerRegistration = this.notificationService.registerNotificationListener(this.changeCommiter);
         LOG.info("Flow Capable Inventory Provider started.");
     }
 
-    protected DataModificationTransaction startChange() {
-        DataProviderService _dataService = this.dataService;
-        return _dataService.beginTransaction();
+    void enqueue(final InventoryOperation op) {
+        try {
+            queue.put(op);
+        } catch (InterruptedException e) {
+            LOG.warn("Failed to enqueue operation {}", op, e);
+        }
     }
 
     @Override
-    public void close() {
-        try {
-            LOG.info("Flow Capable Inventory Provider stopped.");
-            if (this.listenerRegistration != null) {
+    public void close() throws InterruptedException {
+        LOG.info("Flow Capable Inventory Provider stopped.");
+        if (this.listenerRegistration != null) {
+            try {
                 this.listenerRegistration.close();
+            } catch (Exception e) {
+                LOG.error("Failed to stop inventory provider", e);
             }
-        } catch (Exception e) {
-            String errMsg = "Error by stop Flow Capable Inventory Provider.";
-            LOG.error(errMsg, e);
-            throw new RuntimeException(errMsg, e);
+            listenerRegistration = null;
         }
-    }
 
-    public DataProviderService getDataService() {
-        return this.dataService;
-    }
+        if (thread != null) {
+            thread.interrupt();
+            thread.join();
+            thread = null;
+        }
 
-    public void setDataService(final DataProviderService dataService) {
-        this.dataService = dataService;
-    }
 
-    public NotificationProviderService getNotificationService() {
-        return this.notificationService;
     }
 
-    public void setNotificationService(
-            final NotificationProviderService notificationService) {
-        this.notificationService = notificationService;
+    @Override
+    public void run() {
+        try {
+            for (;;) {
+                InventoryOperation op = queue.take();
+
+                final DataModificationTransaction tx = dataService.beginTransaction();
+                LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
+
+                int ops = 0;
+                do {
+                    op.applyOperation(tx);
+
+                    ops++;
+                    if (ops < MAX_BATCH) {
+                        op = queue.poll();
+                    } else {
+                        op = null;
+                    }
+                } while (op != null);
+
+                LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
+
+                try {
+                    final RpcResult<TransactionStatus> result = tx.commit().get();
+                    if(!result.isSuccessful()) {
+                        LOG.error("Transaction {} failed", tx.getIdentifier());
+                    }
+                } catch (ExecutionException e) {
+                    LOG.warn("Failed to commit inventory change", e.getCause());
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.info("Processing interrupted, terminating", e);
+        }
+
+        // Drain all events, making sure any blocked threads are unblocked
+        while (!queue.isEmpty()) {
+            queue.poll();
+        }
     }
 }
index 6c06088..5bcae36 100644 (file)
@@ -12,23 +12,32 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderCo
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class InventoryActivator extends AbstractBindingAwareProvider {
-
-    private static FlowCapableInventoryProvider provider = new FlowCapableInventoryProvider();
+    private static final Logger LOG = LoggerFactory.getLogger(InventoryActivator.class);
+    private FlowCapableInventoryProvider provider;
 
     @Override
     public void onSessionInitiated(final ProviderContext session) {
-        DataProviderService salDataService = session.<DataProviderService> getSALService(DataProviderService.class);
+        DataProviderService salDataService = session.getSALService(DataProviderService.class);
         NotificationProviderService salNotifiService =
-                session.<NotificationProviderService> getSALService(NotificationProviderService.class);
-        InventoryActivator.provider.setDataService(salDataService);
-        InventoryActivator.provider.setNotificationService(salNotifiService);
-        InventoryActivator.provider.start();
+                session.getSALService(NotificationProviderService.class);
+
+        provider = new FlowCapableInventoryProvider(salDataService, salNotifiService);
+        provider.start();
     }
 
     @Override
     protected void stopImpl(final BundleContext context) {
-        InventoryActivator.provider.close();
+        if (provider != null) {
+            try {
+                provider.close();
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting for shutdown", e);
+            }
+            provider = null;
+        }
     }
 }
diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryOperation.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryOperation.java
new file mode 100644 (file)
index 0000000..3be5fcf
--- /dev/null
@@ -0,0 +1,16 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.inventory.manager;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+
+interface InventoryOperation {
+
+    void applyOperation(DataModificationTransaction tx);
+
+}
index 674ae39..3db3c93 100644 (file)
@@ -7,15 +7,11 @@
  */
 package org.opendaylight.controller.md.inventory.manager;
 
-import java.util.concurrent.Future;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
@@ -31,123 +27,90 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-
-public class NodeChangeCommiter implements OpendaylightInventoryListener {
+import com.google.common.base.Preconditions;
 
-    protected final static Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class);
+class NodeChangeCommiter implements OpendaylightInventoryListener {
+    private static final Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class);
 
     private final FlowCapableInventoryProvider manager;
 
     public NodeChangeCommiter(final FlowCapableInventoryProvider manager) {
-        this.manager = manager;
-    }
-
-    public FlowCapableInventoryProvider getManager() {
-        return this.manager;
+        this.manager = Preconditions.checkNotNull(manager);
     }
 
     @Override
     public synchronized void onNodeConnectorRemoved(final NodeConnectorRemoved connector) {
-
-        final NodeConnectorRef ref = connector.getNodeConnectorRef();
-        final DataModificationTransaction it = this.getManager().startChange();
-        LOG.debug("removing node connector {} ", ref.getValue());
-        it.removeOperationalData(ref.getValue());
-        Future<RpcResult<TransactionStatus>> commitResult = it.commit();
-        listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector removal", ref.getValue());
+        manager.enqueue(new InventoryOperation() {
+            @Override
+            public void applyOperation(final DataModificationTransaction tx) {
+                final NodeConnectorRef ref = connector.getNodeConnectorRef();
+                LOG.debug("removing node connector {} ", ref.getValue());
+                tx.removeOperationalData(ref.getValue());
+            }
+        });
     }
 
     @Override
     public synchronized void onNodeConnectorUpdated(final NodeConnectorUpdated connector) {
-
-        final NodeConnectorRef ref = connector.getNodeConnectorRef();
-        final FlowCapableNodeConnectorUpdated flowConnector = connector
-                .getAugmentation(FlowCapableNodeConnectorUpdated.class);
-        final DataModificationTransaction it = this.manager.startChange();
-        final NodeConnectorBuilder data = new NodeConnectorBuilder(connector);
-        NodeConnectorId id = connector.getId();
-        NodeConnectorKey nodeConnectorKey = new NodeConnectorKey(id);
-        data.setKey(nodeConnectorKey);
-        boolean notEquals = (!Objects.equal(flowConnector, null));
-        if (notEquals) {
-            final FlowCapableNodeConnector augment = InventoryMapping.toInventoryAugment(flowConnector);
-            data.addAugmentation(FlowCapableNodeConnector.class, augment);
-        }
-        InstanceIdentifier<? extends Object> value = ref.getValue();
-        LOG.debug("updating node connector : {}.", value);
-        NodeConnector build = data.build();
-        it.putOperationalData((value), build);
-        Future<RpcResult<TransactionStatus>> commitResult = it.commit();
-        listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector update", ref.getValue());
+        manager.enqueue(new InventoryOperation() {
+            @Override
+            public void applyOperation(final DataModificationTransaction tx) {
+                final NodeConnectorRef ref = connector.getNodeConnectorRef();
+                final NodeConnectorBuilder data = new NodeConnectorBuilder(connector);
+                data.setKey(new NodeConnectorKey(connector.getId()));
+
+                final FlowCapableNodeConnectorUpdated flowConnector = connector
+                        .getAugmentation(FlowCapableNodeConnectorUpdated.class);
+                if (flowConnector != null) {
+                    final FlowCapableNodeConnector augment = InventoryMapping.toInventoryAugment(flowConnector);
+                    data.addAugmentation(FlowCapableNodeConnector.class, augment);
+                }
+                InstanceIdentifier<? extends Object> value = ref.getValue();
+                LOG.debug("updating node connector : {}.", value);
+                NodeConnector build = data.build();
+                tx.putOperationalData(value, build);
+            }
+        });
     }
 
     @Override
     public synchronized void onNodeRemoved(final NodeRemoved node) {
-
-        final NodeRef ref = node.getNodeRef();
-        final DataModificationTransaction it = this.manager.startChange();
-        LOG.debug("removing node : {}", ref.getValue());
-        it.removeOperationalData((ref.getValue()));
-        Future<RpcResult<TransactionStatus>> commitResult = it.commit();
-        listenOnTransactionState(it.getIdentifier(), commitResult, "node removal", ref.getValue());
+        manager.enqueue(new InventoryOperation() {
+            @Override
+            public void applyOperation(final DataModificationTransaction tx) {
+                final NodeRef ref = node.getNodeRef();
+                LOG.debug("removing node : {}", ref.getValue());
+                tx.removeOperationalData((ref.getValue()));
+            }
+        });
     }
 
     @Override
     public synchronized void onNodeUpdated(final NodeUpdated node) {
-
-        final NodeRef ref = node.getNodeRef();
-        final FlowCapableNodeUpdated flowNode = node
-                .<FlowCapableNodeUpdated> getAugmentation(FlowCapableNodeUpdated.class);
-        final DataModificationTransaction it = this.manager.startChange();
-        final NodeBuilder nodeBuilder = new NodeBuilder(node);
-        nodeBuilder.setKey(new NodeKey(node.getId()));
-        boolean equals = Objects.equal(flowNode, null);
-        if (equals) {
+        final FlowCapableNodeUpdated flowNode = node.getAugmentation(FlowCapableNodeUpdated.class);
+        if (flowNode == null) {
             return;
         }
-        final FlowCapableNode augment = InventoryMapping.toInventoryAugment(flowNode);
-        nodeBuilder.addAugmentation(FlowCapableNode.class, augment);
-        InstanceIdentifier<? extends Object> value = ref.getValue();
-        InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) value).builder();
-        InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder
-                .<FlowCapableNode> augmentation(FlowCapableNode.class);
-        final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
-        LOG.debug("updating node :{} ", path);
-        it.putOperationalData(path, augment);
-
-        Future<RpcResult<TransactionStatus>> commitResult = it.commit();
-        listenOnTransactionState(it.getIdentifier(), commitResult, "node update", ref.getValue());
-    }
-
-    /**
-     * @param txId transaction identificator
-     * @param future transaction result
-     * @param action performed by transaction
-     * @param nodeConnectorPath target value
-     */
-    private static void listenOnTransactionState(final Object txId, Future<RpcResult<TransactionStatus>> future,
-            final String action, final InstanceIdentifier<?> nodeConnectorPath) {
-        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback<RpcResult<TransactionStatus>>() {
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId, t);
-
-            }
 
+        manager.enqueue(new InventoryOperation() {
             @Override
-            public void onSuccess(RpcResult<TransactionStatus> result) {
-                if(!result.isSuccessful()) {
-                    LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId);
-                }
+            public void applyOperation(final DataModificationTransaction tx) {
+                final NodeRef ref = node.getNodeRef();
+                final NodeBuilder nodeBuilder = new NodeBuilder(node);
+                nodeBuilder.setKey(new NodeKey(node.getId()));
+
+                final FlowCapableNode augment = InventoryMapping.toInventoryAugment(flowNode);
+                nodeBuilder.addAugmentation(FlowCapableNode.class, augment);
+
+                @SuppressWarnings("unchecked")
+                InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) ref.getValue()).builder();
+                InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
+                final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
+                LOG.debug("updating node :{} ", path);
+                tx.putOperationalData(path, augment);
             }
         });
     }