2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.inventory.manager;
10 import java.util.ArrayList;
11 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.LinkedBlockingDeque;
14 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
15 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
16 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
20 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 import com.google.common.base.Preconditions;
27 class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
28 private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
29 private static final int QUEUE_DEPTH = 500;
30 private static final int MAX_BATCH = 100;
32 private final BlockingQueue<InventoryOperation> queue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
33 private final NotificationProviderService notificationService;
35 private final DataBroker dataBroker;
36 private BindingTransactionChain txChain;
37 private ListenerRegistration<?> listenerRegistration;
38 private Thread thread;
40 FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService) {
41 this.dataBroker = Preconditions.checkNotNull(dataBroker);
42 this.notificationService = Preconditions.checkNotNull(notificationService);
46 final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
47 this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
49 this.txChain = (dataBroker.createTransactionChain(this));
50 thread = new Thread(this);
51 thread.setDaemon(true);
52 thread.setName("FlowCapableInventoryProvider");
55 LOG.info("Flow Capable Inventory Provider started.");
58 void enqueue(final InventoryOperation op) {
61 } catch (final InterruptedException e) {
62 LOG.warn("Failed to enqueue operation {}", op, e);
70 InventoryOperation op = queue.take();
72 final ArrayList<InventoryOperation> opsToApply = new ArrayList<>(MAX_BATCH);
76 if (ops < MAX_BATCH) {
82 submitOperations(opsToApply);
84 } catch (final InterruptedException e) {
85 LOG.info("Processing interrupted, terminating", e);
88 // Drain all events, making sure any blocked threads are unblocked
89 while (!queue.isEmpty()) {
95 * Starts new empty transaction, custimizes it with submitted operations
96 * and submit it to data broker.
98 * If transaction chain failed during customization of transaction
99 * it allocates new chain and empty transaction and customizes it
100 * with submitted operations.
102 * This does not retry failed transaction. It only retries it when
103 * chain failed during customization of transaction chain.
107 private void submitOperations(final ArrayList<InventoryOperation> opsToApply) {
108 final ReadWriteTransaction tx = createCustomizedTransaction(opsToApply);
109 LOG.debug("Processed {} operations, submitting transaction {}", opsToApply.size(), tx.getIdentifier());
112 } catch (final IllegalStateException e) {
114 * Transaction chain failed during doing batch, so we need to null
115 * tx chain and continue processing queue.
117 * We fail current txChain which was allocated with createTransaction.
119 failCurrentChain(txChain);
121 * We will retry transaction once in order to not loose any data.
124 final ReadWriteTransaction retryTx = createCustomizedTransaction(opsToApply);
130 * Creates new empty ReadWriteTransaction. If transaction chain
131 * was failed, it will allocate new transaction chain
132 * and assign it with this Operation Executor.
134 * This call is synchronized to prevent reace with {@link #failCurrentChain(TransactionChain)}.
136 * @return New Empty ReadWrite transaction, which continues this chain or starts new transaction
139 private synchronized ReadWriteTransaction newEmptyTransaction() {
141 if(txChain == null) {
142 // Chain was broken so we need to replace it.
143 txChain = dataBroker.createTransactionChain(this);
145 return txChain.newReadWriteTransaction();
146 } catch (final IllegalStateException e) {
147 LOG.debug("Chain is broken, need to allocate new transaction chain.",e);
149 * Chain was broken by previous transaction,
150 * but there was race between this.
151 * Chain will be closed by #onTransactionChainFailed method.
153 txChain = dataBroker.createTransactionChain(this);
154 return txChain.newReadWriteTransaction();
159 * Creates customized not-submitted transaction, which is ready to be submitted.
161 * @param opsToApply Operations which are used to customize transaction.
162 * @return Non-empty transaction.
164 private ReadWriteTransaction createCustomizedTransaction(final ArrayList<InventoryOperation> opsToApply) {
165 final ReadWriteTransaction tx = newEmptyTransaction();
166 for(final InventoryOperation op : opsToApply) {
167 op.applyOperation(tx);
172 private synchronized void failCurrentChain(final TransactionChain<?, ?> chain) {
173 if(txChain == chain) {
179 public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
180 final Throwable cause) {
181 LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
183 if(txChain == chain) {
184 // Current chain is broken, so we will null it, in order to not use it.
185 failCurrentChain(chain);
190 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
195 public void close() throws InterruptedException {
196 LOG.info("Flow Capable Inventory Provider stopped.");
197 if (this.listenerRegistration != null) {
199 this.listenerRegistration.close();
200 } catch (final Exception e) {
201 LOG.error("Failed to stop inventory provider", e);
203 listenerRegistration = null;
206 if (thread != null) {
211 if (txChain != null) {
214 } catch (final IllegalStateException e) {
215 // It is possible chain failed and was closed by #onTransactionChainFailed
216 LOG.debug("Chain was already closed.");