X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Ftopology-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ftopology%2Fmanager%2FOperationProcessor.java;h=3312779c8ef1c98926ac3cb1d5bd558ad3dde016;hb=cfe3a97837951ebbedb337dc988027f10c49f714;hp=bbfae52b84910b179fe182d9b3fa345a0ef90934;hpb=2f0bb7b74a58dde036e69677e5dba089863d82cf;p=openflowplugin.git diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessor.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessor.java index bbfae52b84..3312779c8e 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessor.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/OperationProcessor.java @@ -7,37 +7,39 @@ */ package org.opendaylight.openflowplugin.applications.topology.manager; -import com.google.common.base.Preconditions; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.apache.aries.blueprint.annotation.service.Reference; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener { +@Singleton +public final class OperationProcessor implements AutoCloseable, Runnable { private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class); private static final int MAX_TRANSACTION_OPERATIONS = 100; private static final int OPERATION_QUEUE_DEPTH = 500; + private static final String TOPOLOGY_MANAGER = "ofp-topo-processor"; private final BlockingQueue queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH); - private final DataBroker dataBroker; private final Thread thread; - private BindingTransactionChain transactionChain; + private final TransactionChainManager transactionChainManager; private volatile boolean finishing = false; - public OperationProcessor(final DataBroker dataBroker) { - this.dataBroker = Preconditions.checkNotNull(dataBroker); - transactionChain = this.dataBroker.createTransactionChain(this); + @Inject + public OperationProcessor(@Reference final DataBroker dataBroker) { + transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_MANAGER); + transactionChainManager.activateTransactionManager(); + transactionChainManager.initialSubmitWriteTransaction(); thread = new Thread(this); thread.setDaemon(true); - thread.setName("FlowCapableTopologyExporter-" + FlowCapableTopologyProvider.TOPOLOGY_ID); + thread.setName("ofp-topo-expo-" + FlowCapableTopologyProvider.TOPOLOGY_ID); } void enqueueOperation(final TopologyOperation task) { @@ -48,64 +50,47 @@ public final class OperationProcessor implements AutoCloseable, Runnable, Transa } } + @PostConstruct public void start() { thread.start(); } @Override public void run() { - while (!finishing) { - try { - TopologyOperation op = queue.take(); - - LOG.debug("New {} operation available, starting transaction", op); - - final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction(); - - int ops = 0; - do { - op.applyOperation(tx); - - ops++; - if (ops < MAX_TRANSACTION_OPERATIONS) { - op = queue.poll(); - } else { - op = null; - } - - LOG.debug("Next operation {}", op); - } while (op != null); - - LOG.debug("Processed {} operations, submitting transaction", ops); - submitTransaction(tx); - } catch (final IllegalStateException e) { - LOG.warn("Stat DataStoreOperation unexpected State!", e); - transactionChain.close(); - transactionChain = dataBroker.createTransactionChain(this); + while (!finishing) { + try { + TopologyOperation op = queue.take(); + + LOG.debug("New {} operation available, starting transaction", op); + + int ops = 0; + do { + op.applyOperation(transactionChainManager); + + ops++; + if (ops < MAX_TRANSACTION_OPERATIONS) { + op = queue.poll(); + } else { + op = null; + } + + LOG.debug("Next operation {}", op); + } while (op != null); + + LOG.debug("Processed {} operations, submitting transaction", ops); + if (!transactionChainManager.submitTransaction()) { cleanDataStoreOperQueue(); - } catch (final InterruptedException e) { - // This should mean we're shutting down. - LOG.debug("Stat Manager DS Operation thread interrupted!", e); - finishing = true; - } catch (final Exception e) { - LOG.warn("Stat DataStore Operation executor fail!", e); } + } catch (final InterruptedException e) { + // This should mean we're shutting down. + LOG.debug("Stat Manager DS Operation thread interrupted!", e); + finishing = true; } + } // Drain all events, making sure any blocked threads are unblocked cleanDataStoreOperQueue(); } - private void submitTransaction(ReadWriteTransaction tx) { - try { - tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { - LOG.warn("Stat DataStoreOperation unexpected State!", e); - transactionChain.close(); - transactionChain = dataBroker.createTransactionChain(this); - cleanDataStoreOperQueue(); - } - } - private void cleanDataStoreOperQueue() { while (!queue.isEmpty()) { queue.poll(); @@ -113,31 +98,16 @@ public final class OperationProcessor implements AutoCloseable, Runnable, Transa } @Override - public void onTransactionChainFailed(TransactionChain chain, AsyncTransaction transaction, Throwable cause) { - LOG.warn("Failed to export Topology manager operations, Transaction {} failed: {}", transaction.getIdentifier(), cause.getMessage()); - LOG.debug("Failed to export Topology manager operations.. ", cause); - transactionChain.close(); - transactionChain = dataBroker.createTransactionChain(this); - cleanDataStoreOperQueue(); - } - - @Override - public void onTransactionChainSuccessful(TransactionChain chain) { - //NOOP - } - - @Override + @PreDestroy public void close() { thread.interrupt(); try { thread.join(); - } catch(InterruptedException e) { + } catch (InterruptedException e) { LOG.debug("Join of thread {} was interrupted", thread.getName(), e); } - if (transactionChain != null) { - transactionChain.close(); - } + transactionChainManager.close(); LOG.debug("OperationProcessor closed"); }