2 * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.lang.Thread.UncaughtExceptionHandler;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.List;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadFactory;
25 import javax.annotation.PreDestroy;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.checkerframework.checker.lock.qual.GuardedBy;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
31 import org.opendaylight.mdsal.binding.api.Transaction;
32 import org.opendaylight.mdsal.binding.api.TransactionChain;
33 import org.osgi.service.component.annotations.Activate;
34 import org.osgi.service.component.annotations.Component;
35 import org.osgi.service.component.annotations.Deactivate;
36 import org.osgi.service.component.annotations.Reference;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * Copied over as-is from southbound plugin. Good candidate to be common
42 * when refactoring code.
45 @Component(service = TransactionInvoker.class)
46 public final class TransactionInvokerImpl implements TransactionInvoker, Runnable, AutoCloseable,
47 UncaughtExceptionHandler {
48 private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
49 private static final int QUEUE_SIZE = 10000;
51 private final DataBroker db;
52 private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
53 private final BlockingQueue<Transaction> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
54 private final ExecutorService executor;
57 private final Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
59 private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
61 private TransactionChain chain;
62 //This is made volatile as it is accessed from uncaught exception handler thread also
63 private volatile ReadWriteTransaction transactionInFlight = null;
64 private Iterator<TransactionCommand> commandIterator = null;
68 public TransactionInvokerImpl(@Reference final DataBroker db) {
70 chain = db.createTransactionChain();
71 ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
72 .setUncaughtExceptionHandler(this).build();
73 executor = Executors.newSingleThreadExecutor(threadFact);
74 //Using the execute method here so that un caught exception handler gets triggered upon exception.
75 //The other way to do it is using submit method and wait on the future to catch any exceptions
76 executor.execute(this);
80 public void invoke(final TransactionCommand command) {
81 // TODO what do we do if queue is full?
82 if (!inputQueue.offer(command)) {
83 LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
90 final List<TransactionCommand> commands;
92 commands = extractCommands();
93 } catch (InterruptedException e) {
94 LOG.warn("Extracting commands was interrupted.", e);
97 commandIterator = commands.iterator();
99 while (commandIterator.hasNext()) {
100 executeCommand(commandIterator.next());
102 transactionInFlight = null;
103 } catch (IllegalStateException e) {
104 if (transactionInFlight != null) {
105 // TODO: This method should distinguish exceptions on which the command should be
106 // retried from exceptions on which the command should NOT be retried.
107 // Then it should retry only the commands which should be retried, otherwise
108 // this method will retry commands which will never be successful forever.
109 offerFailedTransaction(transactionInFlight);
111 transactionInFlight = null;
112 LOG.warn("Failed to process an update notification from OVS.", e);
117 private synchronized void executeCommand(final TransactionCommand command) {
118 final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
119 transactionInFlight = transaction;
120 recordPendingTransaction(command, transaction);
121 command.execute(transaction);
122 FluentFuture<?> ft = transaction.commit();
123 command.setTransactionResultFuture(ft);
124 ft.addCallback(new FutureCallback<Object>() {
126 public void onSuccess(final Object result) {
127 forgetSuccessfulTransaction(transaction);
132 public void onFailure(final Throwable throwable) {
133 offerFailedTransaction(transaction);
136 }, MoreExecutors.directExecutor());
139 private void offerFailedTransaction(final Transaction transaction) {
140 if (!failedTransactionQueue.offer(transaction)) {
141 LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
145 private List<TransactionCommand> extractResubmitCommands() {
146 List<TransactionCommand> commands = new ArrayList<>();
147 synchronized (this) {
148 Transaction transaction = failedTransactionQueue.poll();
149 if (transaction != null) {
150 int index = pendingTransactions.lastIndexOf(transaction);
151 //This logic needs to be revisited. Is it ok to resubmit these things again ?
152 //are these operations idempotent ?
153 //Does the transaction chain execute n+1th if nth one threw error ?
154 List<ReadWriteTransaction> transactions =
155 pendingTransactions.subList(index, pendingTransactions.size() - 1);
156 for (ReadWriteTransaction tx: transactions) {
157 commands.add(transactionToCommand.get(tx));
159 resetTransactionQueue();
162 if (commandIterator != null) {
163 while (commandIterator.hasNext()) {
164 commands.add(commandIterator.next());
170 private void resetTransactionQueue() {
172 chain = db.createTransactionChain();
173 pendingTransactions.clear();
174 transactionToCommand.clear();
175 failedTransactionQueue.clear();
178 synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
179 pendingTransactions.remove(transaction);
180 transactionToCommand.remove(transaction);
183 private synchronized void recordPendingTransaction(final TransactionCommand command,
184 final ReadWriteTransaction transaction) {
185 transactionToCommand.put(transaction, command);
186 pendingTransactions.add(transaction);
189 private List<TransactionCommand> extractCommands() throws InterruptedException {
190 List<TransactionCommand> commands = extractResubmitCommands();
191 if (!commands.isEmpty() && inputQueue.isEmpty()) {
192 //we got some commands to be executed let us not sit and wait on empty queue
195 //pull commands from queue if not empty , otherwise wait for commands to be placed in queue.
196 commands.addAll(extractCommandsFromQueue());
200 private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
201 List<TransactionCommand> result = new ArrayList<>();
202 TransactionCommand command = inputQueue.take();
204 inputQueue.drainTo(result);
211 public void close() {
217 public void uncaughtException(final Thread thread, final Throwable ex) {
218 LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
219 if (transactionInFlight != null) {
220 offerFailedTransaction(transactionInFlight);
222 transactionInFlight = null;
223 executor.execute(this);