*/
package org.opendaylight.md.controller.topology.manager;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.yangtools.yang.common.RpcResult;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
-final class OperationProcessor implements Runnable {
+final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
private static final int MAX_TRANSACTION_OPERATIONS = 100;
private static final int OPERATION_QUEUE_DEPTH = 500;
private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
- // FIXME: Flow capable topology exporter should use transaction chaining API
- private final DataProviderService dataService;
+ private final DataBroker dataBroker;
+ private BindingTransactionChain transactionChain;
+ private volatile boolean finishing = false;
- OperationProcessor(final DataProviderService dataService) {
- this.dataService = Preconditions.checkNotNull(dataService);
+ OperationProcessor(final DataBroker dataBroker) {
+ this.dataBroker = Preconditions.checkNotNull(dataBroker);
+ transactionChain = this.dataBroker.createTransactionChain(this);
}
void enqueueOperation(final TopologyOperation task) {
@Override
public void run() {
- try {
- for (;;) {
- TopologyOperation op = queue.take();
+ while (!finishing) {
+ try {
+ TopologyOperation op = queue.take();
- LOG.debug("New operations available, starting transaction");
- final DataModificationTransaction tx = dataService.beginTransaction();
+ LOG.debug("New {} operation available, starting transaction", op);
- int ops = 0;
- do {
- op.applyOperation(tx);
+ final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
- ops++;
- if (ops < MAX_TRANSACTION_OPERATIONS) {
- op = queue.poll();
- } else {
- op = null;
- }
- } while (op != null);
+ int ops = 0;
+ do {
+ op.applyOperation(tx);
- LOG.debug("Processed {} operations, submitting transaction", ops);
+ ops++;
+ if (ops < MAX_TRANSACTION_OPERATIONS) {
+ op = queue.poll();
+ } else {
+ op = null;
+ }
- try {
- final RpcResult<TransactionStatus> s = tx.commit().get();
- if (!s.isSuccessful()) {
- LOG.error("Topology export failed for Tx:{}", tx.getIdentifier());
+ LOG.debug("Next operation {}", op);
+ } while (op != null);
+
+ LOG.debug("Processed {} operations, submitting transaction", ops);
+
+ try {
+ tx.submit().checkedGet();
+ } catch (final TransactionCommitFailedException e) {
+ LOG.warn("Stat DataStoreOperation unexpected State!", e);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
}
- } catch (ExecutionException e) {
- LOG.error("Topology export transaction {} failed", tx.getIdentifier(), e.getCause());
+
+ } catch (final IllegalStateException e) {
+ LOG.warn("Stat DataStoreOperation unexpected State!", e);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
+ } catch (final InterruptedException e) {
+ LOG.warn("Stat Manager DS Operation thread interupted!", e);
+ finishing = true;
+ } catch (final Exception e) {
+ LOG.warn("Stat DataStore Operation executor fail!", e);
}
}
- } catch (InterruptedException e) {
- LOG.info("Interrupted processing, terminating", e);
- }
-
// Drain all events, making sure any blocked threads are unblocked
+ cleanDataStoreOperQueue();
+ }
+
+ private void cleanDataStoreOperQueue() {
while (!queue.isEmpty()) {
queue.poll();
}
}
+
+ @Override
+ public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
+ LOG.error("Failed to export Topology manager operations, Transaction {} failed.", transaction.getIdentifier(), cause);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+ //NOOP
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (transactionChain != null) {
+ transactionChain.close();
+ }
+
+ }
}