*/
package org.opendaylight.controller.md.inventory.manager;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
+
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
private static final int QUEUE_DEPTH = 500;
final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
- this.txChain = dataBroker.createTransactionChain(this);
+ this.txChain = dataBroker.createTransactionChain(this);
thread = new Thread(this);
thread.setDaemon(true);
thread.setName("FlowCapableInventoryProvider");
void enqueue(final InventoryOperation op) {
try {
queue.put(op);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.warn("Failed to enqueue operation {}", op, e);
}
}
if (this.listenerRegistration != null) {
try {
this.listenerRegistration.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Failed to stop inventory provider", e);
}
listenerRegistration = null;
thread.join();
thread = null;
}
- if(txChain != null) {
+ if (txChain != null) {
txChain.close();
txChain = null;
}
for (; ; ) {
InventoryOperation op = queue.take();
- final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
+ ReadWriteTransaction tx;
+ try {
+ tx = txChain.newReadWriteTransaction();
+ } catch (final IllegalStateException e) {
+ txChain.close();
+ txChain = dataBroker.createTransactionChain(this);
+ tx = txChain.newReadWriteTransaction();
+ }
LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
int ops = 0;
LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
+ final Object ident = tx.getIdentifier();
Futures.addCallback(result, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void aVoid) {
@Override
public void onFailure(final Throwable throwable) {
- LOG.error("Transaction {} failed.", tx.getIdentifier(), throwable);
+ LOG.error("Transaction {} failed.", ident, throwable);
}
});
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.info("Processing interrupted, terminating", e);
}
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
- final Throwable cause) {
- LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
+ final Throwable cause) {
+ LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
}