X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Finventory-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Finventory%2Fmanager%2FFlowCapableInventoryProvider.java;h=29ac12393ac54aa33d2de6b01af01c1213db1c4c;hb=3dd9d7c6da1baaf72f05e1118c0ca47dc16e3c7b;hp=6ed61e3024b9522dbc89d99a979f93fbcb147c7f;hpb=18f8f0b852694daf18e8fd034ba576d78499e0ee;p=controller.git diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java index 6ed61e3024..29ac12393a 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java @@ -7,21 +7,20 @@ */ package org.opendaylight.controller.md.inventory.manager; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; - -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; -import org.opendaylight.controller.sal.binding.api.data.DataProviderService; -import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - class FlowCapableInventoryProvider implements AutoCloseable, Runnable { private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class); private static final int QUEUE_DEPTH = 500; @@ -29,12 +28,13 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { private final BlockingQueue queue = new LinkedBlockingDeque<>(QUEUE_DEPTH); private final NotificationProviderService notificationService; - private final DataProviderService dataService; - private Registration listenerRegistration; + + private final DataBroker dataBroker; + private ListenerRegistration listenerRegistration; private Thread thread; - FlowCapableInventoryProvider(final DataProviderService dataService, final NotificationProviderService notificationService) { - this.dataService = Preconditions.checkNotNull(dataService); + FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService) { + this.dataBroker = Preconditions.checkNotNull(dataBroker); this.notificationService = Preconditions.checkNotNull(notificationService); } @@ -82,10 +82,10 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { @Override public void run() { try { - for (;;) { + for (; ; ) { InventoryOperation op = queue.take(); - final DataModificationTransaction tx = dataService.beginTransaction(); + final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction(); LOG.debug("New operations available, starting transaction {}", tx.getIdentifier()); int ops = 0; @@ -102,14 +102,18 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable { LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier()); - try { - final RpcResult result = tx.commit().get(); - if(!result.isSuccessful()) { - LOG.error("Transaction {} failed", tx.getIdentifier()); + final CheckedFuture result = tx.submit(); + Futures.addCallback(result, new FutureCallback() { + @Override + public void onSuccess(Void aVoid) { + //NOOP + } + + @Override + public void onFailure(Throwable throwable) { + LOG.error("Transaction {} failed.", tx.getIdentifier(), throwable); } - } catch (ExecutionException e) { - LOG.warn("Failed to commit inventory change", e.getCause()); - } + }); } } catch (InterruptedException e) { LOG.info("Processing interrupted, terminating", e);