Eliminate TransactionInvokerImpl.successfulTransactionQueue
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
index acbe3fc3b478ea5f4b56480cca11b8b908ac8cac..98b423e11054ea2345c0dfc1f1adc691721e619c 100644 (file)
@@ -1,5 +1,17 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.ovsdb.southbound.transactions.md;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -9,91 +21,126 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
-
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
     private static final int QUEUE_SIZE = 10000;
+
+    private final DataBroker db;
+    private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final ExecutorService executor;
+
+    private final AtomicBoolean runTask = new AtomicBoolean(true);
+
+    @GuardedBy("this")
+    private final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
+    @GuardedBy("this")
+    private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
     private BindingTransactionChain chain;
-    private DataBroker db;
-    private BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<TransactionCommand>(QUEUE_SIZE);
-    private BlockingQueue<ReadWriteTransaction> successfulTransactionQueue = new LinkedBlockingQueue<ReadWriteTransaction>(QUEUE_SIZE);
-    private BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<AsyncTransaction<?, ?>>(QUEUE_SIZE);
-    private ExecutorService executor;
-    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<ReadWriteTransaction,TransactionCommand>();
-    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<ReadWriteTransaction>();
-
-    public TransactionInvokerImpl(DataBroker db) {
+
+    public TransactionInvokerImpl(final DataBroker db) {
         this.db = db;
         this.chain = db.createTransactionChain(this);
         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
         executor = Executors.newSingleThreadExecutor(threadFact);
-        executor.submit(this);
+        executor.execute(this);
     }
 
     @Override
     public void invoke(final TransactionCommand command) {
         // TODO what do we do if queue is full?
-        inputQueue.offer(command);
+        if (!inputQueue.offer(command)) {
+            LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
+        }
     }
 
     @Override
-    public void onTransactionChainFailed(TransactionChain<?, ?> chain,
-            AsyncTransaction<?, ?> transaction, Throwable cause) {
-        failedTransactionQueue.offer(transaction);
+    public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
+            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+        LOG.error("Failed to write operational topology", cause);
+        offerFailedTransaction(transaction);
     }
 
     @Override
-    public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
         // NO OP
-
     }
 
     @Override
     public void run() {
-        while(true) {
-            forgetSuccessfulTransactions();
+        while (runTask.get()) {
+            final List<TransactionCommand> commands;
             try {
-                List<TransactionCommand> commands = extractCommands();
-                for(TransactionCommand command: commands) {
-                    final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
-                    recordPendingTransaction(command, transaction);
-                    command.execute(transaction);
-                    Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(final Void result) {
-                            successfulTransactionQueue.offer(transaction);
-                        }
-
-                        @Override
-                        public void onFailure(final Throwable t) {
-                            // NOOP - handled by failure of transaction chain
-                        }
-                    });
-                }
-            } catch (Exception e) {
+                commands = extractCommands();
+            } catch (InterruptedException e) {
+                LOG.warn("Extracting commands was interrupted.", e);
+                continue;
+            }
 
+            ReadWriteTransaction transactionInFlight = null;
+            try {
+                for (TransactionCommand command: commands) {
+                    synchronized (this) {
+                        final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+                        transactionInFlight = transaction;
+                        recordPendingTransaction(command, transaction);
+                        command.execute(transaction);
+                        Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+                            @Override
+                            public void onSuccess(final Void result) {
+                                forgetSuccessfulTransaction(transaction);
+                                command.onSuccess();
+                            }
+
+                            @Override
+                            public void onFailure(final Throwable throwable) {
+                                command.onFailure(throwable);
+                                // NOOP - handled by failure of transaction chain
+                            }
+                        }, MoreExecutors.directExecutor());
+                    }
+                }
+            } catch (IllegalStateException e) {
+                if (transactionInFlight != null) {
+                    // TODO: This method should distinguish exceptions on which the command should be
+                    // retried from exceptions on which the command should NOT be retried.
+                    // Then it should retry only the commands which should be retried, otherwise
+                    // this method will retry commands which will never be successful forever.
+                    offerFailedTransaction(transactionInFlight);
+                }
+                LOG.warn("Failed to process an update notification from OVS.", e);
             }
         }
     }
 
-    private List<TransactionCommand> extractResubmitCommands() {
+    private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
+        if (!failedTransactionQueue.offer(transaction)) {
+            LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
+        }
+    }
+
+    @VisibleForTesting
+    synchronized List<TransactionCommand> extractResubmitCommands() {
         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
-        List<TransactionCommand> commands = new ArrayList<TransactionCommand>();
-        if(transaction != null) {
+        List<TransactionCommand> commands = new ArrayList<>();
+        if (transaction != null) {
             int index = pendingTransactions.lastIndexOf(transaction);
-            List<ReadWriteTransaction> transactions = pendingTransactions.subList(index, pendingTransactions.size()-1);
-            for(ReadWriteTransaction tx: transactions) {
+            List<ReadWriteTransaction> transactions =
+                    pendingTransactions.subList(index, pendingTransactions.size() - 1);
+            for (ReadWriteTransaction tx: transactions) {
                 commands.add(transactionToCommand.get(tx));
             }
             resetTransactionQueue();
@@ -101,16 +148,21 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         return commands;
     }
 
-    private void resetTransactionQueue() {
+    @VisibleForTesting
+    synchronized void resetTransactionQueue() {
         chain.close();
         chain = db.createTransactionChain(this);
-        pendingTransactions = new ArrayList<ReadWriteTransaction>();
-        transactionToCommand = new HashMap<ReadWriteTransaction,TransactionCommand>();
+        pendingTransactions.clear();
+        transactionToCommand.clear();
         failedTransactionQueue.clear();
-        successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(TransactionCommand command,
+    synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
+        pendingTransactions.remove(transaction);
+        transactionToCommand.remove(transaction);
+    }
+
+    private synchronized void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -122,27 +174,22 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         return commands;
     }
 
-    private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
-        List<TransactionCommand> result = new ArrayList<TransactionCommand>();
+    @VisibleForTesting
+    List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
+        List<TransactionCommand> result = new ArrayList<>();
         TransactionCommand command = inputQueue.take();
-        while(command != null) {
-            result.add(command);
-            command = inputQueue.poll();
-        }
+        result.add(command);
+        inputQueue.drainTo(result);
         return result;
     }
 
-    private void forgetSuccessfulTransactions() {
-        ReadWriteTransaction transaction = successfulTransactionQueue.poll();
-        while(transaction != null) {
-            pendingTransactions.remove(transaction);
-            transactionToCommand.remove(transaction);
-            transaction = successfulTransactionQueue.poll();
-        }
-    }
-
     @Override
-    public void close() throws Exception {
-      this.executor.shutdown();
+    public void close() throws InterruptedException {
+        this.chain.close();
+        this.executor.shutdown();
+        if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
+            runTask.set(false);
+            this.executor.shutdownNow();
+        }
     }
 }