import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCanCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulPreCommit;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied));
}
- protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
- final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
- final MutableCompositeModification modification) {
- return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
- }
-
- protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
- final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
- final MutableCompositeModification modification,
- final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
+ protected TipProducingDataTree createDelegatingMockDataTree() throws Exception {
+ TipProducingDataTree actual = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
+ final TipProducingDataTree mock = mock(TipProducingDataTree.class);
- final ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null);
- tx.getSnapshot().write(path, data);
- final ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
+ doAnswer(invocation -> {
+ actual.validate(invocation.getArgumentAt(0, DataTreeModification.class));
+ return null;
+ }).when(mock).validate(any(DataTreeModification.class));
- modification.addModification(new WriteModification(path, data));
+ doAnswer(invocation -> {
+ return actual.prepare(invocation.getArgumentAt(0, DataTreeModification.class));
+ }).when(mock).prepare(any(DataTreeModification.class));
- return cohort;
- }
+ doAnswer(invocation -> {
+ actual.commit(invocation.getArgumentAt(0, DataTreeCandidate.class));
+ return null;
+ }).when(mock).commit(any(DataTreeCandidate.class));
- protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
- final ShardDataTreeCohort actual) {
- return createDelegatingMockCohort(cohortName, actual, null);
- }
+ doAnswer(invocation -> {
+ actual.setSchemaContext(invocation.getArgumentAt(0, SchemaContext.class));
+ return null;
+ }).when(mock).setSchemaContext(any(SchemaContext.class));
- protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
- final ShardDataTreeCohort actual,
- final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName);
+ doAnswer(invocation -> {
+ return actual.takeSnapshot();
+ }).when(mock).takeSnapshot();
- doAnswer(new Answer<ListenableFuture<Boolean>>() {
- @Override
- public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
- return actual.canCommit();
- }
- }).when(cohort).canCommit();
-
- doAnswer(new Answer<ListenableFuture<Void>>() {
- @Override
- public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
- if(preCommit != null) {
- return preCommit.apply(actual);
- } else {
- return actual.preCommit();
- }
- }
- }).when(cohort).preCommit();
+ doAnswer(invocation -> {
+ return actual.getRootPath();
+ }).when(mock).getRootPath();
- doAnswer(new Answer<ListenableFuture<Void>>() {
- @Override
- public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
- return actual.commit();
- }
- }).when(cohort).commit();
-
- doAnswer(new Answer<ListenableFuture<Void>>() {
- @Override
- public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
- return actual.abort();
- }
- }).when(cohort).abort();
-
- doAnswer(new Answer<DataTreeCandidateTip>() {
- @Override
- public DataTreeCandidateTip answer(final InvocationOnMock invocation) {
- return actual.getCandidate();
- }
- }).when(cohort).getCandidate();
-
- return cohort;
- }
-
- protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort,
- String transactionID,
- MutableCompositeModification modification,
- boolean doCommitOnReady) {
- if(remoteReadWriteTransaction){
- return prepareForwardedReadyTransaction(cohort, transactionID, CURRENT_VERSION,
- doCommitOnReady);
- } else {
- setupCohortDecorator(shard, cohort);
- return prepareBatchedModifications(transactionID, modification, doCommitOnReady);
- }
+ return mock;
}
protected ShardDataTreeCohort mockShardDataTreeCohort() {
ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class);
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).commit();
- doReturn(mockCandidate("candidate")).when(cohort).getCandidate();
+ DataTreeCandidate candidate = mockCandidate("candidate");
+ successfulCanCommit(cohort);
+ successfulPreCommit(cohort, candidate);
+ successfulCommit(cohort);
+ doReturn(candidate).when(cohort).getCandidate();
return cohort;
}
- static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) {
- ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
- doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
- doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class));
- return mockParent;
- }
-
- protected ForwardedReadyTransaction prepareForwardedReadyTransaction(ShardDataTreeCohort cohort,
- String transactionID, short version, boolean doCommitOnReady) {
- return new ForwardedReadyTransaction(transactionID, version,
- new ReadWriteShardDataTreeTransaction(newShardDataTreeTransactionParent(cohort), transactionID,
- mock(DataTreeModification.class)), doCommitOnReady);
- }
-
- protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort,
- String transactionID,
- MutableCompositeModification modification) {
- return prepareReadyTransactionMessage(remoteReadWriteTransaction, shard, cohort, transactionID, modification, false);
- }
+ protected Map<TransactionIdentifier, CapturingShardDataTreeCohort> setupCohortDecorator(final Shard shard,
+ final TransactionIdentifier... transactionIDs) {
+ final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = new HashMap<>();
+ for(TransactionIdentifier id: transactionIDs) {
+ cohortMap.put(id, new CapturingShardDataTreeCohort());
+ }
- protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) {
shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() {
@Override
- public ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual) {
+ public ShardDataTreeCohort decorate(final Identifier transactionID, final ShardDataTreeCohort actual) {
+ CapturingShardDataTreeCohort cohort = cohortMap.get(transactionID);
+ cohort.setDelegate(actual);
return cohort;
}
});
+
+ return cohortMap;
}
- protected BatchedModifications prepareBatchedModifications(String transactionID,
- MutableCompositeModification modification) {
+ protected BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID,
+ final MutableCompositeModification modification) {
return prepareBatchedModifications(transactionID, modification, false);
}
- private static BatchedModifications prepareBatchedModifications(String transactionID,
- MutableCompositeModification modification,
- boolean doCommitOnReady) {
- final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION, null);
+ protected static BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID,
+ final MutableCompositeModification modification,
+ final boolean doCommitOnReady) {
+ final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION);
batchedModifications.addModification(modification);
batchedModifications.setReady(true);
batchedModifications.setDoCommitOnReady(doCommitOnReady);
return batchedModifications;
}
+ protected static BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID,
+ final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean doCommitOnReady) {
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ modification.addModification(new WriteModification(path, data));
+ return prepareBatchedModifications(transactionID, modification, doCommitOnReady);
+ }
+
+ protected static ForwardedReadyTransaction prepareForwardedReadyTransaction(final TestActorRef<Shard> shard,
+ final TransactionIdentifier transactionID, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data, final boolean doCommitOnReady) {
+ ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
+ newReadWriteTransaction(transactionID);
+ rwTx.getSnapshot().write(path, data);
+ return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady);
+ }
public static NormalizedNode<?,?> readStore(final TestActorRef<? extends Shard> shard, final YangInstanceIdentifier id)
throws ExecutionException, InterruptedException {
return store.takeSnapshot().readNode(id).orNull();
}
- public static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
+ public void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
- Future<Object> future = Patterns.ask(shard, newBatchedModifications("tx", id, node, true, true, 1),
+ Future<Object> future = Patterns.ask(shard, newBatchedModifications(nextTransactionId(), id, node, true, true, 1),
new Timeout(5, TimeUnit.SECONDS));
try {
Await.ready(future, Duration.create(5, TimeUnit.SECONDS));
}
public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
- final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
- final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
-
- transaction.getSnapshot().write(id, node);
- final ShardDataTreeCohort cohort = transaction.ready();
- cohort.canCommit().get();
- cohort.preCommit().get();
- cohort.commit();
+ final NormalizedNode<?,?> node) throws Exception {
+ BatchedModifications batched = newBatchedModifications(nextTransactionId(), id, node, true, true, 1);
+ DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
+ batched.apply(modification);
+ store.commit(modification);
}
- public static void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
- final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
- final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
+ public void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
+ final NormalizedNode<?,?> node) throws Exception {
+ final BatchedModifications batched = new BatchedModifications(nextTransactionId(), CURRENT_VERSION);
+ batched.addModification(new MergeModification(id, node));
+ batched.setReady(true);
+ batched.setDoCommitOnReady(true);
+ batched.setTotalMessagesSent(1);
- transaction.getSnapshot().merge(id, node);
- final ShardDataTreeCohort cohort = transaction.ready();
- cohort.canCommit().get();
- cohort.preCommit().get();
- cohort.commit();
+ DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
+ batched.apply(modification);
+ store.commit(modification);
}
public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.EMPTY);
InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
- SerializationUtils.serializeNormalizedNode(root),
+ new PreBoronShardDataTreeSnapshot(root).serialize(),
Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
return testStore;
}
- static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
+ static CommitTransactionPayload payloadForModification(final DataTree source, final DataTreeModification mod,
+ final TransactionIdentifier transactionId) throws DataValidationFailedException, IOException {
source.validate(mod);
final DataTreeCandidate candidate = source.prepare(mod);
source.commit(candidate);
- return DataTreeCandidatePayload.create(candidate);
+ return CommitTransactionPayload.create(transactionId, candidate);
}
- static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
- final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
- return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
- }
-
- static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
+ static BatchedModifications newBatchedModifications(final TransactionIdentifier transactionID,
final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
final int messagesSent) {
- final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
+ final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
batched.addModification(new WriteModification(path, data));
batched.setReady(ready);
batched.setDoCommitOnReady(doCommitOnReady);
return delegate.create();
}
}
+
+ public static class CapturingShardDataTreeCohort extends ShardDataTreeCohort {
+ private volatile ShardDataTreeCohort delegate;
+ private FutureCallback<Void> canCommit;
+ private FutureCallback<DataTreeCandidate> preCommit;
+ private FutureCallback<UnsignedLong> commit;
+
+ public void setDelegate(ShardDataTreeCohort delegate) {
+ this.delegate = delegate;
+ }
+
+ public FutureCallback<Void> getCanCommit() {
+ assertNotNull("canCommit was not invoked", canCommit);
+ return canCommit;
+ }
+
+ public FutureCallback<DataTreeCandidate> getPreCommit() {
+ assertNotNull("preCommit was not invoked", preCommit);
+ return preCommit;
+ }
+
+ public FutureCallback<UnsignedLong> getCommit() {
+ assertNotNull("commit was not invoked", commit);
+ return commit;
+ }
+
+ @Override
+ public TransactionIdentifier getIdentifier() {
+ return delegate.getIdentifier();
+ }
+
+ @Override
+ DataTreeCandidateTip getCandidate() {
+ return delegate.getCandidate();
+ }
+
+ @Override
+ DataTreeModification getDataTreeModification() {
+ return delegate.getDataTreeModification();
+ }
+
+ @Override
+ public void canCommit(FutureCallback<Void> callback) {
+ canCommit = mockFutureCallback(callback);
+ delegate.canCommit(canCommit);
+ }
+
+ @Override
+ public void preCommit(FutureCallback<DataTreeCandidate> callback) {
+ preCommit = mockFutureCallback(callback);
+ delegate.preCommit(preCommit);
+ }
+
+ @Override
+ public void commit(FutureCallback<UnsignedLong> callback) {
+ commit = mockFutureCallback(callback);
+ delegate.commit(commit);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> FutureCallback<T> mockFutureCallback(final FutureCallback<T> actual ) {
+ FutureCallback<T> mock = mock(FutureCallback.class);
+ doAnswer(invocation -> {
+ actual.onFailure(invocation.getArgumentAt(0, Throwable.class));
+ return null;
+ }).when(mock).onFailure(any(Throwable.class));
+
+ doAnswer(invocation -> {
+ actual.onSuccess((T) invocation.getArgumentAt(0, Throwable.class));
+ return null;
+ }).when(mock).onSuccess((T) any(Object.class));
+
+ return mock;
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegate.abort();
+ }
+
+ @Override
+ public boolean isFailed() {
+ return delegate.isFailed();
+ }
+
+ @Override
+ public State getState() {
+ return delegate.getState();
+ }
+ }
}