2 * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
36 * Copied over as-is from southbound plugin. Good candidate to be common
37 * when refactoring code.
39 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable,
40 Thread.UncaughtExceptionHandler {
41 private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
42 private static final int QUEUE_SIZE = 10000;
43 private BindingTransactionChain chain;
44 private final DataBroker db;
45 private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
46 private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
47 = new LinkedBlockingQueue<>(QUEUE_SIZE);
48 private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
49 = new LinkedBlockingQueue<>(QUEUE_SIZE);
50 private final ExecutorService executor;
51 private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
53 private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
54 //This is made volatile as it is accessed from uncaught exception handler thread also
55 private volatile ReadWriteTransaction transactionInFlight = null;
56 private Iterator<TransactionCommand> commandIterator = null;
58 public TransactionInvokerImpl(DataBroker db) {
60 this.chain = db.createTransactionChain(this);
61 ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
62 .setUncaughtExceptionHandler(this).build();
63 executor = Executors.newSingleThreadExecutor(threadFact);
64 //Using the execute method here so that un caught exception handler gets triggered upon exception.
65 //The other way to do it is using submit method and wait on the future to catch any exceptions
66 executor.execute(this);
70 public void invoke(final TransactionCommand command) {
71 // TODO what do we do if queue is full?
72 if (!inputQueue.offer(command)) {
73 LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
78 public void onTransactionChainFailed(TransactionChain<?, ?> txChain,
79 AsyncTransaction<?, ?> transaction, Throwable cause) {
80 offerFailedTransaction(transaction);
84 public void onTransactionChainSuccessful(TransactionChain<?, ?> txChain) {
91 forgetSuccessfulTransactions();
93 List<TransactionCommand> commands = null;
95 commands = extractCommands();
96 } catch (InterruptedException e) {
97 LOG.warn("Extracting commands was interrupted.", e);
100 commandIterator = commands.iterator();
102 while (commandIterator.hasNext()) {
103 TransactionCommand command = commandIterator.next();
104 final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
105 transactionInFlight = transaction;
106 recordPendingTransaction(command, transaction);
107 command.execute(transaction);
108 ListenableFuture<Void> ft = transaction.submit();
109 command.setTransactionResultFuture(ft);
110 Futures.addCallback(ft, new FutureCallback<Void>() {
112 public void onSuccess(final Void result) {
113 if (!successfulTransactionQueue.offer(transaction)) {
114 LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
115 successfulTransactionQueue.size(), transaction);
120 public void onFailure(final Throwable throwable) {
121 // NOOP - handled by failure of transaction chain
123 }, MoreExecutors.directExecutor());
125 transactionInFlight = null;
126 } catch (IllegalStateException e) {
127 if (transactionInFlight != null) {
128 // TODO: This method should distinguish exceptions on which the command should be
129 // retried from exceptions on which the command should NOT be retried.
130 // Then it should retry only the commands which should be retried, otherwise
131 // this method will retry commands which will never be successful forever.
132 offerFailedTransaction(transactionInFlight);
134 transactionInFlight = null;
135 LOG.warn("Failed to process an update notification from OVS.", e);
140 private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
141 if (!failedTransactionQueue.offer(transaction)) {
142 LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
146 private List<TransactionCommand> extractResubmitCommands() {
147 AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
148 List<TransactionCommand> commands = new ArrayList<>();
149 if (transaction != null) {
150 int index = pendingTransactions.lastIndexOf(transaction);
151 //This logic needs to be revisited. Is it ok to resubmit these things again ?
152 //are these operations idempotent ?
153 //Does the transaction chain execute n+1th if nth one threw error ?
154 List<ReadWriteTransaction> transactions =
155 pendingTransactions.subList(index, pendingTransactions.size() - 1);
156 for (ReadWriteTransaction tx: transactions) {
157 commands.add(transactionToCommand.get(tx));
159 resetTransactionQueue();
161 if (commandIterator != null) {
162 while (commandIterator.hasNext()) {
163 commands.add(commandIterator.next());
169 private void resetTransactionQueue() {
171 chain = db.createTransactionChain(this);
172 pendingTransactions = new ArrayList<>();
173 transactionToCommand = new HashMap<>();
174 failedTransactionQueue.clear();
175 successfulTransactionQueue.clear();
178 private void recordPendingTransaction(TransactionCommand command,
179 final ReadWriteTransaction transaction) {
180 transactionToCommand.put(transaction, command);
181 pendingTransactions.add(transaction);
184 private List<TransactionCommand> extractCommands() throws InterruptedException {
185 List<TransactionCommand> commands = extractResubmitCommands();
186 if (!commands.isEmpty() && inputQueue.isEmpty()) {
187 //we got some commands to be executed let us not sit and wait on empty queue
190 //pull commands from queue if not empty , otherwise wait for commands to be placed in queue.
191 commands.addAll(extractCommandsFromQueue());
195 private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
196 List<TransactionCommand> result = new ArrayList<>();
197 TransactionCommand command = inputQueue.take();
198 while (command != null) {
200 command = inputQueue.poll();
205 private void forgetSuccessfulTransactions() {
206 ReadWriteTransaction transaction = successfulTransactionQueue.poll();
207 while (transaction != null) {
208 pendingTransactions.remove(transaction);
209 transactionToCommand.remove(transaction);
210 transaction = successfulTransactionQueue.poll();
215 public void close() throws Exception {
217 this.executor.shutdown();
221 public void uncaughtException(Thread thread, Throwable ex) {
222 LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
223 if (transactionInFlight != null) {
224 offerFailedTransaction(transactionInFlight);
226 transactionInFlight = null;
227 executor.execute(this);