/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.ovsdb.southbound.transactions.md;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.AbstractMap.SimpleImmutableEntry;
16 import java.util.ArrayDeque;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map.Entry;
22 import java.util.Queue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import org.checkerframework.checker.lock.qual.GuardedBy;
31 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
41 private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
42 private static final int QUEUE_SIZE = 10000;
44 private final DataBroker db;
45 private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
46 private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
47 private final ExecutorService executor;
49 private final AtomicBoolean runTask = new AtomicBoolean(true);
52 private final Queue<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions = new ArrayDeque<>();
54 private BindingTransactionChain chain;
56 public TransactionInvokerImpl(final DataBroker db) {
58 this.chain = db.createTransactionChain(this);
59 ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
60 executor = Executors.newSingleThreadExecutor(threadFact);
61 executor.execute(this);
/**
 * Creates an invoker that uses the supplied executor. Nothing is submitted to the executor here,
 * so the worker loop is not started by this constructor.
 *
 * @param db broker used to create the transaction chain
 * @param executor executor stored for later shutdown; the list-based constructors pass {@code null}
 */
@VisibleForTesting
TransactionInvokerImpl(final DataBroker db, final ExecutorService executor) {
    // 'db' is a final field and is needed later to re-create the chain, so it must be assigned here.
    this.db = db;
    this.chain = db.createTransactionChain(this);
    this.executor = executor;
}
/**
 * Creates an invoker without an executor and pre-populates its bookkeeping queues.
 *
 * @param db broker used to create the transaction chain
 * @param pendingTransactions entries copied into the pending-transaction queue, in list order
 * @param failedTransactions transactions copied into the failed-transaction queue, in list order
 */
@VisibleForTesting
TransactionInvokerImpl(final DataBroker db,
        final List<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions,
        final List<ReadWriteTransaction> failedTransactions) {
    // No executor is supplied, so no worker loop is started for instances built this way.
    this(db, (ExecutorService) null);

    this.pendingTransactions.addAll(pendingTransactions);
    this.failedTransactionQueue.addAll(failedTransactions);
}
/**
 * Creates an invoker without an executor whose pending-transaction queue is pre-populated and
 * whose failed-transaction queue starts empty.
 *
 * @param db broker used to create the transaction chain
 * @param pendingTransactions entries copied into the pending-transaction queue, in list order
 */
@VisibleForTesting
TransactionInvokerImpl(final DataBroker db,
        final List<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions) {
    // Delegates with an empty list of failed transactions.
    this(db, pendingTransactions, Collections.emptyList());
}
89 public void invoke(final TransactionCommand command) {
90 // TODO what do we do if queue is full?
91 if (!inputQueue.offer(command)) {
92 LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
97 public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
98 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
99 LOG.error("Failed to write operational topology", cause);
100 offerFailedTransaction(transaction);
104 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
110 while (runTask.get()) {
111 final List<TransactionCommand> commands;
113 commands = extractCommands();
114 } catch (InterruptedException e) {
115 LOG.warn("Extracting commands was interrupted.", e);
119 commands.forEach(this::executeCommand);
123 private synchronized void executeCommand(final TransactionCommand command) {
124 ReadWriteTransaction transactionInFlight = null;
126 final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
127 transactionInFlight = transaction;
128 recordPendingTransaction(command, transaction);
129 command.execute(transaction);
130 Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
132 public void onSuccess(final Void result) {
133 forgetSuccessfulTransaction(transaction);
138 public void onFailure(final Throwable throwable) {
139 command.onFailure(throwable);
140 // NOOP - handled by failure of transaction chain
142 }, MoreExecutors.directExecutor());
143 } catch (IllegalStateException e) {
144 if (transactionInFlight != null) {
145 // TODO: This method should distinguish exceptions on which the command should be
146 // retried from exceptions on which the command should NOT be retried.
147 // Then it should retry only the commands which should be retried, otherwise
148 // this method will retry commands which will never be successful forever.
149 offerFailedTransaction(transactionInFlight);
151 LOG.warn("Failed to process an update notification from OVS.", e);
155 private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
156 if (!failedTransactionQueue.offer(transaction)) {
157 LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
162 synchronized List<TransactionCommand> extractResubmitCommands() {
163 AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
164 List<TransactionCommand> commands = new ArrayList<>();
165 if (transaction != null) {
166 // Process all pending transactions, looking for the failed one...
167 final Iterator<Entry<ReadWriteTransaction, TransactionCommand>> it = pendingTransactions.iterator();
168 while (it.hasNext()) {
169 final Entry<ReadWriteTransaction, TransactionCommand> current = it.next();
170 if (transaction.equals(current.getKey())) {
171 // .. collect current and all remaining pending transactions' values
172 commands.add(current.getValue());
173 it.forEachRemaining(entry -> commands.add(entry.getValue()));
178 resetTransactionQueue();
184 synchronized void resetTransactionQueue() {
186 chain = db.createTransactionChain(this);
187 pendingTransactions.clear();
188 failedTransactionQueue.clear();
191 synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
192 Iterator<Entry<ReadWriteTransaction, TransactionCommand>> it = pendingTransactions.iterator();
193 while (it.hasNext()) {
194 final Entry<ReadWriteTransaction, TransactionCommand> entry = it.next();
195 if (transaction.equals(entry.getKey())) {
203 synchronized void recordPendingTransaction(final TransactionCommand command,
204 final ReadWriteTransaction transaction) {
205 pendingTransactions.add(new SimpleImmutableEntry<>(transaction, command));
209 List<TransactionCommand> extractCommands() throws InterruptedException {
210 List<TransactionCommand> commands = extractResubmitCommands();
211 commands.addAll(extractCommandsFromQueue());
216 List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
217 List<TransactionCommand> result = new ArrayList<>();
218 TransactionCommand command = inputQueue.take();
220 inputQueue.drainTo(result);
225 public void close() throws InterruptedException {
227 this.executor.shutdown();
228 if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
230 this.executor.shutdownNow();