X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Ftopology-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmd%2Fcontroller%2Ftopology%2Fmanager%2FOperationProcessor.java;h=3800413eb1b9964d00207f526177862eb69c5885;hb=1eeb5e88a3593ebd781b733123b8e12cf4cf757c;hp=d60c88032dbcc7015fc064a791ca9a16921d7332;hpb=cfe8a5cc4ce37fd644eb17884f8e9c30c0cbba9b;p=controller.git diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java index d60c88032d..3800413eb1 100644 --- a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java @@ -7,30 +7,27 @@ */ package org.opendaylight.md.controller.topology.manager; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; - -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; -import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; -import org.opendaylight.controller.sal.binding.api.data.DataProviderService; -import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - final class OperationProcessor implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class); private static final int MAX_TRANSACTION_OPERATIONS = 100; private static final int OPERATION_QUEUE_DEPTH = 500; private final BlockingQueue queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH); - // FIXME: Flow capable topology exporter should use transaction chaining API - private final DataProviderService dataService; + private final DataBroker dataBroker; - OperationProcessor(final DataProviderService dataService) { - this.dataService = Preconditions.checkNotNull(dataService); + OperationProcessor(final DataBroker dataBroker) { + this.dataBroker = Preconditions.checkNotNull(dataBroker); } void enqueueOperation(final TopologyOperation task) { @@ -44,11 +41,11 @@ final class OperationProcessor implements Runnable { @Override public void run() { try { - for (;;) { + for (; ; ) { TopologyOperation op = queue.take(); LOG.debug("New operations available, starting transaction"); - final DataModificationTransaction tx = dataService.beginTransaction(); + final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction(); int ops = 0; do { @@ -64,14 +61,18 @@ final class OperationProcessor implements Runnable { LOG.debug("Processed {} operations, submitting transaction", ops); - try { - final RpcResult s = tx.commit().get(); - if (!s.isSuccessful()) { - LOG.error("Topology export failed for Tx:{}", tx.getIdentifier()); + final CheckedFuture txResultFuture = tx.submit(); + Futures.addCallback(txResultFuture, new FutureCallback() { + @Override + public void onSuccess(Object o) { + LOG.debug("Topology export successful for tx :{}", tx.getIdentifier()); + } + + @Override + public void onFailure(Throwable throwable) { + LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause()); } - } catch (ExecutionException e) { - LOG.error("Topology export transaction {} failed", tx.getIdentifier(), e.getCause()); - } + }); } } catch (InterruptedException e) { LOG.info("Interrupted processing, terminating", e);