X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Ftopology-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmd%2Fcontroller%2Ftopology%2Fmanager%2FOperationProcessor.java;h=c00943339570dc67ee10ed8349f461132dc1e3e7;hp=41162d30463d54338d687dfd904ef1f7646dbec6;hb=9e8add2114ce1c3fd18a860af6e7419270611209;hpb=09563b7786e591a8faaf1947a1fb27b8183414f2 diff --git a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java index 41162d3046..c009433395 100644 --- a/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java +++ b/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java @@ -29,6 +29,7 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh private final BlockingQueue queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH); private final DataBroker dataBroker; private BindingTransactionChain transactionChain; + private volatile boolean finishing = false; OperationProcessor(final DataBroker dataBroker) { this.dataBroker = Preconditions.checkNotNull(dataBroker); @@ -45,57 +46,56 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh @Override public void run() { - try { - for (; ; ) { - TopologyOperation op = queue.take(); + while (!finishing) { + try { + TopologyOperation op = queue.take(); - LOG.debug("New {} operation available, starting transaction", op); + LOG.debug("New {} operation available, starting transaction", op); - final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction(); + final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction(); - int ops = 0; - do { - op.applyOperation(tx); + int ops = 0; + do { + op.applyOperation(tx); - ops++; - if (ops < MAX_TRANSACTION_OPERATIONS) { - op = queue.poll(); - } else { - op = null; - } + ops++; + if (ops < MAX_TRANSACTION_OPERATIONS) { + op = queue.poll(); + } else { + op = null; + } - LOG.debug("Next operation {}", op); - } while (op != null); + LOG.debug("Next operation {}", op); + } while (op != null); - LOG.debug("Processed {} operations, submitting transaction", ops); + LOG.debug("Processed {} operations, submitting transaction", ops); - try { - tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { + try { + tx.submit().checkedGet(); + } catch (final TransactionCommitFailedException e) { + LOG.warn("Stat DataStoreOperation unexpected State!", e); + transactionChain.close(); + transactionChain = dataBroker.createTransactionChain(this); + cleanDataStoreOperQueue(); + } + + } catch (final IllegalStateException e) { LOG.warn("Stat DataStoreOperation unexpected State!", e); transactionChain.close(); transactionChain = dataBroker.createTransactionChain(this); cleanDataStoreOperQueue(); + } catch (final InterruptedException e) { + LOG.warn("Stat Manager DS Operation thread interupted!", e); + finishing = true; + } catch (final Exception e) { + LOG.warn("Stat DataStore Operation executor fail!", e); } } - } catch (final IllegalStateException e) { - LOG.warn("Stat DataStoreOperation unexpected State!", e); - transactionChain.close(); - transactionChain = dataBroker.createTransactionChain(this); - cleanDataStoreOperQueue(); - } catch (final InterruptedException e) { - LOG.warn("Stat Manager DS Operation thread interupted!", e); - } catch (final Exception e) { - LOG.warn("Stat DataStore Operation executor fail!", e); - } - // Drain all events, making sure any blocked threads are unblocked cleanDataStoreOperQueue(); - } private void cleanDataStoreOperQueue() { - // Drain all events, making sure any blocked threads are unblocked while (!queue.isEmpty()) { queue.poll(); } @@ -104,6 +104,9 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh @Override public void onTransactionChainFailed(TransactionChain chain, AsyncTransaction transaction, Throwable cause) { LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause); + transactionChain.close(); + transactionChain = dataBroker.createTransactionChain(this); + cleanDataStoreOperQueue(); } @Override