Bug 2106: Refactored FlowCapableInventoryProvider to properly work with failures.
[controller.git] / opendaylight / md-sal / inventory-manager / src / main / java / org / opendaylight / controller / md / inventory / manager / FlowCapableInventoryProvider.java
index d63f2bef86d551280b9815e896ae9c07da96eace..618fcfc133abfd8335e491036dc0319d25204741 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.md.inventory.manager;
 
+import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 
@@ -16,16 +17,12 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 
 class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
@@ -49,7 +46,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
         final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
         this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
 
-        this.txChain = dataBroker.createTransactionChain(this);
+        this.txChain = (dataBroker.createTransactionChain(this));
         thread = new Thread(this);
         thread.setDaemon(true);
         thread.setName("FlowCapableInventoryProvider");
@@ -66,51 +63,15 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
         }
     }
 
-    @Override
-    public void close() throws InterruptedException {
-        LOG.info("Flow Capable Inventory Provider stopped.");
-        if (this.listenerRegistration != null) {
-            try {
-                this.listenerRegistration.close();
-            } catch (final Exception e) {
-                LOG.error("Failed to stop inventory provider", e);
-            }
-            listenerRegistration = null;
-        }
-
-        if (thread != null) {
-            thread.interrupt();
-            thread.join();
-            thread = null;
-        }
-        if (txChain != null) {
-            txChain.close();
-            txChain = null;
-        }
-
-
-    }
-
     @Override
     public void run() {
         try {
             for (; ; ) {
                 InventoryOperation op = queue.take();
-
-                ReadWriteTransaction tx;
-                try {
-                    tx = txChain.newReadWriteTransaction();
-                } catch (final IllegalStateException e) {
-                    txChain.close();
-                    txChain = dataBroker.createTransactionChain(this);
-                    tx = txChain.newReadWriteTransaction();
-                }
-                LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
-
                 int ops = 0;
+                final ArrayList<InventoryOperation> opsToApply = new ArrayList<>(MAX_BATCH);
                 do {
-                    op.applyOperation(tx);
-
+                    opsToApply.add(op);
                     ops++;
                     if (ops < MAX_BATCH) {
                         op = queue.poll();
@@ -118,22 +79,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
                         op = null;
                     }
                 } while (op != null);
-
-                LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
-
-                final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
-                final Object ident = tx.getIdentifier();
-                Futures.addCallback(result, new FutureCallback<Void>() {
-                    @Override
-                    public void onSuccess(final Void aVoid) {
-                        //NOOP
-                    }
-
-                    @Override
-                    public void onFailure(final Throwable throwable) {
-                        LOG.error("Transaction {} failed.", ident, throwable);
-                    }
-                });
+                submitOperations(opsToApply);
             }
         } catch (final InterruptedException e) {
             LOG.info("Processing interrupted, terminating", e);
@@ -145,15 +91,131 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti
         }
     }
 
+    /**
+     * Starts new empty transaction, custimizes it with submitted operations
+     * and submit it to data broker.
+     *
+     * If transaction chain failed during customization of transaction
+     * it allocates new chain and empty transaction and  customizes it
+     * with submitted operations.
+     *
+     * This does not retry failed transaction. It only retries it when
+     * chain failed during customization of transaction chain.
+     *
+     * @param opsToApply
+     */
+    private void submitOperations(final ArrayList<InventoryOperation> opsToApply) {
+        final ReadWriteTransaction tx = createCustomizedTransaction(opsToApply);
+        LOG.debug("Processed {} operations, submitting transaction {}", opsToApply.size(), tx.getIdentifier());
+        try {
+            tx.submit();
+        } catch (final IllegalStateException e) {
+            /*
+             * Transaction chain failed during doing batch, so we need to null
+             * tx chain and continue processing queue.
+             *
+             * We fail current txChain which was allocated with createTransaction.
+             */
+            failCurrentChain(txChain);
+            /*
+             * We will retry transaction once in order to not loose any data.
+             *
+             */
+            final ReadWriteTransaction retryTx = createCustomizedTransaction(opsToApply);
+            retryTx.submit();
+        }
+    }
+
+    /**
+     * Creates new empty ReadWriteTransaction. If transaction chain
+     * was failed, it will allocate new transaction chain
+     * and assign it with this Operation Executor.
+     *
+     * This call is synchronized to prevent reace with {@link #failCurrentChain(TransactionChain)}.
+     *
+     * @return New Empty ReadWrite transaction, which continues this chain or starts new transaction
+     *          chain.
+     */
+    private synchronized ReadWriteTransaction newEmptyTransaction() {
+        try {
+            if(txChain == null) {
+                // Chain was broken so we need to replace it.
+                txChain = dataBroker.createTransactionChain(this);
+            }
+            return txChain.newReadWriteTransaction();
+        } catch (final IllegalStateException e) {
+            LOG.debug("Chain is broken, need to allocate new transaction chain.",e);
+            /*
+             *  Chain was broken by previous transaction,
+             *  but there was race between this.
+             *  Chain will be closed by #onTransactionChainFailed method.
+             */
+            txChain = dataBroker.createTransactionChain(this);
+            return txChain.newReadWriteTransaction();
+        }
+    }
+
+    /**
+     * Creates customized not-submitted transaction, which is ready to be submitted.
+     *
+     * @param opsToApply Operations which are used to customize transaction.
+     * @return Non-empty transaction.
+     */
+    private ReadWriteTransaction createCustomizedTransaction(final ArrayList<InventoryOperation> opsToApply) {
+        final ReadWriteTransaction tx = newEmptyTransaction();
+        for(final InventoryOperation op : opsToApply) {
+            op.applyOperation(tx);
+        }
+        return tx;
+    }
+
+    private synchronized void failCurrentChain(final TransactionChain<?, ?> chain) {
+        if(txChain == chain) {
+            txChain = null;
+        }
+    }
+
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
                                          final Throwable cause) {
         LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
-
+        chain.close();
+        if(txChain == chain) {
+            // Current chain is broken, so we will null it, in order to not use it.
+            failCurrentChain(chain);
+        }
     }
 
     @Override
     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
         // NOOP
     }
+
+    @Override
+    public void close() throws InterruptedException {
+        LOG.info("Flow Capable Inventory Provider stopped.");
+        if (this.listenerRegistration != null) {
+            try {
+                this.listenerRegistration.close();
+            } catch (final Exception e) {
+                LOG.error("Failed to stop inventory provider", e);
+            }
+            listenerRegistration = null;
+        }
+
+        if (thread != null) {
+            thread.interrupt();
+            thread.join();
+            thread = null;
+        }
+        if (txChain != null) {
+            try {
+                txChain.close();
+            } catch (final IllegalStateException e) {
+                // It is possible chain failed and was closed by #onTransactionChainFailed
+                LOG.debug("Chain was already closed.");
+            }
+            txChain = null;
+        }
+    }
 }