X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Finventory-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Finventory%2Fmanager%2FFlowCapableInventoryProvider.java;h=d63f2bef86d551280b9815e896ae9c07da96eace;hp=3db929b99d9c68065cec7875412901090570530d;hb=21b68a5e7cedf34ec5cafff21c4e5fd4be222b92;hpb=6b3c2a9e446bf91942a48427e5592909a173a399;ds=sidebyside diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java index 3db929b99d..d63f2bef86 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java @@ -7,12 +7,9 @@ */ package org.opendaylight.controller.md.inventory.manager; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; + import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; @@ -25,6 +22,11 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; + class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener { private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class); private static final int QUEUE_DEPTH = 500; @@ -59,7 +61,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti void enqueue(final InventoryOperation op) { try { queue.put(op); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Failed to enqueue operation {}", op, e); } } @@ -70,7 +72,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti if (this.listenerRegistration != null) { try { this.listenerRegistration.close(); - } catch (Exception e) { + } catch (final Exception e) { LOG.error("Failed to stop inventory provider", e); } listenerRegistration = null; @@ -95,7 +97,14 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti for (; ; ) { InventoryOperation op = queue.take(); - final ReadWriteTransaction tx = txChain.newReadWriteTransaction(); + ReadWriteTransaction tx; + try { + tx = txChain.newReadWriteTransaction(); + } catch (final IllegalStateException e) { + txChain.close(); + txChain = dataBroker.createTransactionChain(this); + tx = txChain.newReadWriteTransaction(); + } LOG.debug("New operations available, starting transaction {}", tx.getIdentifier()); int ops = 0; @@ -113,6 +122,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier()); final CheckedFuture result = tx.submit(); + final Object ident = tx.getIdentifier(); Futures.addCallback(result, new FutureCallback() { @Override public void onSuccess(final Void aVoid) { @@ -121,11 +131,11 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable, Transacti @Override public void onFailure(final Throwable throwable) { - LOG.error("Transaction {} failed.", tx.getIdentifier(), throwable); + LOG.error("Transaction {} failed.", ident, throwable); } }); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.info("Processing interrupted, terminating", e); }