2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.southbound.transactions.md;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.HashMap;
18 import java.util.List;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.checkerframework.checker.lock.qual.GuardedBy;
28 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
38 private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
39 private static final int QUEUE_SIZE = 10000;
41 private final DataBroker db;
42 private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
43 private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
44 private final ExecutorService executor;
46 private final AtomicBoolean runTask = new AtomicBoolean(true);
49 private final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
51 private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
53 private BindingTransactionChain chain;
55 public TransactionInvokerImpl(final DataBroker db) {
57 this.chain = db.createTransactionChain(this);
58 ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
59 executor = Executors.newSingleThreadExecutor(threadFact);
60 executor.execute(this);
64 TransactionInvokerImpl(final DataBroker db, final ExecutorService executor) {
66 this.chain = db.createTransactionChain(this);
67 this.executor = executor;
71 TransactionInvokerImpl(final DataBroker db, final List<ReadWriteTransaction> pendingTransactions,
72 final List<ReadWriteTransaction> failedTransactions,
73 final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand) {
74 this(db, (ExecutorService) null);
77 this.pendingTransactions.addAll(pendingTransactions);
78 this.failedTransactionQueue.addAll(failedTransactions);
79 this.transactionToCommand.putAll(transactionToCommand);
83 TransactionInvokerImpl(final DataBroker db, final List<ReadWriteTransaction> pendingTransactions) {
84 this(db, pendingTransactions, Collections.emptyList(), Collections.emptyMap());
88 public void invoke(final TransactionCommand command) {
89 // TODO what do we do if queue is full?
90 if (!inputQueue.offer(command)) {
91 LOG.error("inputQueue is full (size: {}) - could not offer {}", inputQueue.size(), command);
96 public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
97 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
98 LOG.error("Failed to write operational topology", cause);
99 offerFailedTransaction(transaction);
103 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
109 while (runTask.get()) {
110 final List<TransactionCommand> commands;
112 commands = extractCommands();
113 } catch (InterruptedException e) {
114 LOG.warn("Extracting commands was interrupted.", e);
118 ReadWriteTransaction transactionInFlight = null;
120 for (TransactionCommand command: commands) {
121 synchronized (this) {
122 final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
123 transactionInFlight = transaction;
124 recordPendingTransaction(command, transaction);
125 command.execute(transaction);
126 Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
128 public void onSuccess(final Void result) {
129 forgetSuccessfulTransaction(transaction);
134 public void onFailure(final Throwable throwable) {
135 command.onFailure(throwable);
136 // NOOP - handled by failure of transaction chain
138 }, MoreExecutors.directExecutor());
141 } catch (IllegalStateException e) {
142 if (transactionInFlight != null) {
143 // TODO: This method should distinguish exceptions on which the command should be
144 // retried from exceptions on which the command should NOT be retried.
145 // Then it should retry only the commands which should be retried, otherwise
146 // this method will retry commands which will never be successful forever.
147 offerFailedTransaction(transactionInFlight);
149 LOG.warn("Failed to process an update notification from OVS.", e);
154 private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
155 if (!failedTransactionQueue.offer(transaction)) {
156 LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
161 synchronized List<TransactionCommand> extractResubmitCommands() {
162 AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
163 List<TransactionCommand> commands = new ArrayList<>();
164 if (transaction != null) {
165 int index = pendingTransactions.lastIndexOf(transaction);
166 List<ReadWriteTransaction> transactions =
167 pendingTransactions.subList(index, pendingTransactions.size() - 1);
168 for (ReadWriteTransaction tx: transactions) {
169 commands.add(transactionToCommand.get(tx));
171 resetTransactionQueue();
177 synchronized void resetTransactionQueue() {
179 chain = db.createTransactionChain(this);
180 pendingTransactions.clear();
181 transactionToCommand.clear();
182 failedTransactionQueue.clear();
185 synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
186 pendingTransactions.remove(transaction);
187 transactionToCommand.remove(transaction);
191 synchronized void recordPendingTransaction(final TransactionCommand command,
192 final ReadWriteTransaction transaction) {
193 transactionToCommand.put(transaction, command);
194 pendingTransactions.add(transaction);
198 List<TransactionCommand> extractCommands() throws InterruptedException {
199 List<TransactionCommand> commands = extractResubmitCommands();
200 commands.addAll(extractCommandsFromQueue());
205 List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
206 List<TransactionCommand> result = new ArrayList<>();
207 TransactionCommand command = inputQueue.take();
209 inputQueue.drainTo(result);
214 public void close() throws InterruptedException {
216 this.executor.shutdown();
217 if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
219 this.executor.shutdownNow();