X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Ftopology-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmd%2Fcontroller%2Ftopology%2Fmanager%2FOperationProcessor.java;h=41162d30463d54338d687dfd904ef1f7646dbec6;hb=eccb8a1b65c15fa25650f33f1723ef2e46b745d6;hp=3800413eb1b9964d00207f526177862eb69c5885;hpb=28f20ee0cdbbb586ea2ffcd5982138e98f5307e3;p=controller.git diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java index 3800413eb1..41162d3046 100644 --- a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java @@ -8,26 +8,31 @@ package org.opendaylight.md.controller.topology.manager; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class OperationProcessor implements Runnable { +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener { private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class); private static final int MAX_TRANSACTION_OPERATIONS = 100; private static final int OPERATION_QUEUE_DEPTH = 500; private final BlockingQueue queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH); private final DataBroker dataBroker; + private BindingTransactionChain transactionChain; OperationProcessor(final DataBroker dataBroker) { this.dataBroker = Preconditions.checkNotNull(dataBroker); + transactionChain = this.dataBroker.createTransactionChain(this); } void enqueueOperation(final TopologyOperation task) { @@ -44,8 +49,9 @@ final class OperationProcessor implements Runnable { for (; ; ) { TopologyOperation op = queue.take(); - LOG.debug("New operations available, starting transaction"); - final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction(); + LOG.debug("New {} operation available, starting transaction", op); + + final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction(); int ops = 0; do { @@ -57,30 +63,59 @@ final class OperationProcessor implements Runnable { } else { op = null; } + + LOG.debug("Next operation {}", op); } while (op != null); LOG.debug("Processed {} operations, submitting transaction", ops); - final CheckedFuture txResultFuture = tx.submit(); - Futures.addCallback(txResultFuture, new FutureCallback() { - @Override - public void onSuccess(Object o) { - LOG.debug("Topology export successful for tx :{}", tx.getIdentifier()); - } - - @Override - public void onFailure(Throwable throwable) { - LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause()); - } - }); + try { + tx.submit().checkedGet(); + } catch (final TransactionCommitFailedException e) { + LOG.warn("Stat DataStoreOperation unexpected State!", e); + transactionChain.close(); + transactionChain = dataBroker.createTransactionChain(this); + cleanDataStoreOperQueue(); + } } - } catch (InterruptedException e) { - LOG.info("Interrupted processing, terminating", e); + } catch (final IllegalStateException e) { + LOG.warn("Stat DataStoreOperation unexpected State!", e); + transactionChain.close(); + transactionChain = dataBroker.createTransactionChain(this); + cleanDataStoreOperQueue(); + } catch (final InterruptedException e) { + LOG.warn("Stat Manager DS Operation thread interupted!", e); + } catch (final Exception e) { + LOG.warn("Stat DataStore Operation executor fail!", e); } + // Drain all events, making sure any blocked threads are unblocked + cleanDataStoreOperQueue(); + + } + + private void cleanDataStoreOperQueue() { // Drain all events, making sure any blocked threads are unblocked while (!queue.isEmpty()) { queue.poll(); } } + + @Override + public void onTransactionChainFailed(TransactionChain chain, AsyncTransaction transaction, Throwable cause) { + LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause); + } + + @Override + public void onTransactionChainSuccessful(TransactionChain chain) { + //NOOP + } + + @Override + public void close() throws Exception { + if (transactionChain != null) { + transactionChain.close(); + } + + } }