2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.southbound.transactions.md;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.util.ArrayList;
15 import java.util.HashMap;
16 import java.util.List;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
35 private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
36 private static final int QUEUE_SIZE = 10000;
38 private final DataBroker db;
39 private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
40 private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
41 private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
42 new LinkedBlockingQueue<>(QUEUE_SIZE);
43 private final ExecutorService executor;
45 private final AtomicBoolean runTask = new AtomicBoolean(true);
47 private Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
48 private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
49 private BindingTransactionChain chain;
51 public TransactionInvokerImpl(final DataBroker db) {
53 this.chain = db.createTransactionChain(this);
54 ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
55 executor = Executors.newSingleThreadExecutor(threadFact);
56 executor.execute(this);
60 public void invoke(final TransactionCommand command) {
61 // TODO what do we do if queue is full?
62 if (!inputQueue.offer(command)) {
63 LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
68 public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
69 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
70 LOG.error("Failed to write operational topology", cause);
71 offerFailedTransaction(transaction);
75 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
81 while (runTask.get()) {
82 forgetSuccessfulTransactions();
84 List<TransactionCommand> commands = null;
86 commands = extractCommands();
87 } catch (InterruptedException e) {
88 LOG.warn("Extracting commands was interrupted.", e);
92 ReadWriteTransaction transactionInFlight = null;
94 for (TransactionCommand command: commands) {
95 final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
96 transactionInFlight = transaction;
97 recordPendingTransaction(command, transaction);
98 command.execute(transaction);
99 Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
101 public void onSuccess(final Void result) {
102 if (!successfulTransactionQueue.offer(transaction)) {
103 LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
104 successfulTransactionQueue.size(), transaction);
110 public void onFailure(final Throwable throwable) {
111 command.onFailure(throwable);
112 // NOOP - handled by failure of transaction chain
114 }, MoreExecutors.directExecutor());
116 } catch (IllegalStateException e) {
117 if (transactionInFlight != null) {
118 // TODO: This method should distinguish exceptions on which the command should be
119 // retried from exceptions on which the command should NOT be retried.
120 // Then it should retry only the commands which should be retried, otherwise
121 // this method will retry commands which will never be successful forever.
122 offerFailedTransaction(transactionInFlight);
124 LOG.warn("Failed to process an update notification from OVS.", e);
129 private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
130 if (!failedTransactionQueue.offer(transaction)) {
131 LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
135 private List<TransactionCommand> extractResubmitCommands() {
136 AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
137 List<TransactionCommand> commands = new ArrayList<>();
138 if (transaction != null) {
139 int index = pendingTransactions.lastIndexOf(transaction);
140 List<ReadWriteTransaction> transactions =
141 pendingTransactions.subList(index, pendingTransactions.size() - 1);
142 for (ReadWriteTransaction tx: transactions) {
143 commands.add(transactionToCommand.get(tx));
145 resetTransactionQueue();
150 private void resetTransactionQueue() {
152 chain = db.createTransactionChain(this);
153 pendingTransactions = new ArrayList<>();
154 transactionToCommand = new HashMap<>();
155 failedTransactionQueue.clear();
156 successfulTransactionQueue.clear();
159 private void recordPendingTransaction(final TransactionCommand command,
160 final ReadWriteTransaction transaction) {
161 transactionToCommand.put(transaction, command);
162 pendingTransactions.add(transaction);
165 private List<TransactionCommand> extractCommands() throws InterruptedException {
166 List<TransactionCommand> commands = extractResubmitCommands();
167 commands.addAll(extractCommandsFromQueue());
171 private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
172 List<TransactionCommand> result = new ArrayList<>();
173 TransactionCommand command = inputQueue.take();
175 inputQueue.drainTo(result);
179 private void forgetSuccessfulTransactions() {
180 ReadWriteTransaction transaction = successfulTransactionQueue.poll();
181 while (transaction != null) {
182 pendingTransactions.remove(transaction);
183 transactionToCommand.remove(transaction);
184 transaction = successfulTransactionQueue.poll();
189 public void close() throws InterruptedException {
191 this.executor.shutdown();
192 if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
194 this.executor.shutdownNow();