Bug 2106: Refactored FlowCapableInventoryProvider to properly work with failures. 63/11663/2
authorTony Tkacik <ttkacik@cisco.com>
Mon, 29 Sep 2014 17:04:40 +0000 (19:04 +0200)
committerEd Warnicke <eaw@cisco.com>
Mon, 29 Sep 2014 17:37:04 +0000 (10:37 -0700)
FlowCapableInventoryProvider was unable to continue if transaction chain
failed during construction of new transaction.

Refactored it in way, that it is able to reconstruct and retry transaction,
which failed because of previous transaction and retries it once
in new transaction chain. If it fails again, it continues with next batch.

Change-Id: I4a90a3b60b0a49d562c8648a779e790f55840b5d
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java

index d63f2bef86d551280b9815e896ae9c07da96eace..618fcfc133abfd8335e491036dc0319d25204741 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.md.inventory.manager;
 
  */
 package org.opendaylight.controller.md.inventory.manager;
 
+import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 
@@ -16,16 +17,12 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 
 class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
 
 class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
@@ -49,7 +46,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
         final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
         this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
 
         final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
         this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
 
-        this.txChain = dataBroker.createTransactionChain(this);
+        this.txChain = (dataBroker.createTransactionChain(this));
         thread = new Thread(this);
         thread.setDaemon(true);
         thread.setName("FlowCapableInventoryProvider");
         thread = new Thread(this);
         thread.setDaemon(true);
         thread.setName("FlowCapableInventoryProvider");
@@ -66,51 +63,15 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
         }
     }
 
         }
     }
 
-    @Override
-    public void close() throws InterruptedException {
-        LOG.info("Flow Capable Inventory Provider stopped.");
-        if (this.listenerRegistration != null) {
-            try {
-                this.listenerRegistration.close();
-            } catch (final Exception e) {
-                LOG.error("Failed to stop inventory provider", e);
-            }
-            listenerRegistration = null;
-        }
-
-        if (thread != null) {
-            thread.interrupt();
-            thread.join();
-            thread = null;
-        }
-        if (txChain != null) {
-            txChain.close();
-            txChain = null;
-        }
-
-
-    }
-
     @Override
     public void run() {
         try {
             for (; ; ) {
                 InventoryOperation op = queue.take();
     @Override
     public void run() {
         try {
             for (; ; ) {
                 InventoryOperation op = queue.take();
-
-                ReadWriteTransaction tx;
-                try {
-                    tx = txChain.newReadWriteTransaction();
-                } catch (final IllegalStateException e) {
-                    txChain.close();
-                    txChain = dataBroker.createTransactionChain(this);
-                    tx = txChain.newReadWriteTransaction();
-                }
-                LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
-
                 int ops = 0;
                 int ops = 0;
+                final ArrayList<InventoryOperation> opsToApply = new ArrayList<>(MAX_BATCH);
                 do {
                 do {
-                    op.applyOperation(tx);
-
+                    opsToApply.add(op);
                     ops++;
                     if (ops < MAX_BATCH) {
                         op = queue.poll();
                     ops++;
                     if (ops < MAX_BATCH) {
                         op = queue.poll();
@@ -118,22 +79,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
                         op = null;
                     }
                 } while (op != null);
                         op = null;
                     }
                 } while (op != null);
-
-                LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
-
-                final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
-                final Object ident = tx.getIdentifier();
-                Futures.addCallback(result, new FutureCallback<Void>() {
-                    @Override
-                    public void onSuccess(final Void aVoid) {
-                        //NOOP
-                    }
-
-                    @Override
-                    public void onFailure(final Throwable throwable) {
-                        LOG.error("Transaction {} failed.", ident, throwable);
-                    }
-                });
+                submitOperations(opsToApply);
             }
         } catch (final InterruptedException e) {
             LOG.info("Processing interrupted, terminating", e);
             }
         } catch (final InterruptedException e) {
             LOG.info("Processing interrupted, terminating", e);
@@ -145,15 +91,131 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
         }
     }
 
         }
     }
 
+    /**
+     * Starts new empty transaction, custimizes it with submitted operations
+     * and submit it to data broker.
+     *
+     * If transaction chain failed during customization of transaction
+     * it allocates new chain and empty transaction and  customizes it
+     * with submitted operations.
+     *
+     * This does not retry failed transaction. It only retries it when
+     * chain failed during customization of transaction chain.
+     *
+     * @param opsToApply
+     */
+    private void submitOperations(final ArrayList<InventoryOperation> opsToApply) {
+        final ReadWriteTransaction tx = createCustomizedTransaction(opsToApply);
+        LOG.debug("Processed {} operations, submitting transaction {}", opsToApply.size(), tx.getIdentifier());
+        try {
+            tx.submit();
+        } catch (final IllegalStateException e) {
+            /*
+             * Transaction chain failed during doing batch, so we need to null
+             * tx chain and continue processing queue.
+             *
+             * We fail current txChain which was allocated with createTransaction.
+             */
+            failCurrentChain(txChain);
+            /*
+             * We will retry transaction once in order to not loose any data.
+             *
+             */
+            final ReadWriteTransaction retryTx = createCustomizedTransaction(opsToApply);
+            retryTx.submit();
+        }
+    }
+
+    /**
+     * Creates new empty ReadWriteTransaction. If transaction chain
+     * was failed, it will allocate new transaction chain
+     * and assign it with this Operation Executor.
+     *
+     * This call is synchronized to prevent reace with {@link #failCurrentChain(TransactionChain)}.
+     *
+     * @return New Empty ReadWrite transaction, which continues this chain or starts new transaction
+     *          chain.
+     */
+    private synchronized ReadWriteTransaction newEmptyTransaction() {
+        try {
+            if(txChain == null) {
+                // Chain was broken so we need to replace it.
+                txChain = dataBroker.createTransactionChain(this);
+            }
+            return txChain.newReadWriteTransaction();
+        } catch (final IllegalStateException e) {
+            LOG.debug("Chain is broken, need to allocate new transaction chain.",e);
+            /*
+             *  Chain was broken by previous transaction,
+             *  but there was race between this.
+             *  Chain will be closed by #onTransactionChainFailed method.
+             */
+            txChain = dataBroker.createTransactionChain(this);
+            return txChain.newReadWriteTransaction();
+        }
+    }
+
+    /**
+     * Creates customized not-submitted transaction, which is ready to be submitted.
+     *
+     * @param opsToApply Operations which are used to customize transaction.
+     * @return Non-empty transaction.
+     */
+    private ReadWriteTransaction createCustomizedTransaction(final ArrayList<InventoryOperation> opsToApply) {
+        final ReadWriteTransaction tx = newEmptyTransaction();
+        for(final InventoryOperation op : opsToApply) {
+            op.applyOperation(tx);
+        }
+        return tx;
+    }
+
+    private synchronized void failCurrentChain(final TransactionChain<?, ?> chain) {
+        if(txChain == chain) {
+            txChain = null;
+        }
+    }
+
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
                                          final Throwable cause) {
         LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
                                          final Throwable cause) {
         LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
-
+        chain.close();
+        if(txChain == chain) {
+            // Current chain is broken, so we will null it, in order to not use it.
+            failCurrentChain(chain);
+        }
     }
 
     @Override
     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
         // NOOP
     }
     }
 
     @Override
     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
         // NOOP
     }
+
+    @Override
+    public void close() throws InterruptedException {
+        LOG.info("Flow Capable Inventory Provider stopped.");
+        if (this.listenerRegistration != null) {
+            try {
+                this.listenerRegistration.close();
+            } catch (final Exception e) {
+                LOG.error("Failed to stop inventory provider", e);
+            }
+            listenerRegistration = null;
+        }
+
+        if (thread != null) {
+            thread.interrupt();
+            thread.join();
+            thread = null;
+        }
+        if (txChain != null) {
+            try {
+                txChain.close();
+            } catch (final IllegalStateException e) {
+                // It is possible chain failed and was closed by #onTransactionChainFailed
+                LOG.debug("Chain was already closed.");
+            }
+            txChain = null;
+        }
+    }
 }
 }