import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.AbstractListeningExecutorService;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
/**
* This executor is used to execute Future listener callback Runnables async.
*/
- private final ExecutorService clientFutureCallbackExecutor;
+ private final Executor clientFutureCallbackExecutor;
- /**
- * This executor is re-used internally in calls to Futures#addCallback to avoid the overhead
- * of Futures#addCallback creating a MoreExecutors#sameThreadExecutor for each call.
- */
- private final ExecutorService internalFutureCallbackExecutor = new SimpleSameThreadExecutor();
-
- public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, ExecutorService listenableFutureExecutor) {
+ public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, Executor listenableFutureExecutor) {
super(datastores);
this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
}
}
@Override
- public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
+ protected CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
- Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
+ Futures.addCallback(canCommitFuture, this, MoreExecutors.directExecutor());
}
}
}
};
ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
- Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
+ Futures.addCallback(canCommitFuture, futureCallback, MoreExecutors.directExecutor());
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
- Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
+ Futures.addCallback(preCommitFuture, this, MoreExecutors.directExecutor());
}
}
};
ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
- Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
+ Futures.addCallback(preCommitFuture, futureCallback, MoreExecutors.directExecutor());
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
clientSubmitFuture.set();
} else {
ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
- Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
+ Futures.addCallback(commitFuture, this, MoreExecutors.directExecutor());
}
}
};
ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
- Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
+ Futures.addCallback(commitFuture, futureCallback, MoreExecutors.directExecutor());
}
- private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
+ private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
final String phase, final TransactionCommitFailedExceptionMapper exMapper,
final Throwable t) {
- if(clientSubmitFuture.isDone()) {
+ if (clientSubmitFuture.isDone()) {
// We must have had failures from multiple cohorts.
return;
}
LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
- Exception e;
- if(t instanceof Exception) {
+ final Exception e;
+ if(t instanceof NoShardLeaderException || t instanceof ShardLeaderNotRespondingException) {
+ e = new DataStoreUnavailableException(t.getMessage(), t);
+ } else if (t instanceof Exception) {
e = (Exception)t;
} else {
e = new RuntimeException("Unexpected error occurred", t);
@SuppressWarnings("unchecked")
ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohorts.size()];
int i = 0;
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
canCommitFutures[i++] = cohort.abort();
}
// what's interesting to the client.
clientSubmitFuture.setException(clientException);
}
- }, internalFutureCallbackExecutor);
+ }, MoreExecutors.directExecutor());
}
/**
*/
private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<Boolean>();
- private final ExecutorService listenerExecutor;
+ private final Executor listenerExecutor;
- AsyncNotifyingSettableFuture(ExecutorService listenerExecutor) {
- this.listenerExecutor = listenerExecutor;
+ AsyncNotifyingSettableFuture(Executor listenerExecutor) {
+ this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
}
@Override
}
}
}
-
- /**
- * A simple same-thread executor without the internal locking overhead that
- * MoreExecutors#sameThreadExecutor has. The #execute method is the only one of concern - we
- * don't shutdown the executor so the other methods irrelevant.
- */
- private static class SimpleSameThreadExecutor extends AbstractListeningExecutorService {
-
- @Override
- public void execute(Runnable command) {
- command.run();
- }
-
- @Override
- public boolean awaitTermination(long arg0, TimeUnit arg1) throws InterruptedException {
- return true;
- }
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
- @Override
- public void shutdown() {
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- return null;
- }
- }
}