X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Finventory-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Finventory%2Fmanager%2FFlowCapableInventoryProvider.java;h=618fcfc133abfd8335e491036dc0319d25204741;hp=6ed61e3024b9522dbc89d99a979f93fbcb147c7f;hb=531621aac4cff9d39cbd8668a53bdeba8a0e6d81;hpb=e64d758b45360b2ed8b7d8967fc1c7caaa7aa58f diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java index 6ed61e3024..618fcfc133 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java @@ -7,34 +7,38 @@ */ package org.opendaylight.controller.md.inventory.manager; +import java.util.ArrayList; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; -import org.opendaylight.controller.sal.binding.api.data.DataProviderService; -import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -class FlowCapableInventoryProvider implements AutoCloseable, Runnable { +class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener { private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class); private static final int QUEUE_DEPTH = 500; private static final int MAX_BATCH = 100; private final BlockingQueue queue = new LinkedBlockingDeque<>(QUEUE_DEPTH); private final NotificationProviderService notificationService; - private final DataProviderService dataService; - private Registration listenerRegistration; + + private final DataBroker dataBroker; + private BindingTransactionChain txChain; + private ListenerRegistration listenerRegistration; private Thread thread; - FlowCapableInventoryProvider(final DataProviderService dataService, final NotificationProviderService notificationService) { - this.dataService = Preconditions.checkNotNull(dataService); + FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService) { + this.dataBroker = Preconditions.checkNotNull(dataBroker); this.notificationService = Preconditions.checkNotNull(notificationService); } @@ -42,6 +46,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this); this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter); + this.txChain = (dataBroker.createTransactionChain(this)); thread = new Thread(this); thread.setDaemon(true); thread.setName("FlowCapableInventoryProvider"); @@ -53,45 +58,20 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { void enqueue(final InventoryOperation op) { try { queue.put(op); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Failed to enqueue operation {}", op, e); } } - @Override - public void close() throws InterruptedException { - LOG.info("Flow Capable Inventory Provider stopped."); - if (this.listenerRegistration != null) { - try { - this.listenerRegistration.close(); - } catch (Exception e) { - LOG.error("Failed to stop inventory provider", e); - } - listenerRegistration = null; - } - - if (thread != null) { - thread.interrupt(); - thread.join(); - thread = null; - } - - - } - @Override public void run() { try { - for (;;) { + for (; ; ) { InventoryOperation op = queue.take(); - - final DataModificationTransaction tx = dataService.beginTransaction(); - LOG.debug("New operations available, starting transaction {}", tx.getIdentifier()); - int ops = 0; + final ArrayList opsToApply = new ArrayList<>(MAX_BATCH); do { - op.applyOperation(tx); - + opsToApply.add(op); ops++; if (ops < MAX_BATCH) { op = queue.poll(); @@ -99,19 +79,9 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { op = null; } } while (op != null); - - LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier()); - - try { - final RpcResult result = tx.commit().get(); - if(!result.isSuccessful()) { - LOG.error("Transaction {} failed", tx.getIdentifier()); - } - } catch (ExecutionException e) { - LOG.warn("Failed to commit inventory change", e.getCause()); - } + submitOperations(opsToApply); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.info("Processing interrupted, terminating", e); } @@ -120,4 +90,132 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { queue.poll(); } } + + /** + * Starts new empty transaction, custimizes it with submitted operations + * and submit it to data broker. + * + * If transaction chain failed during customization of transaction + * it allocates new chain and empty transaction and customizes it + * with submitted operations. + * + * This does not retry failed transaction. It only retries it when + * chain failed during customization of transaction chain. + * + * @param opsToApply + */ + private void submitOperations(final ArrayList opsToApply) { + final ReadWriteTransaction tx = createCustomizedTransaction(opsToApply); + LOG.debug("Processed {} operations, submitting transaction {}", opsToApply.size(), tx.getIdentifier()); + try { + tx.submit(); + } catch (final IllegalStateException e) { + /* + * Transaction chain failed during doing batch, so we need to null + * tx chain and continue processing queue. + * + * We fail current txChain which was allocated with createTransaction. + */ + failCurrentChain(txChain); + /* + * We will retry transaction once in order to not loose any data. + * + */ + final ReadWriteTransaction retryTx = createCustomizedTransaction(opsToApply); + retryTx.submit(); + } + } + + /** + * Creates new empty ReadWriteTransaction. If transaction chain + * was failed, it will allocate new transaction chain + * and assign it with this Operation Executor. + * + * This call is synchronized to prevent reace with {@link #failCurrentChain(TransactionChain)}. + * + * @return New Empty ReadWrite transaction, which continues this chain or starts new transaction + * chain. + */ + private synchronized ReadWriteTransaction newEmptyTransaction() { + try { + if(txChain == null) { + // Chain was broken so we need to replace it. + txChain = dataBroker.createTransactionChain(this); + } + return txChain.newReadWriteTransaction(); + } catch (final IllegalStateException e) { + LOG.debug("Chain is broken, need to allocate new transaction chain.",e); + /* + * Chain was broken by previous transaction, + * but there was race between this. + * Chain will be closed by #onTransactionChainFailed method. + */ + txChain = dataBroker.createTransactionChain(this); + return txChain.newReadWriteTransaction(); + } + } + + /** + * Creates customized not-submitted transaction, which is ready to be submitted. + * + * @param opsToApply Operations which are used to customize transaction. + * @return Non-empty transaction. + */ + private ReadWriteTransaction createCustomizedTransaction(final ArrayList opsToApply) { + final ReadWriteTransaction tx = newEmptyTransaction(); + for(final InventoryOperation op : opsToApply) { + op.applyOperation(tx); + } + return tx; + } + + private synchronized void failCurrentChain(final TransactionChain chain) { + if(txChain == chain) { + txChain = null; + } + } + + @Override + public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, + final Throwable cause) { + LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause); + chain.close(); + if(txChain == chain) { + // Current chain is broken, so we will null it, in order to not use it. + failCurrentChain(chain); + } + } + + @Override + public void onTransactionChainSuccessful(final TransactionChain chain) { + // NOOP + } + + @Override + public void close() throws InterruptedException { + LOG.info("Flow Capable Inventory Provider stopped."); + if (this.listenerRegistration != null) { + try { + this.listenerRegistration.close(); + } catch (final Exception e) { + LOG.error("Failed to stop inventory provider", e); + } + listenerRegistration = null; + } + + if (thread != null) { + thread.interrupt(); + thread.join(); + thread = null; + } + if (txChain != null) { + try { + txChain.close(); + } catch (final IllegalStateException e) { + // It is possible chain failed and was closed by #onTransactionChainFailed + LOG.debug("Chain was already closed."); + } + txChain = null; + } + } }