/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.ThreadFactory;
21 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 import com.google.common.util.concurrent.FutureCallback;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ThreadFactoryBuilder;
35 * Copied over as-is from southbound plugin. Good candidate to be common
36 * when refactoring code.
38 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
39 private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
40 private static final int QUEUE_SIZE = 10000;
41 private BindingTransactionChain chain;
42 private DataBroker db;
43 private BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
44 private BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
45 = new LinkedBlockingQueue<>(QUEUE_SIZE);
46 private BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
47 = new LinkedBlockingQueue<>(QUEUE_SIZE);
48 private ExecutorService executor;
49 private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
51 private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
53 public TransactionInvokerImpl(DataBroker db) {
55 this.chain = db.createTransactionChain(this);
56 ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
57 executor = Executors.newSingleThreadExecutor(threadFact);
58 executor.submit(this);
62 public void invoke(final TransactionCommand command) {
63 // TODO what do we do if queue is full?
64 inputQueue.offer(command);
68 public void onTransactionChainFailed(TransactionChain<?, ?> chain,
69 AsyncTransaction<?, ?> transaction, Throwable cause) {
70 failedTransactionQueue.offer(transaction);
74 public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
82 forgetSuccessfulTransactions();
84 List<TransactionCommand> commands = null;
86 commands = extractCommands();
87 } catch (InterruptedException e) {
88 LOG.warn("Extracting commands was interrupted.", e);
92 ReadWriteTransaction transactionInFlight = null;
94 for (TransactionCommand command: commands) {
95 final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
96 transactionInFlight = transaction;
97 recordPendingTransaction(command, transaction);
98 command.execute(transaction);
99 Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
101 public void onSuccess(final Void result) {
102 successfulTransactionQueue.offer(transaction);
106 public void onFailure(final Throwable throwable) {
107 // NOOP - handled by failure of transaction chain
111 } catch (IllegalStateException e) {
112 if (transactionInFlight != null) {
113 // TODO: This method should distinguish exceptions on which the command should be
114 // retried from exceptions on which the command should NOT be retried.
115 // Then it should retry only the commands which should be retried, otherwise
116 // this method will retry commands which will never be successful forever.
117 failedTransactionQueue.offer(transactionInFlight);
119 LOG.warn("Failed to process an update notification from OVS.", e);
124 private List<TransactionCommand> extractResubmitCommands() {
125 AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
126 List<TransactionCommand> commands = new ArrayList<>();
127 if (transaction != null) {
128 int index = pendingTransactions.lastIndexOf(transaction);
129 List<ReadWriteTransaction> transactions =
130 pendingTransactions.subList(index, pendingTransactions.size() - 1);
131 for (ReadWriteTransaction tx: transactions) {
132 commands.add(transactionToCommand.get(tx));
134 resetTransactionQueue();
139 private void resetTransactionQueue() {
141 chain = db.createTransactionChain(this);
142 pendingTransactions = new ArrayList<>();
143 transactionToCommand = new HashMap<>();
144 failedTransactionQueue.clear();
145 successfulTransactionQueue.clear();
148 private void recordPendingTransaction(TransactionCommand command,
149 final ReadWriteTransaction transaction) {
150 transactionToCommand.put(transaction, command);
151 pendingTransactions.add(transaction);
154 private List<TransactionCommand> extractCommands() throws InterruptedException {
155 List<TransactionCommand> commands = extractResubmitCommands();
156 commands.addAll(extractCommandsFromQueue());
160 private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
161 List<TransactionCommand> result = new ArrayList<>();
162 TransactionCommand command = inputQueue.take();
163 while (command != null) {
165 command = inputQueue.poll();
170 private void forgetSuccessfulTransactions() {
171 ReadWriteTransaction transaction = successfulTransactionQueue.poll();
172 while (transaction != null) {
173 pendingTransactions.remove(transaction);
174 transactionToCommand.remove(transaction);
175 transaction = successfulTransactionQueue.poll();
180 public void close() throws Exception {
181 this.executor.shutdown();