Bug 5555: Exception during operations on ReadWriteTransaction
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transactions / md / TransactionInvokerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
10
11 import java.util.ArrayList;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.ThreadFactory;
20
21 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import com.google.common.util.concurrent.FutureCallback;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ThreadFactoryBuilder;
33
34 /*  TODO:
35  * Copied over as-is from southbound plugin. Good candidate to be common
36  * when refactoring code. 
37  */
38 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
39     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
40     private static final int QUEUE_SIZE = 10000;
41     private BindingTransactionChain chain;
42     private DataBroker db;
43     private BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
44     private BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
45         = new LinkedBlockingQueue<>(QUEUE_SIZE);
46     private BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
47         = new LinkedBlockingQueue<>(QUEUE_SIZE);
48     private ExecutorService executor;
49     private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
50         = new HashMap<>();
51     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
52
53     public TransactionInvokerImpl(DataBroker db) {
54         this.db = db;
55         this.chain = db.createTransactionChain(this);
56         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
57         executor = Executors.newSingleThreadExecutor(threadFact);
58         executor.submit(this);
59     }
60
61     @Override
62     public void invoke(final TransactionCommand command) {
63         // TODO what do we do if queue is full?
64         inputQueue.offer(command);
65     }
66
67     @Override
68     public void onTransactionChainFailed(TransactionChain<?, ?> chain,
69             AsyncTransaction<?, ?> transaction, Throwable cause) {
70         failedTransactionQueue.offer(transaction);
71     }
72
73     @Override
74     public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
75         // NO OP
76
77     }
78
79     @Override
80     public void run() {
81         while (true) {
82             forgetSuccessfulTransactions();
83
84             List<TransactionCommand> commands = null;
85             try {
86                 commands = extractCommands();
87             } catch (InterruptedException e) {
88                 LOG.warn("Extracting commands was interrupted.", e);
89                 continue;
90             }
91
92             ReadWriteTransaction transactionInFlight = null;
93             try {
94                 for (TransactionCommand command: commands) {
95                     final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
96                     transactionInFlight = transaction;
97                     recordPendingTransaction(command, transaction);
98                     command.execute(transaction);
99                     Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
100                         @Override
101                         public void onSuccess(final Void result) {
102                             successfulTransactionQueue.offer(transaction);
103                         }
104
105                         @Override
106                         public void onFailure(final Throwable throwable) {
107                             // NOOP - handled by failure of transaction chain
108                         }
109                     });
110                 }
111             } catch (IllegalStateException e) {
112                 if (transactionInFlight != null) {
113                     // TODO: This method should distinguish exceptions on which the command should be
114                     // retried from exceptions on which the command should NOT be retried.
115                     // Then it should retry only the commands which should be retried, otherwise
116                     // this method will retry commands which will never be successful forever.
117                     failedTransactionQueue.offer(transactionInFlight);
118                 }
119                 LOG.warn("Failed to process an update notification from OVS.", e);
120             }
121         }
122     }
123
124     private List<TransactionCommand> extractResubmitCommands() {
125         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
126         List<TransactionCommand> commands = new ArrayList<>();
127         if (transaction != null) {
128             int index = pendingTransactions.lastIndexOf(transaction);
129             List<ReadWriteTransaction> transactions =
130                     pendingTransactions.subList(index, pendingTransactions.size() - 1);
131             for (ReadWriteTransaction tx: transactions) {
132                 commands.add(transactionToCommand.get(tx));
133             }
134             resetTransactionQueue();
135         }
136         return commands;
137     }
138
139     private void resetTransactionQueue() {
140         chain.close();
141         chain = db.createTransactionChain(this);
142         pendingTransactions = new ArrayList<>();
143         transactionToCommand = new HashMap<>();
144         failedTransactionQueue.clear();
145         successfulTransactionQueue.clear();
146     }
147
148     private void recordPendingTransaction(TransactionCommand command,
149             final ReadWriteTransaction transaction) {
150         transactionToCommand.put(transaction, command);
151         pendingTransactions.add(transaction);
152     }
153
154     private List<TransactionCommand> extractCommands() throws InterruptedException {
155         List<TransactionCommand> commands = extractResubmitCommands();
156         commands.addAll(extractCommandsFromQueue());
157         return commands;
158     }
159
160     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
161         List<TransactionCommand> result = new ArrayList<>();
162         TransactionCommand command = inputQueue.take();
163         while (command != null) {
164             result.add(command);
165             command = inputQueue.poll();
166         }
167         return result;
168     }
169
170     private void forgetSuccessfulTransactions() {
171         ReadWriteTransaction transaction = successfulTransactionQueue.poll();
172         while (transaction != null) {
173             pendingTransactions.remove(transaction);
174             transactionToCommand.remove(transaction);
175             transaction = successfulTransactionQueue.poll();
176         }
177     }
178
179     @Override
180     public void close() throws Exception {
181         this.executor.shutdown();
182     }
183 }