import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.AbstractListeningExecutorService;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
+import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.broker.impl.AbstractDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.LoggerFactory;
/**
- * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3
+ * ConcurrentDOMDataBroker commits transactions concurrently. The 3
* commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
* (ie async) per transaction but multiple transaction commits can run concurrent.
*
* @author Thomas Pantelis
*/
-public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
+public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
private static final String CAN_COMMIT = "CAN_COMMIT";
private static final String PRE_COMMIT = "PRE_COMMIT";
/**
* This executor is used to execute Future listener callback Runnables async.
*/
- private final ExecutorService clientFutureCallbackExecutor;
+ private final Executor clientFutureCallbackExecutor;
- /**
- * This executor is re-used internally in calls to Futures#addCallback to avoid the overhead
- * of Futures#addCallback creating a MoreExecutors#sameThreadExecutor for each call.
- */
- private final ExecutorService internalFutureCallbackExecutor = new SimpleSameThreadExecutor();
-
- public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, ExecutorService listenableFutureExecutor) {
+ public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, Executor listenableFutureExecutor) {
super(datastores);
this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
}
}
@Override
- public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
+ protected CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
+ if(cohorts.isEmpty()){
+ return Futures.immediateCheckedFuture(null);
+ }
+
final AsyncNotifyingSettableFuture clientSubmitFuture =
new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
- Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
+ Futures.addCallback(canCommitFuture, this, MoreExecutors.directExecutor());
}
}
}
};
ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
- Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
+ Futures.addCallback(canCommitFuture, futureCallback, MoreExecutors.directExecutor());
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
- Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
+ Futures.addCallback(preCommitFuture, this, MoreExecutors.directExecutor());
}
}
};
ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
- Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
+ Futures.addCallback(preCommitFuture, futureCallback, MoreExecutors.directExecutor());
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
clientSubmitFuture.set();
} else {
ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
- Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
+ Futures.addCallback(commitFuture, this, MoreExecutors.directExecutor());
}
}
};
ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
- Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
+ Futures.addCallback(commitFuture, futureCallback, MoreExecutors.directExecutor());
}
- private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
+ private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
final String phase, final TransactionCommitFailedExceptionMapper exMapper,
final Throwable t) {
- if(clientSubmitFuture.isDone()) {
+ if (clientSubmitFuture.isDone()) {
// We must have had failures from multiple cohorts.
return;
}
LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
- Exception e;
- if(t instanceof Exception) {
+ final Exception e;
+ if(t instanceof NoShardLeaderException || t instanceof ShardLeaderNotRespondingException) {
+ e = new DataStoreUnavailableException(t.getMessage(), t);
+ } else if (t instanceof Exception) {
e = (Exception)t;
} else {
e = new RuntimeException("Unexpected error occurred", t);
@SuppressWarnings("unchecked")
ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohorts.size()];
int i = 0;
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
canCommitFutures[i++] = cohort.abort();
}
// what's interesting to the client.
clientSubmitFuture.setException(clientException);
}
- }, internalFutureCallbackExecutor);
+ }, MoreExecutors.directExecutor());
}
/**
*/
private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<Boolean>();
- private final ExecutorService listenerExecutor;
+ private final Executor listenerExecutor;
- AsyncNotifyingSettableFuture(ExecutorService listenerExecutor) {
- this.listenerExecutor = listenerExecutor;
+ AsyncNotifyingSettableFuture(Executor listenerExecutor) {
+ this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
}
@Override
}
}
}
-
- /**
- * A simple same-thread executor without the internal locking overhead that
- * MoreExecutors#sameThreadExecutor has. The #execute method is the only one of concern - we
- * don't shutdown the executor so the other methods irrelevant.
- */
- private static class SimpleSameThreadExecutor extends AbstractListeningExecutorService {
-
- @Override
- public void execute(Runnable command) {
- command.run();
- }
-
- @Override
- public boolean awaitTermination(long arg0, TimeUnit arg1) throws InterruptedException {
- return true;
- }
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
- @Override
- public void shutdown() {
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- return null;
- }
- }
}