2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.inventory.manager;
10 import java.util.ArrayList;
11 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.LinkedBlockingDeque;
14 import com.google.common.base.Optional;
15 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
18 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
19 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
20 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
23 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
24 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
26 import org.opendaylight.yangtools.concepts.ListenerRegistration;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 import com.google.common.base.Preconditions;
// Provider that is at once an AutoCloseable, a Runnable and a TransactionChainListener.
// It holds a bounded queue of InventoryOperations, a thread constructed around this Runnable,
// and the transaction chain whose transactions those operations are applied to.
32 class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
33 private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
// Capacity passed to the LinkedBlockingDeque that backs the operation queue.
34 private static final int QUEUE_DEPTH = 500;
// Batch limit: used as the initial capacity of the per-batch list and compared against the
// batch counter in the processing loop.
35 private static final int MAX_BATCH = 100;
// Pending operations, bounded by QUEUE_DEPTH.
37 private final BlockingQueue<InventoryOperation> queue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
// Service on which the two notification listeners are registered.
38 private final NotificationProviderService notificationService;
// Queried by deviceDataDeleteAllowed for the ownership state of a device entity.
39 private final EntityOwnershipService eos;
// Factory for transaction chains (createTransactionChain).
41 private final DataBroker dataBroker;
// Chain currently in use; re-created by newEmptyTransaction when it is null or turns out to be broken.
42 private BindingTransactionChain txChain;
// Registration handles returned by registerNotificationListener; released and nulled in close().
43 private ListenerRegistration<?> listenerRegistration;
44 private ListenerRegistration<?> tableFeatureListenerRegistration;
// Thread created with this object as its Runnable.
45 private Thread thread;
// Constructor: dataBroker and notificationService are rejected if null (Preconditions.checkNotNull).
47 FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService, EntityOwnershipService eos) {
48 this.dataBroker = Preconditions.checkNotNull(dataBroker);
49 this.notificationService = Preconditions.checkNotNull(notificationService);
// NOTE(review): the embedded numbering jumps from 49 to 54, so the end of the constructor
// (including any assignment of eos) and possibly the header of a separate start-up method
// are not visible in this listing — confirm against the full file.
// Start-up: register both notification listeners and keep their registrations for close().
54 final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
55 this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
57 final NodeTablesFeatureCommitter nodeTablesFeatureCommitter =
58 new NodeTablesFeatureCommitter(FlowCapableInventoryProvider.this);
59 this.tableFeatureListenerRegistration = this.notificationService.registerNotificationListener(nodeTablesFeatureCommitter);
// Allocate the initial transaction chain with this object as its listener.
62 this.txChain = (dataBroker.createTransactionChain(this));
// Prepare a named daemon thread around this Runnable.
// NOTE(review): the call that starts the thread is not visible in this listing (lines 66-67 missing).
63 thread = new Thread(this);
64 thread.setDaemon(true);
65 thread.setName("FlowCapableInventoryProvider");
68 LOG.info("Flow Capable Inventory Provider started.");
// Hands an InventoryOperation to this provider.
// NOTE(review): the statement that actually places op on the queue is not visible in this
// listing (embedded lines 72-73 are missing) — presumably a blocking insert into queue; confirm.
71 void enqueue(final InventoryOperation op) {
// Interruption while enqueuing is logged as a warning together with the operation.
// NOTE(review): whether the interrupt status is restored afterwards is not visible here — confirm.
74 } catch (final InterruptedException e) {
75 LOG.warn("Failed to enqueue operation {}", op, e);
// NOTE(review): the method header and the loop statements around these lines are not visible
// in this listing; this is presumably the body of run() — confirm against the full file.
// Block until at least one operation is available.
83 InventoryOperation op = queue.take();
// Batch list, pre-sized to the batch limit.
85 final ArrayList<InventoryOperation> opsToApply = new ArrayList<>(MAX_BATCH);
// Batch-size check; the statements it guards are not visible here.
89 if (ops < MAX_BATCH) {
// Apply the collected batch in a single transaction.
95 submitOperations(opsToApply);
// An interrupt is logged at info level; per the message, processing terminates.
97 } catch (final InterruptedException e) {
98 LOG.info("Processing interrupted, terminating", e);
101 // Drain all events, making sure any blocked threads are unblocked
102 while (!queue.isEmpty()) {
108 * Starts a new empty transaction, customizes it with the submitted operations
109 * and submits it to the data broker.
111 * If transaction chain failed during customization of transaction
112 * it allocates new chain and empty transaction and customizes it
113 * with submitted operations.
115 * This does not retry a failed transaction. It retries only when the
116 * chain failed during customization of the transaction.
120 private void submitOperations(final ArrayList<InventoryOperation> opsToApply) {
121 final ReadWriteTransaction tx = createCustomizedTransaction(opsToApply);
122 LOG.debug("Processed {} operations, submitting transaction {}", opsToApply.size(), tx.getIdentifier());
125 } catch (final IllegalStateException e) {
127 * Transaction chain failed during doing batch, so we need to null
128 * tx chain and continue processing queue.
130 * We fail current txChain which was allocated with createTransaction.
132 failCurrentChain(txChain);
134 * We will retry the transaction once in order not to lose any data.
137 final ReadWriteTransaction retryTx = createCustomizedTransaction(opsToApply);
143 * Creates a new empty ReadWriteTransaction. If the transaction chain
144 * has failed, it allocates a new transaction chain
145 * and assigns it to this Operation Executor.
147 * This call is synchronized to prevent a race with {@link #failCurrentChain(TransactionChain)}.
149 * @return New Empty ReadWrite transaction, which continues this chain or starts new transaction
152 private synchronized ReadWriteTransaction newEmptyTransaction() {
154 if(txChain == null) {
155 // Chain was broken so we need to replace it.
156 txChain = dataBroker.createTransactionChain(this);
158 return txChain.newReadWriteTransaction();
159 } catch (final IllegalStateException e) {
160 LOG.debug("Chain is broken, need to allocate new transaction chain.",e);
162 * Chain was broken by previous transaction,
163 * but this call raced with the handling of that failure.
164 * Chain will be closed by #onTransactionChainFailed method.
166 txChain = dataBroker.createTransactionChain(this);
167 return txChain.newReadWriteTransaction();
172 * Creates customized not-submitted transaction, which is ready to be submitted.
174 * @param opsToApply Operations which are used to customize transaction.
175 * @return Non-empty transaction.
177 private ReadWriteTransaction createCustomizedTransaction(final ArrayList<InventoryOperation> opsToApply) {
// Obtain a fresh transaction (this may also replace a null or broken chain).
178 final ReadWriteTransaction tx = newEmptyTransaction();
// Apply every operation, in list order, to that single transaction.
179 for(final InventoryOperation op : opsToApply) {
180 op.applyOperation(tx);
// Synchronized on this instance; acts only if the supplied chain is still the current txChain,
// so a chain that has already been replaced is left alone.
// NOTE(review): the statements inside the if are not visible in this listing; the caller
// comment in onTransactionChainFailed says the current chain gets nulled — confirm.
185 private synchronized void failCurrentChain(final TransactionChain<?, ?> chain) {
186 if(txChain == chain) {
// Chain-failure callback: logs the failed transaction's identifier and the cause at error
// level, then retires the chain if it is the one currently in use.
192 public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
193 final Throwable cause) {
194 LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
// NOTE(review): txChain is read here without holding the lock; failCurrentChain is
// synchronized and repeats the same comparison, so this outer check appears to be only a
// pre-check — confirm that is intended.
196 if(txChain == chain) {
197 // Current chain is broken, so we will null it, in order to not use it.
198 failCurrentChain(chain);
// Counterpart callback to onTransactionChainFailed.
// NOTE(review): the body is not visible in this listing — confirm whether it does anything.
203 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
// Releases what start-up acquired: both listener registrations, the thread and the
// transaction chain. Declared to throw InterruptedException.
208 public void close() throws InterruptedException {
// NOTE(review): "stopped" is logged on entry, before any resource has been released.
209 LOG.info("Flow Capable Inventory Provider stopped.");
// Best-effort close of the first registration: a failure is logged, and the field is cleared.
210 if (this.listenerRegistration != null) {
212 this.listenerRegistration.close();
213 } catch (final Exception e) {
214 LOG.error("Failed to stop inventory provider", e);
216 listenerRegistration = null;
// Same treatment for the table-feature registration; it logs the identical message, so the
// two failures cannot be told apart by message text alone.
219 if (this.tableFeatureListenerRegistration != null) {
221 this.tableFeatureListenerRegistration.close();
222 } catch (final Exception e) {
223 LOG.error("Failed to stop inventory provider", e);
225 tableFeatureListenerRegistration = null;
// NOTE(review): the statements that stop the thread and close the chain are not visible in
// this listing (embedded lines 229-232 and 234-235 are missing).
228 if (thread != null) {
233 if (txChain != null) {
236 } catch (final IllegalStateException e) {
237 // It is possible chain failed and was closed by #onTransactionChainFailed
238 LOG.debug("Chain was already closed.");
// Decides whether data for the given node may be deleted, based on the entity ownership
// state reported by eos.
// NOTE(review): the result when no ownership state is present is not visible in this
// listing (the method continues past the last visible line) — confirm.
244 public boolean deviceDataDeleteAllowed(NodeId nodeId) {
// The device is identified by an Entity built from the literal "openflow" and the node id's value.
245 Entity device = new Entity("openflow",nodeId.getValue());
246 Optional<EntityOwnershipState> entityOwnershipState = eos.getOwnershipState(device);
247 if(entityOwnershipState.isPresent()){
248 EntityOwnershipState eState = entityOwnershipState.get();
// Allowed outright when isOwner() reports true.
249 if(eState.isOwner()) { return true; }
// Otherwise allowed only when hasOwner() reports false.
251 return !eState.hasOwner();