private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
private final DataBroker dataBroker;
private BindingTransactionChain transactionChain;
+ private volatile boolean finishing = false;
OperationProcessor(final DataBroker dataBroker) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
@Override
public void run() {
- try {
- for (; ; ) {
- TopologyOperation op = queue.take();
+ while (!finishing) {
+ try {
+ TopologyOperation op = queue.take();
- LOG.debug("New {} operation available, starting transaction", op);
+ LOG.debug("New {} operation available, starting transaction", op);
- final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
+ final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
- int ops = 0;
- do {
- op.applyOperation(tx);
+ int ops = 0;
+ do {
+ op.applyOperation(tx);
- ops++;
- if (ops < MAX_TRANSACTION_OPERATIONS) {
- op = queue.poll();
- } else {
- op = null;
- }
+ ops++;
+ if (ops < MAX_TRANSACTION_OPERATIONS) {
+ op = queue.poll();
+ } else {
+ op = null;
+ }
- LOG.debug("Next operation {}", op);
- } while (op != null);
+ LOG.debug("Next operation {}", op);
+ } while (op != null);
- LOG.debug("Processed {} operations, submitting transaction", ops);
+ LOG.debug("Processed {} operations, submitting transaction", ops);
- try {
- tx.submit().checkedGet();
- } catch (final TransactionCommitFailedException e) {
+ try {
+ tx.submit().checkedGet();
+ } catch (final TransactionCommitFailedException e) {
+ LOG.warn("Stat DataStoreOperation unexpected State!", e);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
+ }
+
+ } catch (final IllegalStateException e) {
LOG.warn("Stat DataStoreOperation unexpected State!", e);
transactionChain.close();
transactionChain = dataBroker.createTransactionChain(this);
cleanDataStoreOperQueue();
+ } catch (final InterruptedException e) {
+ LOG.warn("Stat Manager DS Operation thread interupted!", e);
+ finishing = true;
+ } catch (final Exception e) {
+ LOG.warn("Stat DataStore Operation executor fail!", e);
}
}
- } catch (final IllegalStateException e) {
- LOG.warn("Stat DataStoreOperation unexpected State!", e);
- transactionChain.close();
- transactionChain = dataBroker.createTransactionChain(this);
- cleanDataStoreOperQueue();
- } catch (final InterruptedException e) {
- LOG.warn("Stat Manager DS Operation thread interupted!", e);
- } catch (final Exception e) {
- LOG.warn("Stat DataStore Operation executor fail!", e);
- }
-
// Drain all events, making sure any blocked threads are unblocked
cleanDataStoreOperQueue();
-
}
private void cleanDataStoreOperQueue() {
- // Drain all events, making sure any blocked threads are unblocked
while (!queue.isEmpty()) {
queue.poll();
}
@Override
public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
}
@Override