From: Robert Varga Date: Mon, 9 Jun 2014 15:46:24 +0000 (+0200) Subject: Inventory manager: get rid of synchronized blocks X-Git-Tag: release/helium~622^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fchanges%2F28%2F7828%2F6 Inventory manager: get rid of synchronized blocks This patch reworks the inventory manager to operate on batches, such that it does not block notification threads and lowers the pressure on the datastore. Change-Id: I1953ce22446853b99a201381ff4d7b64a3cfcee7 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java index 7e4190f1df..6ed61e3024 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java @@ -7,61 +7,117 @@ */ package org.opendaylight.controller.md.inventory.manager; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.binding.NotificationListener; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FlowCapableInventoryProvider implements AutoCloseable { +import com.google.common.base.Preconditions; + +class FlowCapableInventoryProvider implements AutoCloseable, Runnable { + private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class); + private static final int QUEUE_DEPTH = 500; + private static final int MAX_BATCH = 100; + + private final BlockingQueue queue = new LinkedBlockingDeque<>(QUEUE_DEPTH); + private final NotificationProviderService notificationService; + private final DataProviderService dataService; + private Registration listenerRegistration; + private Thread thread; - private final static Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class); + FlowCapableInventoryProvider(final DataProviderService dataService, final NotificationProviderService notificationService) { + this.dataService = Preconditions.checkNotNull(dataService); + this.notificationService = Preconditions.checkNotNull(notificationService); + } + + void start() { + final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this); + this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter); - private DataProviderService dataService; - private NotificationProviderService notificationService; - private Registration listenerRegistration; - private final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this); + thread = new Thread(this); + thread.setDaemon(true); + thread.setName("FlowCapableInventoryProvider"); + thread.start(); - public void start() { - this.listenerRegistration = this.notificationService.registerNotificationListener(this.changeCommiter); LOG.info("Flow Capable Inventory Provider started."); } - protected DataModificationTransaction startChange() { - DataProviderService _dataService = this.dataService; - return _dataService.beginTransaction(); + void enqueue(final InventoryOperation op) { + try { + queue.put(op); + } catch (InterruptedException e) { + LOG.warn("Failed to enqueue operation {}", op, e); + } } @Override - public void close() { - try { - LOG.info("Flow Capable Inventory Provider stopped."); - if (this.listenerRegistration != null) { + public void close() throws InterruptedException { + LOG.info("Flow Capable Inventory Provider stopped."); + if (this.listenerRegistration != null) { + try { this.listenerRegistration.close(); + } catch (Exception e) { + LOG.error("Failed to stop inventory provider", e); } - } catch (Exception e) { - String errMsg = "Error by stop Flow Capable Inventory Provider."; - LOG.error(errMsg, e); - throw new RuntimeException(errMsg, e); + listenerRegistration = null; } - } - public DataProviderService getDataService() { - return this.dataService; - } + if (thread != null) { + thread.interrupt(); + thread.join(); + thread = null; + } - public void setDataService(final DataProviderService dataService) { - this.dataService = dataService; - } - public NotificationProviderService getNotificationService() { - return this.notificationService; } - public void setNotificationService( - final NotificationProviderService notificationService) { - this.notificationService = notificationService; + @Override + public void run() { + try { + for (;;) { + InventoryOperation op = queue.take(); + + final DataModificationTransaction tx = dataService.beginTransaction(); + LOG.debug("New operations available, starting transaction {}", tx.getIdentifier()); + + int ops = 0; + do { + op.applyOperation(tx); + + ops++; + if (ops < MAX_BATCH) { + op = queue.poll(); + } else { + op = null; + } + } while (op != null); + + LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier()); + + try { + final RpcResult result = tx.commit().get(); + if(!result.isSuccessful()) { + LOG.error("Transaction {} failed", tx.getIdentifier()); + } + } catch (ExecutionException e) { + LOG.warn("Failed to commit inventory change", e.getCause()); + } + } + } catch (InterruptedException e) { + LOG.info("Processing interrupted, terminating", e); + } + + // Drain all events, making sure any blocked threads are unblocked + while (!queue.isEmpty()) { + queue.poll(); + } } } diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryActivator.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryActivator.java index 6c06088fc4..5bcae367e3 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryActivator.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryActivator.java @@ -12,23 +12,32 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderCo import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.osgi.framework.BundleContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InventoryActivator extends AbstractBindingAwareProvider { - - private static FlowCapableInventoryProvider provider = new FlowCapableInventoryProvider(); + private static final Logger LOG = LoggerFactory.getLogger(InventoryActivator.class); + private FlowCapableInventoryProvider provider; @Override public void onSessionInitiated(final ProviderContext session) { - DataProviderService salDataService = session. getSALService(DataProviderService.class); + DataProviderService salDataService = session.getSALService(DataProviderService.class); NotificationProviderService salNotifiService = - session. getSALService(NotificationProviderService.class); - InventoryActivator.provider.setDataService(salDataService); - InventoryActivator.provider.setNotificationService(salNotifiService); - InventoryActivator.provider.start(); + session.getSALService(NotificationProviderService.class); + + provider = new FlowCapableInventoryProvider(salDataService, salNotifiService); + provider.start(); } @Override protected void stopImpl(final BundleContext context) { - InventoryActivator.provider.close(); + if (provider != null) { + try { + provider.close(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for shutdown", e); + } + provider = null; + } } } diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryOperation.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryOperation.java new file mode 100644 index 0000000000..3be5fcf643 --- /dev/null +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryOperation.java @@ -0,0 +1,16 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.inventory.manager; + +import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; + +interface InventoryOperation { + + void applyOperation(DataModificationTransaction tx); + +} diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java index 674ae398d3..3db3c93fcc 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java @@ -7,15 +7,11 @@ */ package org.opendaylight.controller.md.inventory.manager; -import java.util.concurrent.Future; - -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated; @@ -31,123 +27,90 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Objects; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; - -public class NodeChangeCommiter implements OpendaylightInventoryListener { +import com.google.common.base.Preconditions; - protected final static Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class); +class NodeChangeCommiter implements OpendaylightInventoryListener { + private static final Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class); private final FlowCapableInventoryProvider manager; public NodeChangeCommiter(final FlowCapableInventoryProvider manager) { - this.manager = manager; - } - - public FlowCapableInventoryProvider getManager() { - return this.manager; + this.manager = Preconditions.checkNotNull(manager); } @Override public synchronized void onNodeConnectorRemoved(final NodeConnectorRemoved connector) { - - final NodeConnectorRef ref = connector.getNodeConnectorRef(); - final DataModificationTransaction it = this.getManager().startChange(); - LOG.debug("removing node connector {} ", ref.getValue()); - it.removeOperationalData(ref.getValue()); - Future> commitResult = it.commit(); - listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector removal", ref.getValue()); + manager.enqueue(new InventoryOperation() { + @Override + public void applyOperation(final DataModificationTransaction tx) { + final NodeConnectorRef ref = connector.getNodeConnectorRef(); + LOG.debug("removing node connector {} ", ref.getValue()); + tx.removeOperationalData(ref.getValue()); + } + }); } @Override public synchronized void onNodeConnectorUpdated(final NodeConnectorUpdated connector) { - - final NodeConnectorRef ref = connector.getNodeConnectorRef(); - final FlowCapableNodeConnectorUpdated flowConnector = connector - .getAugmentation(FlowCapableNodeConnectorUpdated.class); - final DataModificationTransaction it = this.manager.startChange(); - final NodeConnectorBuilder data = new NodeConnectorBuilder(connector); - NodeConnectorId id = connector.getId(); - NodeConnectorKey nodeConnectorKey = new NodeConnectorKey(id); - data.setKey(nodeConnectorKey); - boolean notEquals = (!Objects.equal(flowConnector, null)); - if (notEquals) { - final FlowCapableNodeConnector augment = InventoryMapping.toInventoryAugment(flowConnector); - data.addAugmentation(FlowCapableNodeConnector.class, augment); - } - InstanceIdentifier value = ref.getValue(); - LOG.debug("updating node connector : {}.", value); - NodeConnector build = data.build(); - it.putOperationalData((value), build); - Future> commitResult = it.commit(); - listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector update", ref.getValue()); + manager.enqueue(new InventoryOperation() { + @Override + public void applyOperation(final DataModificationTransaction tx) { + final NodeConnectorRef ref = connector.getNodeConnectorRef(); + final NodeConnectorBuilder data = new NodeConnectorBuilder(connector); + data.setKey(new NodeConnectorKey(connector.getId())); + + final FlowCapableNodeConnectorUpdated flowConnector = connector + .getAugmentation(FlowCapableNodeConnectorUpdated.class); + if (flowConnector != null) { + final FlowCapableNodeConnector augment = InventoryMapping.toInventoryAugment(flowConnector); + data.addAugmentation(FlowCapableNodeConnector.class, augment); + } + InstanceIdentifier value = ref.getValue(); + LOG.debug("updating node connector : {}.", value); + NodeConnector build = data.build(); + tx.putOperationalData(value, build); + } + }); } @Override public synchronized void onNodeRemoved(final NodeRemoved node) { - - final NodeRef ref = node.getNodeRef(); - final DataModificationTransaction it = this.manager.startChange(); - LOG.debug("removing node : {}", ref.getValue()); - it.removeOperationalData((ref.getValue())); - Future> commitResult = it.commit(); - listenOnTransactionState(it.getIdentifier(), commitResult, "node removal", ref.getValue()); + manager.enqueue(new InventoryOperation() { + @Override + public void applyOperation(final DataModificationTransaction tx) { + final NodeRef ref = node.getNodeRef(); + LOG.debug("removing node : {}", ref.getValue()); + tx.removeOperationalData((ref.getValue())); + } + }); } @Override public synchronized void onNodeUpdated(final NodeUpdated node) { - - final NodeRef ref = node.getNodeRef(); - final FlowCapableNodeUpdated flowNode = node - . getAugmentation(FlowCapableNodeUpdated.class); - final DataModificationTransaction it = this.manager.startChange(); - final NodeBuilder nodeBuilder = new NodeBuilder(node); - nodeBuilder.setKey(new NodeKey(node.getId())); - boolean equals = Objects.equal(flowNode, null); - if (equals) { + final FlowCapableNodeUpdated flowNode = node.getAugmentation(FlowCapableNodeUpdated.class); + if (flowNode == null) { return; } - final FlowCapableNode augment = InventoryMapping.toInventoryAugment(flowNode); - nodeBuilder.addAugmentation(FlowCapableNode.class, augment); - InstanceIdentifier value = ref.getValue(); - InstanceIdentifierBuilder builder = ((InstanceIdentifier) value).builder(); - InstanceIdentifierBuilder augmentation = builder - . augmentation(FlowCapableNode.class); - final InstanceIdentifier path = augmentation.build(); - LOG.debug("updating node :{} ", path); - it.putOperationalData(path, augment); - - Future> commitResult = it.commit(); - listenOnTransactionState(it.getIdentifier(), commitResult, "node update", ref.getValue()); - } - - /** - * @param txId transaction identificator - * @param future transaction result - * @param action performed by transaction - * @param nodeConnectorPath target value - */ - private static void listenOnTransactionState(final Object txId, Future> future, - final String action, final InstanceIdentifier nodeConnectorPath) { - Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback>() { - - @Override - public void onFailure(Throwable t) { - LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId, t); - - } + manager.enqueue(new InventoryOperation() { @Override - public void onSuccess(RpcResult result) { - if(!result.isSuccessful()) { - LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId); - } + public void applyOperation(final DataModificationTransaction tx) { + final NodeRef ref = node.getNodeRef(); + final NodeBuilder nodeBuilder = new NodeBuilder(node); + nodeBuilder.setKey(new NodeKey(node.getId())); + + final FlowCapableNode augment = InventoryMapping.toInventoryAugment(flowNode); + nodeBuilder.addAugmentation(FlowCapableNode.class, augment); + + @SuppressWarnings("unchecked") + InstanceIdentifierBuilder builder = ((InstanceIdentifier) ref.getValue()).builder(); + InstanceIdentifierBuilder augmentation = builder.augmentation(FlowCapableNode.class); + final InstanceIdentifier path = augmentation.build(); + LOG.debug("updating node :{} ", path); + tx.putOperationalData(path, augment); } }); }