X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Finventory-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Finventory%2Fmanager%2FFlowCapableInventoryProvider.java;h=6ed61e3024b9522dbc89d99a979f93fbcb147c7f;hp=7e4190f1df4b7246da4a175d2efabb103282383a;hb=eb3dd53f1337b87d66a28bdcbbd8154c0fb26a45;hpb=e159106bc148e76fc1e3e3c780bdd740d99e74ed diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java index 7e4190f1df..6ed61e3024 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java @@ -7,61 +7,117 @@ */ package org.opendaylight.controller.md.inventory.manager; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.binding.NotificationListener; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FlowCapableInventoryProvider implements AutoCloseable { +import com.google.common.base.Preconditions; + +class FlowCapableInventoryProvider implements AutoCloseable, Runnable { + private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class); + private static final int QUEUE_DEPTH = 500; + private static final int MAX_BATCH = 100; + + private final BlockingQueue queue = new LinkedBlockingDeque<>(QUEUE_DEPTH); + private final NotificationProviderService notificationService; + private final DataProviderService dataService; + private Registration listenerRegistration; + private Thread thread; - private final static Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class); + FlowCapableInventoryProvider(final DataProviderService dataService, final NotificationProviderService notificationService) { + this.dataService = Preconditions.checkNotNull(dataService); + this.notificationService = Preconditions.checkNotNull(notificationService); + } + + void start() { + final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this); + this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter); - private DataProviderService dataService; - private NotificationProviderService notificationService; - private Registration listenerRegistration; - private final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this); + thread = new Thread(this); + thread.setDaemon(true); + thread.setName("FlowCapableInventoryProvider"); + thread.start(); - public void start() { - this.listenerRegistration = this.notificationService.registerNotificationListener(this.changeCommiter); LOG.info("Flow Capable Inventory Provider started."); } - protected DataModificationTransaction startChange() { - DataProviderService _dataService = this.dataService; - return _dataService.beginTransaction(); + void enqueue(final InventoryOperation op) { + try { + queue.put(op); + } catch (InterruptedException e) { + LOG.warn("Failed to enqueue operation {}", op, e); + } } @Override - public void close() { - try { - LOG.info("Flow Capable Inventory Provider stopped."); - if (this.listenerRegistration != null) { + public void close() throws InterruptedException { + LOG.info("Flow Capable Inventory Provider stopped."); + if (this.listenerRegistration != null) { + try { this.listenerRegistration.close(); + } catch (Exception e) { + LOG.error("Failed to stop inventory provider", e); } - } catch (Exception e) { - String errMsg = "Error by stop Flow Capable Inventory Provider."; - LOG.error(errMsg, e); - throw new RuntimeException(errMsg, e); + listenerRegistration = null; } - } - public DataProviderService getDataService() { - return this.dataService; - } + if (thread != null) { + thread.interrupt(); + thread.join(); + thread = null; + } - public void setDataService(final DataProviderService dataService) { - this.dataService = dataService; - } - public NotificationProviderService getNotificationService() { - return this.notificationService; } - public void setNotificationService( - final NotificationProviderService notificationService) { - this.notificationService = notificationService; + @Override + public void run() { + try { + for (;;) { + InventoryOperation op = queue.take(); + + final DataModificationTransaction tx = dataService.beginTransaction(); + LOG.debug("New operations available, starting transaction {}", tx.getIdentifier()); + + int ops = 0; + do { + op.applyOperation(tx); + + ops++; + if (ops < MAX_BATCH) { + op = queue.poll(); + } else { + op = null; + } + } while (op != null); + + LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier()); + + try { + final RpcResult result = tx.commit().get(); + if(!result.isSuccessful()) { + LOG.error("Transaction {} failed", tx.getIdentifier()); + } + } catch (ExecutionException e) { + LOG.warn("Failed to commit inventory change", e.getCause()); + } + } + } catch (InterruptedException e) { + LOG.info("Processing interrupted, terminating", e); + } + + // Drain all events, making sure any blocked threads are unblocked + while (!queue.isEmpty()) { + queue.poll(); + } } }