Eliminate TransactionInvokerImpl.successfulTransactionQueue
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
index fdf21997c2ab5b0668b5cbf1966170994edc5cb6..98b423e11054ea2345c0dfc1f1adc691721e619c 100644 (file)
@@ -5,9 +5,13 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.ovsdb.southbound.transactions.md;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -19,7 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -29,84 +33,109 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListen
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-
 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
     private static final int QUEUE_SIZE = 10000;
+
+    private final DataBroker db;
+    private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final ExecutorService executor;
+
+    private final AtomicBoolean runTask = new AtomicBoolean(true);
+
+    @GuardedBy("this")
+    private final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
+    @GuardedBy("this")
+    private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
     private BindingTransactionChain chain;
-    private DataBroker db;
-    private BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<TransactionCommand>(QUEUE_SIZE);
-    private BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
-        = new LinkedBlockingQueue<ReadWriteTransaction>(QUEUE_SIZE);
-    private BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
-        = new LinkedBlockingQueue<AsyncTransaction<?, ?>>(QUEUE_SIZE);
-    private ExecutorService executor;
-    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
-        = new HashMap<ReadWriteTransaction,TransactionCommand>();
-    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<ReadWriteTransaction>();
-    private final AtomicBoolean runTask = new AtomicBoolean( true );
-
-    public TransactionInvokerImpl(DataBroker db) {
+
+    public TransactionInvokerImpl(final DataBroker db) {
         this.db = db;
         this.chain = db.createTransactionChain(this);
         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
         executor = Executors.newSingleThreadExecutor(threadFact);
-        executor.submit(this);
+        executor.execute(this);
     }
 
     @Override
     public void invoke(final TransactionCommand command) {
         // TODO what do we do if queue is full?
-        inputQueue.offer(command);
+        if (!inputQueue.offer(command)) {
+            LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
+        }
     }
 
     @Override
-    public void onTransactionChainFailed(TransactionChain<?, ?> chain,
-            AsyncTransaction<?, ?> transaction, Throwable cause) {
-        failedTransactionQueue.offer(transaction);
+    public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
+            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+        LOG.error("Failed to write operational topology", cause);
+        offerFailedTransaction(transaction);
     }
 
     @Override
-    public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
         // NO OP
-
     }
 
     @Override
     public void run() {
         while (runTask.get()) {
-            forgetSuccessfulTransactions();
+            final List<TransactionCommand> commands;
+            try {
+                commands = extractCommands();
+            } catch (InterruptedException e) {
+                LOG.warn("Extracting commands was interrupted.", e);
+                continue;
+            }
+
+            ReadWriteTransaction transactionInFlight = null;
             try {
-                List<TransactionCommand> commands = extractCommands();
                 for (TransactionCommand command: commands) {
-                    final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
-                    recordPendingTransaction(command, transaction);
-                    command.execute(transaction);
-                    Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(final Void result) {
-                            successfulTransactionQueue.offer(transaction);
-                        }
-
-                        @Override
-                        public void onFailure(final Throwable throwable) {
-                            // NOOP - handled by failure of transaction chain
-                        }
-                    });
+                    synchronized (this) {
+                        final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+                        transactionInFlight = transaction;
+                        recordPendingTransaction(command, transaction);
+                        command.execute(transaction);
+                        Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+                            @Override
+                            public void onSuccess(final Void result) {
+                                forgetSuccessfulTransaction(transaction);
+                                command.onSuccess();
+                            }
+
+                            @Override
+                            public void onFailure(final Throwable throwable) {
+                                command.onFailure(throwable);
+                                // NOOP - handled by failure of transaction chain
+                            }
+                        }, MoreExecutors.directExecutor());
+                    }
+                }
+            } catch (IllegalStateException e) {
+                if (transactionInFlight != null) {
+                    // TODO: This method should distinguish exceptions on which the command should be
+                    // retried from exceptions on which the command should NOT be retried.
+                    // Then it should retry only the commands which should be retried, otherwise
+                    // this method will retry commands which will never be successful forever.
+                    offerFailedTransaction(transactionInFlight);
                 }
-            } catch (Exception e) {
-                LOG.warn("Exception invoking Transaction: ", e);
+                LOG.warn("Failed to process an update notification from OVS.", e);
             }
         }
     }
 
-    private List<TransactionCommand> extractResubmitCommands() {
+    private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
+        if (!failedTransactionQueue.offer(transaction)) {
+            LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
+        }
+    }
+
+    @VisibleForTesting
+    synchronized List<TransactionCommand> extractResubmitCommands() {
         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
-        List<TransactionCommand> commands = new ArrayList<TransactionCommand>();
+        List<TransactionCommand> commands = new ArrayList<>();
         if (transaction != null) {
             int index = pendingTransactions.lastIndexOf(transaction);
             List<ReadWriteTransaction> transactions =
@@ -119,16 +148,21 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         return commands;
     }
 
-    private void resetTransactionQueue() {
+    @VisibleForTesting
+    synchronized void resetTransactionQueue() {
         chain.close();
         chain = db.createTransactionChain(this);
-        pendingTransactions = new ArrayList<ReadWriteTransaction>();
-        transactionToCommand = new HashMap<ReadWriteTransaction,TransactionCommand>();
+        pendingTransactions.clear();
+        transactionToCommand.clear();
         failedTransactionQueue.clear();
-        successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(TransactionCommand command,
+    synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
+        pendingTransactions.remove(transaction);
+        transactionToCommand.remove(transaction);
+    }
+
+    private synchronized void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -140,27 +174,18 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         return commands;
     }
 
-    private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
-        List<TransactionCommand> result = new ArrayList<TransactionCommand>();
+    @VisibleForTesting
+    List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
+        List<TransactionCommand> result = new ArrayList<>();
         TransactionCommand command = inputQueue.take();
-        while (command != null) {
-            result.add(command);
-            command = inputQueue.poll();
-        }
+        result.add(command);
+        inputQueue.drainTo(result);
         return result;
     }
 
-    private void forgetSuccessfulTransactions() {
-        ReadWriteTransaction transaction = successfulTransactionQueue.poll();
-        while (transaction != null) {
-            pendingTransactions.remove(transaction);
-            transactionToCommand.remove(transaction);
-            transaction = successfulTransactionQueue.poll();
-        }
-    }
-
     @Override
-    public void close() throws Exception {
+    public void close() throws InterruptedException {
+        this.chain.close();
         this.executor.shutdown();
         if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
             runTask.set(false);