2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.southbound.transactions.md;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import java.util.ArrayList;
17 import java.util.HashMap;
18 import java.util.List;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
38 private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
39 private static final int QUEUE_SIZE = 10000;
40 private BindingTransactionChain chain;
41 private final DataBroker db;
42 private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
43 private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
44 = new LinkedBlockingQueue<>(QUEUE_SIZE);
45 private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
46 = new LinkedBlockingQueue<>(QUEUE_SIZE);
47 private final ExecutorService executor;
48 private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
50 private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
51 private final AtomicBoolean runTask = new AtomicBoolean(true);
53 public TransactionInvokerImpl(DataBroker db) {
55 this.chain = db.createTransactionChain(this);
56 ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
57 executor = Executors.newSingleThreadExecutor(threadFact);
58 executor.execute(this);
62 public void invoke(final TransactionCommand command) {
63 // TODO what do we do if queue is full?
64 if (!inputQueue.offer(command)) {
65 LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
70 public void onTransactionChainFailed(TransactionChain<?, ?> chainArg,
71 AsyncTransaction<?, ?> transaction, Throwable cause) {
72 LOG.error("Failed to write operational topology", cause);
73 offerFailedTransaction(transaction);
77 public void onTransactionChainSuccessful(TransactionChain<?, ?> chainArg) {
83 while (runTask.get()) {
84 forgetSuccessfulTransactions();
86 List<TransactionCommand> commands = null;
88 commands = extractCommands();
89 } catch (InterruptedException e) {
90 LOG.warn("Extracting commands was interrupted.", e);
94 ReadWriteTransaction transactionInFlight = null;
96 for (TransactionCommand command: commands) {
97 final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
98 transactionInFlight = transaction;
99 recordPendingTransaction(command, transaction);
100 command.execute(transaction);
101 Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
103 public void onSuccess(final Void result) {
104 if (!successfulTransactionQueue.offer(transaction)) {
105 LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
106 successfulTransactionQueue.size(), transaction);
112 public void onFailure(final Throwable throwable) {
113 command.onFailure(throwable);
114 // NOOP - handled by failure of transaction chain
116 }, MoreExecutors.directExecutor());
118 } catch (IllegalStateException e) {
119 if (transactionInFlight != null) {
120 // TODO: This method should distinguish exceptions on which the command should be
121 // retried from exceptions on which the command should NOT be retried.
122 // Then it should retry only the commands which should be retried, otherwise
123 // this method will retry commands which will never be successful forever.
124 offerFailedTransaction(transactionInFlight);
126 LOG.warn("Failed to process an update notification from OVS.", e);
131 private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
132 if (!failedTransactionQueue.offer(transaction)) {
133 LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
138 List<TransactionCommand> extractResubmitCommands() {
139 AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
140 List<TransactionCommand> commands = new ArrayList<>();
141 if (transaction != null) {
142 int index = pendingTransactions.lastIndexOf(transaction);
143 List<ReadWriteTransaction> transactions =
144 pendingTransactions.subList(index, pendingTransactions.size() - 1);
145 for (ReadWriteTransaction tx: transactions) {
146 commands.add(transactionToCommand.get(tx));
148 resetTransactionQueue();
154 void resetTransactionQueue() {
156 chain = db.createTransactionChain(this);
157 pendingTransactions = new ArrayList<>();
158 transactionToCommand = new HashMap<>();
159 failedTransactionQueue.clear();
160 successfulTransactionQueue.clear();
163 private void recordPendingTransaction(TransactionCommand command,
164 final ReadWriteTransaction transaction) {
165 transactionToCommand.put(transaction, command);
166 pendingTransactions.add(transaction);
169 private List<TransactionCommand> extractCommands() throws InterruptedException {
170 List<TransactionCommand> commands = extractResubmitCommands();
171 commands.addAll(extractCommandsFromQueue());
176 List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
177 List<TransactionCommand> result = new ArrayList<>();
178 TransactionCommand command = inputQueue.take();
179 while (command != null) {
181 command = inputQueue.poll();
186 private void forgetSuccessfulTransactions() {
187 ReadWriteTransaction transaction = successfulTransactionQueue.poll();
188 while (transaction != null) {
189 pendingTransactions.remove(transaction);
190 transactionToCommand.remove(transaction);
191 transaction = successfulTransactionQueue.poll();
196 public void close() throws InterruptedException {
198 this.executor.shutdown();
199 if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
201 this.executor.shutdownNow();