X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=southbound%2Fsouthbound-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fovsdb%2Fsouthbound%2Ftransactions%2Fmd%2FTransactionInvokerImpl.java;h=98b423e11054ea2345c0dfc1f1adc691721e619c;hb=fd925bf08ba4134899a7886686bf5b06621fafdc;hp=6f8c70427d5cb9e21e04c11447132a331214564c;hpb=599cd7b2be1bcbf4b84088ca6944e1bc9ec46c1e;p=ovsdb.git diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java index 6f8c70427..98b423e11 100644 --- a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java +++ b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java @@ -1,5 +1,17 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.ovsdb.southbound.transactions.md; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -9,7 +21,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; - +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; @@ -19,84 +33,114 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListen import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - - public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class); private static final int QUEUE_SIZE = 10000; + + private final DataBroker db; + private final BlockingQueue inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); + private final BlockingQueue> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); + private final ExecutorService executor; + + private final AtomicBoolean runTask = new AtomicBoolean(true); + + @GuardedBy("this") + private final Map transactionToCommand = new HashMap<>(); + @GuardedBy("this") + private final List pendingTransactions = new ArrayList<>(); + private BindingTransactionChain chain; - private DataBroker db; - private BlockingQueue inputQueue = new LinkedBlockingQueue(QUEUE_SIZE); - private BlockingQueue successfulTransactionQueue = new LinkedBlockingQueue(QUEUE_SIZE); - private BlockingQueue> failedTransactionQueue = new LinkedBlockingQueue>(QUEUE_SIZE); - private ExecutorService executor; - private Map transactionToCommand = new HashMap(); - private List pendingTransactions = new ArrayList(); - - public TransactionInvokerImpl(DataBroker db) { + + public TransactionInvokerImpl(final DataBroker db) { this.db = db; this.chain = db.createTransactionChain(this); ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build(); executor = Executors.newSingleThreadExecutor(threadFact); - executor.submit(this); + executor.execute(this); } @Override public void invoke(final TransactionCommand command) { // TODO what do we do if queue is full? - inputQueue.offer(command); + if (!inputQueue.offer(command)) { + LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command); + } } @Override - public void onTransactionChainFailed(TransactionChain chain, - AsyncTransaction transaction, Throwable cause) { - failedTransactionQueue.offer(transaction); + public void onTransactionChainFailed(final TransactionChain chainArg, + final AsyncTransaction transaction, final Throwable cause) { + LOG.error("Failed to write operational topology", cause); + offerFailedTransaction(transaction); } @Override - public void onTransactionChainSuccessful(TransactionChain chain) { + public void onTransactionChainSuccessful(final TransactionChain chainArg) { // NO OP - } @Override public void run() { - while(true) { - forgetSuccessfulTransactions(); + while (runTask.get()) { + final List commands; + try { + commands = extractCommands(); + } catch (InterruptedException e) { + LOG.warn("Extracting commands was interrupted.", e); + continue; + } + + ReadWriteTransaction transactionInFlight = null; try { - List commands = extractCommands(); - for(TransactionCommand command: commands) { - final ReadWriteTransaction transaction = chain.newReadWriteTransaction(); - recordPendingTransaction(command, transaction); - command.execute(transaction); - Futures.addCallback(transaction.submit(), new FutureCallback() { - @Override - public void onSuccess(final Void result) { - successfulTransactionQueue.offer(transaction); - } - - @Override - public void onFailure(final Throwable t) { - // NOOP - handled by failure of transaction chain - } - }); + for (TransactionCommand command: commands) { + synchronized (this) { + final ReadWriteTransaction transaction = chain.newReadWriteTransaction(); + transactionInFlight = transaction; + recordPendingTransaction(command, transaction); + command.execute(transaction); + Futures.addCallback(transaction.submit(), new FutureCallback() { + @Override + public void onSuccess(final Void result) { + forgetSuccessfulTransaction(transaction); + command.onSuccess(); + } + + @Override + public void onFailure(final Throwable throwable) { + command.onFailure(throwable); + // NOOP - handled by failure of transaction chain + } + }, MoreExecutors.directExecutor()); + } } - } catch (Exception e) { - LOG.warn("Exception invoking Transaction: ",e); + } catch (IllegalStateException e) { + if (transactionInFlight != null) { + // TODO: This method should distinguish exceptions on which the command should be + // retried from exceptions on which the command should NOT be retried. + // Then it should retry only the commands which should be retried, otherwise + // this method will retry commands which will never be successful forever. + offerFailedTransaction(transactionInFlight); + } + LOG.warn("Failed to process an update notification from OVS.", e); } } } - private List extractResubmitCommands() { + private void offerFailedTransaction(final AsyncTransaction transaction) { + if (!failedTransactionQueue.offer(transaction)) { + LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size()); + } + } + + @VisibleForTesting + synchronized List extractResubmitCommands() { AsyncTransaction transaction = failedTransactionQueue.poll(); - List commands = new ArrayList(); - if(transaction != null) { + List commands = new ArrayList<>(); + if (transaction != null) { int index = pendingTransactions.lastIndexOf(transaction); - List transactions = pendingTransactions.subList(index, pendingTransactions.size()-1); - for(ReadWriteTransaction tx: transactions) { + List transactions = + pendingTransactions.subList(index, pendingTransactions.size() - 1); + for (ReadWriteTransaction tx: transactions) { commands.add(transactionToCommand.get(tx)); } resetTransactionQueue(); @@ -104,16 +148,21 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha return commands; } - private void resetTransactionQueue() { + @VisibleForTesting + synchronized void resetTransactionQueue() { chain.close(); chain = db.createTransactionChain(this); - pendingTransactions = new ArrayList(); - transactionToCommand = new HashMap(); + pendingTransactions.clear(); + transactionToCommand.clear(); failedTransactionQueue.clear(); - successfulTransactionQueue.clear(); } - private void recordPendingTransaction(TransactionCommand command, + synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) { + pendingTransactions.remove(transaction); + transactionToCommand.remove(transaction); + } + + private synchronized void recordPendingTransaction(final TransactionCommand command, final ReadWriteTransaction transaction) { transactionToCommand.put(transaction, command); pendingTransactions.add(transaction); @@ -125,27 +174,22 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha return commands; } - private List extractCommandsFromQueue() throws InterruptedException { - List result = new ArrayList(); + @VisibleForTesting + List extractCommandsFromQueue() throws InterruptedException { + List result = new ArrayList<>(); TransactionCommand command = inputQueue.take(); - while(command != null) { - result.add(command); - command = inputQueue.poll(); - } + result.add(command); + inputQueue.drainTo(result); return result; } - private void forgetSuccessfulTransactions() { - ReadWriteTransaction transaction = successfulTransactionQueue.poll(); - while(transaction != null) { - pendingTransactions.remove(transaction); - transactionToCommand.remove(transaction); - transaction = successfulTransactionQueue.poll(); - } - } - @Override - public void close() throws Exception { - this.executor.shutdown(); + public void close() throws InterruptedException { + this.chain.close(); + this.executor.shutdown(); + if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) { + runTask.set(false); + this.executor.shutdownNow(); + } } }