Change-Id: I5896a0b7c3e63d2828361dc93d3582843a35b35d
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
*/
package org.opendaylight.controller.md.inventory.manager;
*/
package org.opendaylight.controller.md.inventory.manager;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
private static final int QUEUE_DEPTH = 500;
class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
private static final int QUEUE_DEPTH = 500;
void enqueue(final InventoryOperation op) {
try {
queue.put(op);
void enqueue(final InventoryOperation op) {
try {
queue.put(op);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.warn("Failed to enqueue operation {}", op, e);
}
}
LOG.warn("Failed to enqueue operation {}", op, e);
}
}
if (this.listenerRegistration != null) {
try {
this.listenerRegistration.close();
if (this.listenerRegistration != null) {
try {
this.listenerRegistration.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.error("Failed to stop inventory provider", e);
}
listenerRegistration = null;
LOG.error("Failed to stop inventory provider", e);
}
listenerRegistration = null;
for (; ; ) {
InventoryOperation op = queue.take();
for (; ; ) {
InventoryOperation op = queue.take();
- final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
+ ReadWriteTransaction tx;
+ try {
+ tx = txChain.newReadWriteTransaction();
+ } catch (final IllegalStateException e) {
+ txChain.close();
+ txChain = dataBroker.createTransactionChain(this);
+ tx = txChain.newReadWriteTransaction();
+ }
LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
int ops = 0;
LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
int ops = 0;
LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
+ final Object ident = tx.getIdentifier();
Futures.addCallback(result, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void aVoid) {
Futures.addCallback(result, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void aVoid) {
@Override
public void onFailure(final Throwable throwable) {
@Override
public void onFailure(final Throwable throwable) {
- LOG.error("Transaction {} failed.", tx.getIdentifier(), throwable);
+ LOG.error("Transaction {} failed.", ident, throwable);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.info("Processing interrupted, terminating", e);
}
LOG.info("Processing interrupted, terminating", e);
}