import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final Shard shard;
private Runnable runOnPendingTransactionsComplete;
+ /**
+ * Optimistic {@link DataTreeCandidate} preparation. Since our DataTree implementation is a
+ * {@link TipProducingDataTree}, each {@link DataTreeCandidate} is also a {@link DataTreeTip}, e.g. another
+ * candidate can be prepared on top of it. They still need to be committed in sequence. Here we track the current
+ * tip of the data tree, which is the last DataTreeCandidate we have in flight, or the DataTree itself.
+ */
+ private TipProducingDataTreeTip tip;
+
private SchemaContext schemaContext;
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
this.logContext = Preconditions.checkNotNull(logContext);
this.metadata = ImmutableList.copyOf(metadata);
+ tip = dataTree;
}
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
@VisibleForTesting
@Deprecated
public DataTreeCandidate commit(final DataTreeModification modification) throws DataValidationFailedException {
+ // Direct modification commit is a utility, which cannot be used while we have transactions in-flight
+ Preconditions.checkState(tip == dataTree, "Cannot modify data tree while transacgitons are pending");
+
modification.ready();
dataTree.validate(modification);
DataTreeCandidate candidate = dataTree.prepare(modification);
}
pendingTransactions.clear();
+ tip = dataTree;
return ret;
}
LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
Exception cause;
try {
- dataTree.validate(modification);
+ tip.validate(modification);
LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
cohort.successfulCanCommit();
entry.lastAccess = shard.ticker().read();
Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
final DataTreeCandidateTip candidate;
try {
- candidate = dataTree.prepare(cohort.getDataTreeModification());
+ candidate = tip.prepare(cohort.getDataTreeModification());
} catch (Exception e) {
failPreCommit(e);
return;
return;
}
+ // Set the tip of the data tree.
+ tip = Verify.verifyNotNull(candidate);
+
entry.lastAccess = shard.ticker().read();
cohort.successfulPreCommit(candidate);
}
return;
}
+ // All pending candidates have been committed, reset the tip to the data tree
+ if (tip == candidate) {
+ tip = dataTree;
+ }
+
shard.getShardMBean().incrementCommittedTransactionCount();
shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
}
}
- void startAbort(final SimpleShardDataTreeCohort cohort) {
+ boolean startAbort(final SimpleShardDataTreeCohort cohort) {
final Iterator<CommitEntry> it = pendingTransactions.iterator();
if (!it.hasNext()) {
LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
- return;
+ return true;
}
// First entry is special, as it may already be committing
final CommitEntry first = it.next();
if (cohort.equals(first.cohort)) {
if (cohort.getState() != State.COMMIT_PENDING) {
- LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+ LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
cohort.getIdentifier());
- pendingTransactions.remove();
+ it.remove();
+ rebasePreCommittedTransactions(it, dataTree);
processNextTransaction();
- } else {
- LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ return true;
}
- return;
+ LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ return false;
}
+ TipProducingDataTreeTip newTip = dataTree;
while (it.hasNext()) {
final CommitEntry e = it.next();
if (cohort.equals(e.cohort)) {
LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
it.remove();
- return;
+ rebasePreCommittedTransactions(it, newTip);
+ return true;
+ } else {
+ newTip = cohort.getCandidate();
}
}
LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+ return true;
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void rebasePreCommittedTransactions(Iterator<CommitEntry> iter, TipProducingDataTreeTip newTip) {
+ tip = newTip;
+ while (iter.hasNext()) {
+ final SimpleShardDataTreeCohort cohort = iter.next().cohort;
+ if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
+ LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+
+ try {
+ tip.validate(cohort.getDataTreeModification());
+ } catch (DataValidationFailedException | RuntimeException e) {
+ LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+ cohort.reportFailure(e);
+ }
+ } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
+ LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+
+ try {
+ tip.validate(cohort.getDataTreeModification());
+ DataTreeCandidateTip candidate = tip.prepare(cohort.getDataTreeModification());
+ cohort.userPreCommit(candidate);
+
+ cohort.setNewCandidate(candidate);
+ tip = candidate;
+ } catch (ExecutionException | TimeoutException | RuntimeException | DataValidationFailedException e) {
+ LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+ cohort.reportFailure(e);
+ }
+ }
+ }
}
void setRunOnPendingTransactionsComplete(final Runnable operation) {
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+@VisibleForTesting
public abstract class ShardDataTreeCohort implements Identifiable<TransactionIdentifier> {
public enum State {
READY,
import com.google.common.base.Verify;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
- private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+
private final DataTreeModification transaction;
private final ShardDataTree dataTree;
private final TransactionIdentifier transactionId;
}
@Override
-
DataTreeModification getDataTreeModification() {
return transaction;
}
}
}
-
@Override
- public void abort(final FutureCallback<Void> callback) {
- dataTree.startAbort(this);
+ public void abort(final FutureCallback<Void> abortCallback) {
+ if (!dataTree.startAbort(this)) {
+ abortCallback.onSuccess(null);
+ return;
+ }
+
+ candidate = null;
state = State.ABORTED;
final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
if (!maybeAborts.isPresent()) {
- callback.onSuccess(null);
+ abortCallback.onSuccess(null);
return;
}
final Future<Iterable<Object>> aborts = maybeAborts.get();
if (aborts.isCompleted()) {
- callback.onSuccess(null);
+ abortCallback.onSuccess(null);
return;
}
@Override
public void onComplete(final Throwable failure, final Iterable<Object> objs) {
if (failure != null) {
- callback.onFailure(failure);
+ abortCallback.onFailure(failure);
} else {
- callback.onSuccess(null);
+ abortCallback.onSuccess(null);
}
}
}, ExecutionContexts.global());
checkState(State.PRE_COMMIT_COMPLETE);
this.callback = Preconditions.checkNotNull(newCallback);
state = State.COMMIT_PENDING;
- dataTree.startCommit(this, candidate);
+
+ if (nextFailure == null) {
+ dataTree.startCommit(this, candidate);
+ } else {
+ failedCommit(nextFailure);
+ }
}
private <T> FutureCallback<T> switchState(final State newState) {
return ret;
}
+ void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) {
+ checkState(State.PRE_COMMIT_COMPLETE);
+ this.candidate = Verify.verifyNotNull(dataTreeCandidate);
+ }
+
void successfulCanCommit() {
switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException {
final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
- final List<DataTreeCandidateTip> candidates = new ArrayList<>();
+ final List<DataTreeCandidate> candidates = new ArrayList<>();
candidates.add(addCar(shardDataTree));
candidates.add(removeCar(shardDataTree));
public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException {
final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
- final List<DataTreeCandidateTip> candidates = new ArrayList<>();
+ final List<DataTreeCandidate> candidates = new ArrayList<>();
candidates.add(addCar(shardDataTree));
candidates.add(removeCar(shardDataTree));
candidates.add(addCar(shardDataTree));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- private static void verifyOnDataTreeChanged(DOMDataTreeChangeListener listener,
- Consumer<DataTreeCandidate> callback) {
+ private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
+ final Consumer<DataTreeCandidate> callback) {
ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
for (Collection list : changes.getAllValues()) {
return optional.get();
}
- private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree)
+ private static DataTreeCandidate addCar(final ShardDataTree shardDataTree)
throws ExecutionException, InterruptedException {
return addCar(shardDataTree, "altima");
}
- private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree, String name)
+ private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name)
throws ExecutionException, InterruptedException {
return doTransaction(shardDataTree, snapshot -> {
snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
});
}
- private static DataTreeCandidateTip removeCar(final ShardDataTree shardDataTree)
+ private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree)
throws ExecutionException, InterruptedException {
return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
}
void execute(DataTreeModification snapshot);
}
- private static DataTreeCandidateTip doTransaction(final ShardDataTree shardDataTree,
+ private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
final DataTreeOperation operation) throws ExecutionException, InterruptedException {
final ReadWriteShardDataTreeTransaction transaction =
shardDataTree.newReadWriteTransaction(nextTransactionId());
immediateCanCommit(cohort);
immediatePreCommit(cohort);
- final DataTreeCandidateTip candidate = cohort.getCandidate();
+ final DataTreeCandidate candidate = cohort.getCandidate();
immediateCommit(cohort);
return candidate;
}
- private static DataTreeCandidateTip applyCandidates(final ShardDataTree shardDataTree,
- final List<DataTreeCandidateTip> candidates) throws ExecutionException, InterruptedException {
+ private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
+ final List<DataTreeCandidate> candidates) throws ExecutionException, InterruptedException {
final ReadWriteShardDataTreeTransaction transaction =
shardDataTree.newReadWriteTransaction(nextTransactionId());
final DataTreeModification snapshot = transaction.getSnapshot();
- for (final DataTreeCandidateTip candidateTip : candidates) {
+ for (final DataTreeCandidate candidateTip : candidates) {
DataTreeCandidates.applyToModification(snapshot, candidateTip);
}
final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
immediateCanCommit(cohort);
immediatePreCommit(cohort);
- final DataTreeCandidateTip candidate = cohort.getCandidate();
+ final DataTreeCandidate candidate = cohort.getCandidate();
immediateCommit(cohort);
return candidate;
final InOrder inOrder = inOrder(dataTree);
inOrder.verify(dataTree).validate(any(DataTreeModification.class));
inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+
+ // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
+ // validate performs wrapping and we capture that mock
+ // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+
inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
- inOrder.verify(dataTree).validate(any(DataTreeModification.class));
}
};
}
@Test
public void testAbort() throws Exception {
- doNothing().when(mockShardDataTree).startAbort(cohort);
+ doReturn(true).when(mockShardDataTree).startAbort(cohort);
abort(cohort).get();
verify(mockShardDataTree).startAbort(cohort);
@Test
public void testAbortWithCohorts() throws Exception {
- doNothing().when(mockShardDataTree).startAbort(cohort);
+ doReturn(true).when(mockShardDataTree).startAbort(cohort);
final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();