2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.southbound.transactions.md;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.util.ArrayList;
15 import java.util.HashMap;
16 import java.util.List;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
36 private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
37 private static final int QUEUE_SIZE = 10000;
38 private BindingTransactionChain chain;
39 private DataBroker db;
40 private BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
41 private BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
42 = new LinkedBlockingQueue<>(QUEUE_SIZE);
43 private BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
44 = new LinkedBlockingQueue<>(QUEUE_SIZE);
45 private ExecutorService executor;
46 private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
48 private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
49 private final AtomicBoolean runTask = new AtomicBoolean( true );
51 public TransactionInvokerImpl(DataBroker db) {
53 this.chain = db.createTransactionChain(this);
54 ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
55 executor = Executors.newSingleThreadExecutor(threadFact);
56 executor.submit(this);
60 public void invoke(final TransactionCommand command) {
61 // TODO what do we do if queue is full?
62 inputQueue.offer(command);
66 public void onTransactionChainFailed(TransactionChain<?, ?> chainArg,
67 AsyncTransaction<?, ?> transaction, Throwable cause) {
68 failedTransactionQueue.offer(transaction);
72 public void onTransactionChainSuccessful(TransactionChain<?, ?> chainArg) {
78 while (runTask.get()) {
79 forgetSuccessfulTransactions();
81 List<TransactionCommand> commands = extractCommands();
82 for (TransactionCommand command: commands) {
83 final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
84 recordPendingTransaction(command, transaction);
85 command.execute(transaction);
86 Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
88 public void onSuccess(final Void result) {
89 successfulTransactionQueue.offer(transaction);
93 public void onFailure(final Throwable throwable) {
94 // NOOP - handled by failure of transaction chain
98 } catch (InterruptedException e) {
99 LOG.warn("Exception invoking Transaction: ", e);
104 private List<TransactionCommand> extractResubmitCommands() {
105 AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
106 List<TransactionCommand> commands = new ArrayList<>();
107 if (transaction != null) {
108 int index = pendingTransactions.lastIndexOf(transaction);
109 List<ReadWriteTransaction> transactions =
110 pendingTransactions.subList(index, pendingTransactions.size() - 1);
111 for (ReadWriteTransaction tx: transactions) {
112 commands.add(transactionToCommand.get(tx));
114 resetTransactionQueue();
119 private void resetTransactionQueue() {
121 chain = db.createTransactionChain(this);
122 pendingTransactions = new ArrayList<>();
123 transactionToCommand = new HashMap<>();
124 failedTransactionQueue.clear();
125 successfulTransactionQueue.clear();
128 private void recordPendingTransaction(TransactionCommand command,
129 final ReadWriteTransaction transaction) {
130 transactionToCommand.put(transaction, command);
131 pendingTransactions.add(transaction);
134 private List<TransactionCommand> extractCommands() throws InterruptedException {
135 List<TransactionCommand> commands = extractResubmitCommands();
136 commands.addAll(extractCommandsFromQueue());
140 private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
141 List<TransactionCommand> result = new ArrayList<>();
142 TransactionCommand command = inputQueue.take();
143 while (command != null) {
145 command = inputQueue.poll();
150 private void forgetSuccessfulTransactions() {
151 ReadWriteTransaction transaction = successfulTransactionQueue.poll();
152 while (transaction != null) {
153 pendingTransactions.remove(transaction);
154 transactionToCommand.remove(transaction);
155 transaction = successfulTransactionQueue.poll();
160 public void close() throws Exception {
161 this.executor.shutdown();
162 if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
164 this.executor.shutdownNow();