package org.opendaylight.md.controller.topology.manager;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
private static final int MAX_TRANSACTION_OPERATIONS = 100;
private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
private final DataBroker dataBroker;
- private final BindingTransactionChain transactionChain;
+ private BindingTransactionChain transactionChain;
OperationProcessor(final DataBroker dataBroker) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
LOG.debug("Processed {} operations, submitting transaction", ops);
- CheckedFuture<Void, TransactionCommitFailedException> txResultFuture = tx.submit();
- Futures.addCallback(txResultFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void notUsed) {
- LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause());
- }
- });
+ try {
+ tx.submit().checkedGet();
+ } catch (final TransactionCommitFailedException e) {
+ LOG.warn("Stat DataStoreOperation unexpected State!", e);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
+ }
}
- } catch (InterruptedException e) {
- LOG.info("Interrupted processing, terminating", e);
+ } catch (final IllegalStateException e) {
+ LOG.warn("Stat DataStoreOperation unexpected State!", e);
+ transactionChain.close();
+ transactionChain = dataBroker.createTransactionChain(this);
+ cleanDataStoreOperQueue();
+ } catch (final InterruptedException e) {
+ LOG.warn("Stat Manager DS Operation thread interupted!", e);
+ } catch (final Exception e) {
+ LOG.warn("Stat DataStore Operation executor fail!", e);
}
+ // Drain all events, making sure any blocked threads are unblocked
+ cleanDataStoreOperQueue();
+
+ }
+
+ private void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (!queue.isEmpty()) {
queue.poll();