From: Tony Tkacik Date: Mon, 29 Sep 2014 17:04:40 +0000 (+0200) Subject: Bug 2106: Refactored FlowCapableInventoryProvider to properly work with failures. X-Git-Tag: release/helium~2^2~1 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=fe8c630fee135ac9732f3228da443d68c0fd6d7a;p=controller.git Bug 2106: Refactored FlowCapableInventoryProvider to properly work with failures. FlowCapableInventoryProvider was unable to continue if transaction chain failed during construction of new transaction. Refactored it in way, that it is able to reconstruct and retry transaction, which failed because of previous transaction and retries it once in new transaction chain. If it fails again, it continues with next batch. Change-Id: I4a90a3b60b0a49d562c8648a779e790f55840b5d Signed-off-by: Tony Tkacik --- diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java index d63f2bef86..618fcfc133 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.md.inventory.manager; +import java.util.ArrayList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; @@ -16,16 +17,12 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener { private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class); @@ -49,7 +46,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this); this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter); - this.txChain = dataBroker.createTransactionChain(this); + this.txChain = (dataBroker.createTransactionChain(this)); thread = new Thread(this); thread.setDaemon(true); thread.setName("FlowCapableInventoryProvider"); @@ -66,51 +63,15 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti } } - @Override - public void close() throws InterruptedException { - LOG.info("Flow Capable Inventory Provider stopped."); - if (this.listenerRegistration != null) { - try { - this.listenerRegistration.close(); - } catch (final Exception e) { - LOG.error("Failed to stop inventory provider", e); - } - listenerRegistration = null; - } - - if (thread != null) { - thread.interrupt(); - thread.join(); - thread = null; - } - if (txChain != null) { - txChain.close(); - txChain = null; - } - - - } - @Override public void run() { try { for (; ; ) { InventoryOperation op = queue.take(); - - ReadWriteTransaction tx; - try { - tx = txChain.newReadWriteTransaction(); - } catch (final IllegalStateException e) { - txChain.close(); - txChain = dataBroker.createTransactionChain(this); - tx = txChain.newReadWriteTransaction(); - } - LOG.debug("New operations available, starting transaction {}", tx.getIdentifier()); - int ops = 0; + final ArrayList opsToApply = new ArrayList<>(MAX_BATCH); do { - op.applyOperation(tx); - + opsToApply.add(op); ops++; if (ops < MAX_BATCH) { op = queue.poll(); @@ -118,22 +79,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti op = null; } } while (op != null); - - LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier()); - - final CheckedFuture result = tx.submit(); - final Object ident = tx.getIdentifier(); - Futures.addCallback(result, new FutureCallback() { - @Override - public void onSuccess(final Void aVoid) { - //NOOP - } - - @Override - public void onFailure(final Throwable throwable) { - LOG.error("Transaction {} failed.", ident, throwable); - } - }); + submitOperations(opsToApply); } } catch (final InterruptedException e) { LOG.info("Processing interrupted, terminating", e); @@ -145,15 +91,131 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti } } + /** + * Starts new empty transaction, custimizes it with submitted operations + * and submit it to data broker. + * + * If transaction chain failed during customization of transaction + * it allocates new chain and empty transaction and customizes it + * with submitted operations. + * + * This does not retry failed transaction. It only retries it when + * chain failed during customization of transaction chain. + * + * @param opsToApply + */ + private void submitOperations(final ArrayList opsToApply) { + final ReadWriteTransaction tx = createCustomizedTransaction(opsToApply); + LOG.debug("Processed {} operations, submitting transaction {}", opsToApply.size(), tx.getIdentifier()); + try { + tx.submit(); + } catch (final IllegalStateException e) { + /* + * Transaction chain failed during doing batch, so we need to null + * tx chain and continue processing queue. + * + * We fail current txChain which was allocated with createTransaction. + */ + failCurrentChain(txChain); + /* + * We will retry transaction once in order to not loose any data. + * + */ + final ReadWriteTransaction retryTx = createCustomizedTransaction(opsToApply); + retryTx.submit(); + } + } + + /** + * Creates new empty ReadWriteTransaction. If transaction chain + * was failed, it will allocate new transaction chain + * and assign it with this Operation Executor. + * + * This call is synchronized to prevent reace with {@link #failCurrentChain(TransactionChain)}. + * + * @return New Empty ReadWrite transaction, which continues this chain or starts new transaction + * chain. + */ + private synchronized ReadWriteTransaction newEmptyTransaction() { + try { + if(txChain == null) { + // Chain was broken so we need to replace it. + txChain = dataBroker.createTransactionChain(this); + } + return txChain.newReadWriteTransaction(); + } catch (final IllegalStateException e) { + LOG.debug("Chain is broken, need to allocate new transaction chain.",e); + /* + * Chain was broken by previous transaction, + * but there was race between this. + * Chain will be closed by #onTransactionChainFailed method. + */ + txChain = dataBroker.createTransactionChain(this); + return txChain.newReadWriteTransaction(); + } + } + + /** + * Creates customized not-submitted transaction, which is ready to be submitted. + * + * @param opsToApply Operations which are used to customize transaction. + * @return Non-empty transaction. + */ + private ReadWriteTransaction createCustomizedTransaction(final ArrayList opsToApply) { + final ReadWriteTransaction tx = newEmptyTransaction(); + for(final InventoryOperation op : opsToApply) { + op.applyOperation(tx); + } + return tx; + } + + private synchronized void failCurrentChain(final TransactionChain chain) { + if(txChain == chain) { + txChain = null; + } + } + @Override public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, final Throwable cause) { LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause); - + chain.close(); + if(txChain == chain) { + // Current chain is broken, so we will null it, in order to not use it. + failCurrentChain(chain); + } } @Override public void onTransactionChainSuccessful(final TransactionChain chain) { // NOOP } + + @Override + public void close() throws InterruptedException { + LOG.info("Flow Capable Inventory Provider stopped."); + if (this.listenerRegistration != null) { + try { + this.listenerRegistration.close(); + } catch (final Exception e) { + LOG.error("Failed to stop inventory provider", e); + } + listenerRegistration = null; + } + + if (thread != null) { + thread.interrupt(); + thread.join(); + thread = null; + } + if (txChain != null) { + try { + txChain.close(); + } catch (final IllegalStateException e) { + // It is possible chain failed and was closed by #onTransactionChainFailed + LOG.debug("Chain was already closed."); + } + txChain = null; + } + } }