import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
// Send the remaining batched modifications, if any, with the ready flag set.
bumpPermits(havePermit);
- return sendBatchedModifications(true, true);
+ return sendBatchedModifications(true, true, Optional.empty());
}
@Override
- public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+ final Optional<SortedSet<String>> participatingShardNames) {
logModificationCount();
LOG.debug("Tx {} readyTransaction called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
bumpPermits(havePermit);
- Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
return transformReadyReply(lastModificationsFuture);
}
}
protected Future<Object> sendBatchedModifications() {
- return sendBatchedModifications(false, false);
+ return sendBatchedModifications(false, false, Optional.empty());
}
- protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) {
+ protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
+ final Optional<SortedSet<String>> participatingShardNames) {
Future<Object> sent = null;
if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
if (batchedModifications == null) {
LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
batchedModifications.getModifications().size(), ready);
- batchedModifications.setReady(ready);
batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
batchPermits = 0;
if (ready) {
+ batchedModifications.setReady(participatingShardNames);
+ batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications = null;
} else {
batchedModifications = newBatchedModifications();
}
if (failure != null) {
- LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
- failure);
+ LOG.debug("Tx {} {} operation failed", getIdentifier(), readCmd.getClass().getSimpleName(),
+ failure);
returnFuture.setException(new ReadFailedException("Error checking "
+ readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));