X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Ftopology-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmd%2Fcontroller%2Ftopology%2Fmanager%2FOperationProcessor.java;h=c00943339570dc67ee10ed8349f461132dc1e3e7;hp=d60c88032dbcc7015fc064a791ca9a16921d7332;hb=fdda2ebadfe3729e21448fe8f44a506aa67b5da9;hpb=83e1c610eeefba667a19c243fbc1098072a8079d diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java index d60c88032d..c009433395 100644 --- a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java @@ -7,30 +7,33 @@ */ package org.opendaylight.md.controller.topology.manager; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; - -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; -import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; -import org.opendaylight.controller.sal.binding.api.data.DataProviderService; -import org.opendaylight.yangtools.yang.common.RpcResult; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; -final class OperationProcessor implements Runnable { +final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener { private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class); private static final int MAX_TRANSACTION_OPERATIONS = 100; private static final int OPERATION_QUEUE_DEPTH = 500; private final BlockingQueue queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH); - // FIXME: Flow capable topology exporter should use transaction chaining API - private final DataProviderService dataService; + private final DataBroker dataBroker; + private BindingTransactionChain transactionChain; + private volatile boolean finishing = false; - OperationProcessor(final DataProviderService dataService) { - this.dataService = Preconditions.checkNotNull(dataService); + OperationProcessor(final DataBroker dataBroker) { + this.dataBroker = Preconditions.checkNotNull(dataBroker); + transactionChain = this.dataBroker.createTransactionChain(this); } void enqueueOperation(final TopologyOperation task) { @@ -43,43 +46,79 @@ final class OperationProcessor implements Runnable { @Override public void run() { - try { - for (;;) { - TopologyOperation op = queue.take(); + while (!finishing) { + try { + TopologyOperation op = queue.take(); - LOG.debug("New operations available, starting transaction"); - final DataModificationTransaction tx = dataService.beginTransaction(); + LOG.debug("New {} operation available, starting transaction", op); - int ops = 0; - do { - op.applyOperation(tx); + final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction(); - ops++; - if (ops < MAX_TRANSACTION_OPERATIONS) { - op = queue.poll(); - } else { - op = null; - } - } while (op != null); + int ops = 0; + do { + op.applyOperation(tx); - LOG.debug("Processed {} operations, submitting transaction", ops); + ops++; + if (ops < MAX_TRANSACTION_OPERATIONS) { + op = queue.poll(); + } else { + op = null; + } - try { - final RpcResult s = tx.commit().get(); - if (!s.isSuccessful()) { - LOG.error("Topology export failed for Tx:{}", tx.getIdentifier()); + LOG.debug("Next operation {}", op); + } while (op != null); + + LOG.debug("Processed {} operations, submitting transaction", ops); + + try { + tx.submit().checkedGet(); + } catch (final TransactionCommitFailedException e) { + LOG.warn("Stat DataStoreOperation unexpected State!", e); + transactionChain.close(); + transactionChain = dataBroker.createTransactionChain(this); + cleanDataStoreOperQueue(); } - } catch (ExecutionException e) { - LOG.error("Topology export transaction {} failed", tx.getIdentifier(), e.getCause()); + + } catch (final IllegalStateException e) { + LOG.warn("Stat DataStoreOperation unexpected State!", e); + transactionChain.close(); + transactionChain = dataBroker.createTransactionChain(this); + cleanDataStoreOperQueue(); + } catch (final InterruptedException e) { + LOG.warn("Stat Manager DS Operation thread interupted!", e); + finishing = true; + } catch (final Exception e) { + LOG.warn("Stat DataStore Operation executor fail!", e); } } - } catch (InterruptedException e) { - LOG.info("Interrupted processing, terminating", e); - } - // Drain all events, making sure any blocked threads are unblocked + cleanDataStoreOperQueue(); + } + + private void cleanDataStoreOperQueue() { while (!queue.isEmpty()) { queue.poll(); } } + + @Override + public void onTransactionChainFailed(TransactionChain chain, AsyncTransaction transaction, Throwable cause) { + LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause); + transactionChain.close(); + transactionChain = dataBroker.createTransactionChain(this); + cleanDataStoreOperQueue(); + } + + @Override + public void onTransactionChainSuccessful(TransactionChain chain) { + //NOOP + } + + @Override + public void close() throws Exception { + if (transactionChain != null) { + transactionChain.close(); + } + + } }