X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Finventory-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Finventory%2Fmanager%2FFlowCapableInventoryProvider.java;h=d63f2bef86d551280b9815e896ae9c07da96eace;hp=9724d31f9ae6cd3ab8eaef0c23212bf626cfca84;hb=21b68a5e7cedf34ec5cafff21c4e5fd4be222b92;hpb=c222e37f2a0f0f3f6266242fbea2d3b018f4e6e3 diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java index 9724d31f9a..d63f2bef86 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java @@ -8,33 +8,40 @@ package org.opendaylight.controller.md.inventory.manager; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; -import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; -class FlowCapableInventoryProvider implements AutoCloseable, Runnable { +class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener { private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class); private static final int QUEUE_DEPTH = 500; private static final int MAX_BATCH = 100; private final BlockingQueue queue = new LinkedBlockingDeque<>(QUEUE_DEPTH); private final NotificationProviderService notificationService; - private final DataProviderService dataService; + + private final DataBroker dataBroker; + private BindingTransactionChain txChain; private ListenerRegistration listenerRegistration; private Thread thread; - FlowCapableInventoryProvider(final DataProviderService dataService, final NotificationProviderService notificationService) { - this.dataService = Preconditions.checkNotNull(dataService); + FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService) { + this.dataBroker = Preconditions.checkNotNull(dataBroker); this.notificationService = Preconditions.checkNotNull(notificationService); } @@ -42,6 +49,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this); this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter); + this.txChain = dataBroker.createTransactionChain(this); thread = new Thread(this); thread.setDaemon(true); thread.setName("FlowCapableInventoryProvider"); @@ -53,7 +61,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { void enqueue(final InventoryOperation op) { try { queue.put(op); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Failed to enqueue operation {}", op, e); } } @@ -64,7 +72,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { if (this.listenerRegistration != null) { try { this.listenerRegistration.close(); - } catch (Exception e) { + } catch (final Exception e) { LOG.error("Failed to stop inventory provider", e); } listenerRegistration = null; @@ -75,6 +83,10 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { thread.join(); thread = null; } + if (txChain != null) { + txChain.close(); + txChain = null; + } } @@ -82,10 +94,17 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { @Override public void run() { try { - for (;;) { + for (; ; ) { InventoryOperation op = queue.take(); - final DataModificationTransaction tx = dataService.beginTransaction(); + ReadWriteTransaction tx; + try { + tx = txChain.newReadWriteTransaction(); + } catch (final IllegalStateException e) { + txChain.close(); + txChain = dataBroker.createTransactionChain(this); + tx = txChain.newReadWriteTransaction(); + } LOG.debug("New operations available, starting transaction {}", tx.getIdentifier()); int ops = 0; @@ -102,16 +121,21 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier()); - try { - final RpcResult result = tx.commit().get(); - if(!result.isSuccessful()) { - LOG.error("Transaction {} failed", tx.getIdentifier()); + final CheckedFuture result = tx.submit(); + final Object ident = tx.getIdentifier(); + Futures.addCallback(result, new FutureCallback() { + @Override + public void onSuccess(final Void aVoid) { + //NOOP } - } catch (ExecutionException e) { - LOG.warn("Failed to commit inventory change", e.getCause()); - } + + @Override + public void onFailure(final Throwable throwable) { + LOG.error("Transaction {} failed.", ident, throwable); + } + }); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.info("Processing interrupted, terminating", e); } @@ -120,4 +144,16 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { queue.poll(); } } + + @Override + public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, + final Throwable cause) { + LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause); + + } + + @Override + public void onTransactionChainSuccessful(final TransactionChain chain) { + // NOOP + } }