package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.slf4j.Logger;
}
@Override
- public ListenableFuture<Void> commit() {
- final ListenableFuture<Void> ret = delegate.commit();
-
- Futures.addCallback(ret, new FutureCallback<Void>() {
+ public void commit(final FutureCallback<UnsignedLong> callback) {
+ delegate.commit(new FutureCallback<UnsignedLong>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(final UnsignedLong result) {
chain.clearTransaction(transaction);
LOG.debug("Committed transaction {}", transaction);
+ callback.onSuccess(result);
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(final Throwable t) {
LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
+ callback.onFailure(t);
}
});
+ }
- return ret;
+ @Override
+ public TransactionIdentifier getIdentifier() {
+ return delegate.getIdentifier();
}
@Override
- public ListenableFuture<Boolean> canCommit() {
- return delegate.canCommit();
+ public void canCommit(final FutureCallback<Void> callback) {
+ delegate.canCommit(callback);
}
@Override
- public ListenableFuture<Void> preCommit() {
- return delegate.preCommit();
+ public void preCommit(final FutureCallback<DataTreeCandidate> callback) {
+ delegate.preCommit(callback);
}
@Override
DataTreeModification getDataTreeModification() {
return delegate.getDataTreeModification();
}
+
+ @Override
+ public boolean isFailed() {
+ return delegate.isFailed();
+ }
+
+ @Override
+ public State getState() {
+ return delegate.getState();
+ }
}
\ No newline at end of file
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.Duration;
final class CohortEntry {
- enum State {
- PENDING,
- CAN_COMMITTED,
- PRE_COMMITTED,
- COMMITTED,
- ABORTED
- }
-
- private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
-
- private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private final ReadWriteShardDataTreeTransaction transaction;
private final TransactionIdentifier transactionID;
- private final CompositeDataTreeCohort userCohorts;
private final short clientVersion;
- private State state = State.PENDING;
private RuntimeException lastBatchedModificationsException;
private int totalBatchedModificationsReceived;
private ShardDataTreeCohort cohort;
private ActorRef replySender;
private Shard shard;
- private CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
- DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
+ private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionID = Preconditions.checkNotNull(transactionID);
+ this.transactionID = transaction.getId();
this.clientVersion = clientVersion;
- this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
- private CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
- SchemaContext schema, short clientVersion) {
- this.transactionID = Preconditions.checkNotNull(transactionID);
- this.cohort = cohort;
+ private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) {
+ this.cohort = Preconditions.checkNotNull(cohort);
+ this.transactionID = cohort.getIdentifier();
this.transaction = null;
this.clientVersion = clientVersion;
- this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
- }
-
- static CohortEntry createOpen(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
- DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
- return new CohortEntry(transactionID, transaction, cohortRegistry, schema, clientVersion);
}
- static CohortEntry createReady(TransactionIdentifier transactionID, ShardDataTreeCohort cohort,
- DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
- return new CohortEntry(transactionID, cohort, cohortRegistry, schema, clientVersion);
+ static CohortEntry createOpen(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
+ return new CohortEntry(transaction, clientVersion);
}
- void updateLastAccessTime() {
- lastAccessTimer.reset();
- lastAccessTimer.start();
+ static CohortEntry createReady(final ShardDataTreeCohort cohort, final short clientVersion) {
+ return new CohortEntry(cohort, clientVersion);
}
TransactionIdentifier getTransactionID() {
return clientVersion;
}
- State getState() {
- return state;
- }
-
- DataTreeCandidate getCandidate() {
- return cohort.getCandidate();
+ boolean isFailed() {
+ return cohort != null && cohort.isFailed();
}
DataTreeModification getDataTreeModification() {
return lastBatchedModificationsException;
}
- void applyModifications(Iterable<Modification> modifications) {
+ void applyModifications(final Iterable<Modification> modifications) {
totalBatchedModificationsReceived++;
if(lastBatchedModificationsException == null) {
for (Modification modification : modifications) {
}
}
- boolean canCommit() throws InterruptedException, ExecutionException {
- state = State.CAN_COMMITTED;
-
- // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
- // about possibly accessing our state on a different thread outside of our dispatcher.
- // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
- // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
- // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
- return cohort.canCommit().get();
+ void canCommit(final FutureCallback<Void> callback) {
+ cohort.canCommit(callback);
}
-
-
- void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
- state = State.PRE_COMMITTED;
- cohort.preCommit().get();
- userCohorts.canCommit(cohort.getCandidate());
- userCohorts.preCommit();
+ void preCommit(final FutureCallback<DataTreeCandidate> callback) {
+ cohort.preCommit(callback);
}
- void commit() throws InterruptedException, ExecutionException, TimeoutException {
- state = State.COMMITTED;
- cohort.commit().get();
- userCohorts.commit();
+ void commit(final FutureCallback<UnsignedLong> callback) {
+ cohort.commit(callback);
}
void abort() throws InterruptedException, ExecutionException, TimeoutException {
- state = State.ABORTED;
cohort.abort().get();
- userCohorts.abort();
}
- void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
+ void ready(final CohortDecorator cohortDecorator) {
Preconditions.checkState(cohort == null, "cohort was already set");
- setDoImmediateCommit(doImmediateCommit);
-
cohort = transaction.ready();
if(cohortDecorator != null) {
}
}
- boolean isReadyToCommit() {
- return replySender != null;
- }
-
- boolean isExpired(long expireTimeInMillis) {
- return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
- }
-
boolean isDoImmediateCommit() {
return doImmediateCommit;
}
- void setDoImmediateCommit(boolean doImmediateCommit) {
+ void setDoImmediateCommit(final boolean doImmediateCommit) {
this.doImmediateCommit = doImmediateCommit;
}
return replySender;
}
- void setReplySender(ActorRef replySender) {
+ void setReplySender(final ActorRef replySender) {
this.replySender = replySender;
}
return shard;
}
- void setShard(Shard shard) {
+ void setShard(final Shard shard) {
this.shard = shard;
}
-
- boolean isAborted() {
- return state == State.ABORTED;
- }
-
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
@Override
- public Failure recover(Throwable error) throws Throwable {
+ public Failure recover(final Throwable error) throws Throwable {
return new Failure(error);
}
};
private Iterable<Success> successfulFromPrevious;
private State state = State.IDLE;
- CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, TransactionIdentifier transactionID,
- SchemaContext schema, Timeout timeout) {
+ CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
+ final SchemaContext schema, final Timeout timeout) {
this.registry = Preconditions.checkNotNull(registry);
this.txId = Preconditions.checkNotNull(transactionID);
this.schema = Preconditions.checkNotNull(schema);
this.timeout = Preconditions.checkNotNull(timeout);
}
- void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException {
+ void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
// FIXME: Optimize empty collection list with pre-created futures, containing success.
Future<Iterable<Object>> canCommitsFuture =
Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
@Override
- public Future<Object> apply(CanCommit input) {
+ public Future<Object> apply(final CanCommit input) {
return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
ExecutionContexts.global());
}
processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
}
- void abort() throws TimeoutException {
+ Optional<Future<Iterable<Object>>> abort() {
if (successfulFromPrevious != null) {
- sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId));
+ return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
}
+
+ return Optional.empty();
}
private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
@Override
- public Future<Object> apply(DataTreeCohortActor.Success cohortResponse) throws Exception {
+ public Future<Object> apply(final DataTreeCohortActor.Success cohortResponse) throws Exception {
return Patterns.ask(cohortResponse.getCohort(), message, timeout);
}
}, ExecutionContexts.global());
}
- private void processResponses(Future<Iterable<Object>> resultsFuture, State currentState, State afterState)
+ private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState, final State afterState)
throws TimeoutException, ExecutionException {
final Iterable<Object> results;
try {
changeStateFrom(currentState, afterState);
}
- void changeStateFrom(State expected, State followup) {
+ void changeStateFrom(final State expected, final State followup) {
Preconditions.checkState(state == expected);
state = followup;
}
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Status;
+import akka.util.Timeout;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
- void registerCohort(ActorRef sender, RegisterCohort cohort) {
+ void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
takeLock();
try {
final ActorRef cohortRef = cohort.getCohort();
sender.tell(new Status.Success(null), ActorRef.noSender());
}
- void removeCommitCohort(ActorRef sender, RemoveCohort message) {
+ void removeCommitCohort(final ActorRef sender, final RemoveCohort message) {
final ActorRef cohort = message.getCohort();
final RegistrationTreeNode<ActorRef> node = cohortToNode.get(cohort);
if (node != null) {
cohort.tell(PoisonPill.getInstance(), cohort);
}
- Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(TransactionIdentifier txId,
- DataTreeCandidate candidate, SchemaContext schema) {
+ Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(final TransactionIdentifier txId,
+ final DataTreeCandidate candidate, final SchemaContext schema) {
try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
}
}
- void process(ActorRef sender, CohortRegistryCommand message) {
+ void process(final ActorRef sender, final CohortRegistryCommand message) {
if (message instanceof RegisterCohort) {
registerCohort(sender, (RegisterCohort) message);
} else if (message instanceof RemoveCohort) {
private final ActorRef cohort;
- CohortRegistryCommand(ActorRef cohort) {
+ CohortRegistryCommand(final ActorRef cohort) {
this.cohort = Preconditions.checkNotNull(cohort);
}
private final DOMDataTreeIdentifier path;
- RegisterCohort(DOMDataTreeIdentifier path, ActorRef cohort) {
+ RegisterCohort(final DOMDataTreeIdentifier path, final ActorRef cohort) {
super(cohort);
this.path = path;
static class RemoveCohort extends CohortRegistryCommand {
- RemoveCohort(ActorRef cohort) {
+ RemoveCohort(final ActorRef cohort) {
super(cohort);
}
private final Collection<DataTreeCohortActor.CanCommit> messages =
new ArrayList<>();
- CanCommitMessageBuilder(TransactionIdentifier txId, DataTreeCandidate candidate, SchemaContext schema) {
+ CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate, final SchemaContext schema) {
this.txId = Preconditions.checkNotNull(txId);
this.candidate = Preconditions.checkNotNull(candidate);
this.schema = schema;
}
- private void lookupAndCreateCanCommits(List<PathArgument> args, int offset,
- RegistrationTreeNode<ActorRef> node) {
+ private void lookupAndCreateCanCommits(final List<PathArgument> args, final int offset,
+ final RegistrationTreeNode<ActorRef> node) {
if (args.size() != offset) {
final PathArgument arg = args.get(offset);
}
}
- private void lookupAndCreateCanCommits(YangInstanceIdentifier path, RegistrationTreeNode<ActorRef> regNode,
- DataTreeCandidateNode candNode) {
+ private void lookupAndCreateCanCommits(final YangInstanceIdentifier path, final RegistrationTreeNode<ActorRef> regNode,
+ final DataTreeCandidateNode candNode) {
if (candNode.getModificationType() == ModificationType.UNMODIFIED) {
LOG.debug("Skipping unmodified candidate {}", path);
return;
}
}
- private void createCanCommits(Collection<ActorRef> regs, YangInstanceIdentifier path,
- DataTreeCandidateNode node) {
+ private void createCanCommits(final Collection<ActorRef> regs, final YangInstanceIdentifier path,
+ final DataTreeCandidateNode node) {
final DOMDataTreeCandidate candidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
for (final ActorRef reg : regs) {
final CanCommit message = new DataTreeCohortActor.CanCommit(txId, candidate, schema, reg);
}
}
- private static DOMDataTreeIdentifier treeIdentifier(YangInstanceIdentifier path) {
+ private static DOMDataTreeIdentifier treeIdentifier(final YangInstanceIdentifier path) {
return new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, path);
}
- private Collection<DataTreeCohortActor.CanCommit> perform(RegistrationTreeNode<ActorRef> rootNode) {
+ private Collection<DataTreeCohortActor.CanCommit> perform(final RegistrationTreeNode<ActorRef> rootNode) {
final List<PathArgument> toLookup = candidate.getRootPath().getPathArguments();
lookupAndCreateCanCommits(toLookup, 0, rootNode);
return messages;
}
}
+ CompositeDataTreeCohort createCohort(final SchemaContext schemaContext, final TransactionIdentifier txId,
+ final Timeout commitStepTimeout) {
+ return new CompositeDataTreeCohort(this, txId, schemaContext, commitStepTimeout);
+ }
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
+import com.google.common.base.Ticker;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
private final ShardTransactionMessageRetrySupport messageRetrySupport;
- protected Shard(AbstractBuilder<?, ?> builder) {
+ protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
- store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType(),
- new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"),
- new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"), name);
+ ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
+ new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+ ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
+ new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+ if(builder.getDataTree() != null) {
+ store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
+ treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+ } else {
+ store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
+ treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+ }
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
getContext().become(new MeteringBehavior(this));
}
- commitCoordinator = new ShardCommitCoordinator(store,
- datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
- datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, this.name);
+ commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name);
setTransactionCommitTimeout();
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
}
- private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+ private Optional<ActorRef> createRoleChangeNotifier(final String shardId) {
ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
return Optional.of(shardRoleChangeNotifier);
messageRetrySupport.close();
- if(txCommitTimeoutCheckSchedule != null) {
+ if (txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
setPeerAddress(resolved.getPeerId().toString(),
resolved.getPeerAddress());
} else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
+ store.checkForExpiredTransactions(transactionCommitTimeout);
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
- } else if(message instanceof DatastoreContext) {
+ } else if (message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
- } else if(message instanceof RegisterRoleChangeListener){
+ } else if (message instanceof RegisterRoleChangeListener){
roleChangeNotifier.get().forward(message, context());
} else if (message instanceof FollowerInitialSyncUpStatus) {
shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
context().parent().tell(message, self());
- } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
+ } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)){
sender().tell(getShardMBean(), self());
- } else if(message instanceof GetShardDataTree) {
+ } else if (message instanceof GetShardDataTree) {
sender().tell(store.getDataTree(), self());
- } else if(message instanceof ServerRemoved){
+ } else if (message instanceof ServerRemoved){
context().parent().forward(message, context());
- } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
+ } else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
} else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
- commitCoordinator.processCohortRegistryCommand(getSender(),
+ store.processCohortRegistryCommand(getSender(),
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else {
super.handleNonRaftCommand(message);
}
public int getPendingTxCommitQueueSize() {
- return commitCoordinator.getQueueSize();
+ return store.getQueueSize();
}
public int getCohortCacheSize() {
}
@Override
- protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
+ protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) {
return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
: new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
- protected void onDatastoreContext(DatastoreContext context) {
+ protected void onDatastoreContext(final DatastoreContext context) {
datastoreContext = context;
- commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
-
setTransactionCommitTimeout();
- if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
+ if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
setPersistence(true);
- } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
+ } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
setPersistence(false);
}
updateConfigParams(datastoreContext.getShardRaftConfig());
}
- private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
- return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
+ boolean canSkipPayload() {
+ // If we do not have any followers and we are not using persistence we can apply modification to the state
+ // immediately
+ return !hasFollowers() && !persistence().isRecoveryApplicable();
}
- void continueCommit(final CohortEntry cohortEntry) {
- final DataTreeCandidate candidate = cohortEntry.getCandidate();
- final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
-
- // If we do not have any followers and we are not using persistence
- // or if cohortEntry has no modifications
- // we can apply modification to the state immediately
- if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
- applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
- return;
- }
-
- final Payload payload;
- try {
- payload = CommitTransactionPayload.create(transactionId, candidate);
- } catch (IOException e) {
- LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
- e);
- // TODO: do we need to do something smarter here?
- throw Throwables.propagate(e);
- }
-
- persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
+ // applyState() will be invoked once consensus is reached on the payload
+ void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
+ // We are faking the sender
+ persistData(self(), transactionId, payload);
}
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
- if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
- shardMBean.incrementFailedTransactionsCount();
- }
+ commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
}
}
- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
- @Nonnull final CohortEntry cohortEntry) {
- LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
-
- try {
- try {
- cohortEntry.commit();
- } catch(ExecutionException e) {
- // We may get a "store tree and candidate base differ" IllegalStateException from commit under
- // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
- // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
- // applying it to the state. We then become the leader and a second tx is pre-committed and
- // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
- // candidate via applyState prior to the second tx. Since the second tx has already been
- // pre-committed, when it gets here to commit it will get an IllegalStateException.
-
- // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
- // solution will be forthcoming.
- if(e.getCause() instanceof IllegalStateException) {
- LOG.debug("{}: commit failed for transaction {} - retrying as foreign candidate", persistenceId(),
- transactionID, e);
- store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
- } else {
- throw e;
- }
- }
-
- sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
-
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-
- } catch (Exception e) {
- sender.tell(new akka.actor.Status.Failure(e), getSelf());
-
- LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- transactionID, e);
- shardMBean.incrementFailedTransactionsCount();
- } finally {
- commitCoordinator.currentTransactionComplete(transactionID, true);
- }
- }
-
- private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
- // With persistence enabled, this method is called via applyState by the leader strategy
- // after the commit has been replicated to a majority of the followers.
-
- CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if (cohortEntry == null) {
- // The transaction is no longer the current commit. This can happen if the transaction
- // was aborted prior, most likely due to timeout in the front-end. We need to finish
- // committing the transaction though since it was successfully persisted and replicated
- // however we can't use the original cohort b/c it was already preCommitted and may
- // conflict with the current commit or may have been aborted so we commit with a new
- // transaction.
- cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
- if(cohortEntry != null) {
- try {
- store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
- } catch (DataValidationFailedException e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
- }
-
- sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
- getSelf());
- } else {
- // This really shouldn't happen - it likely means that persistence or replication
- // took so long to complete such that the cohort entry was expired from the cache.
- IllegalStateException ex = new IllegalStateException(
- String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
- persistenceId(), transactionID));
- LOG.error(ex.getMessage());
- sender.tell(new akka.actor.Status.Failure(ex), getSelf());
- }
- } else {
- finishCommit(sender, transactionID, cohortEntry);
- }
- }
-
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
}
}
- protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
+ protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
try {
commitCoordinator.handleBatchedModifications(batched, sender, this);
} catch (Exception e) {
}
}
- private void handleBatchedModifications(BatchedModifications batched) {
+ private void handleBatchedModifications(final BatchedModifications batched) {
// This message is sent to prepare the modifications transaction directly on the Shard as an
// optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
// BatchedModifications message, the caller sets the ready flag in the message indicating
LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
newModifications.size(), leader);
- for(BatchedModifications bm: newModifications) {
+ for (BatchedModifications bm : newModifications) {
leader.forward(bm, getContext());
}
}
}
}
- private boolean failIfIsolatedLeader(ActorRef sender) {
- if(isIsolatedLeader()) {
+ private boolean failIfIsolatedLeader(final ActorRef sender) {
+ if (isIsolatedLeader()) {
sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
"Shard %s was the leader but has lost contact with all of its followers. Either all" +
" other follower nodes are down or this node is isolated by a network partition.",
}
}
- private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
+ private void handleForwardedReadyTransaction(final ForwardedReadyTransaction forwardedReady) {
LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID());
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
- commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
- store.getSchemaContext());
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
store.closeTransactionChain(closeTransactionChain.getIdentifier());
}
- private void createTransaction(CreateTransaction createTransaction) {
+ private void createTransaction(final CreateTransaction createTransaction) {
try {
- if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
+ if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
failIfIsolatedLeader(getSender())) {
return;
}
}
}
- private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) {
+ private ActorRef createTransaction(final int transactionType, final TransactionIdentifier transactionId) {
LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
transactionId);
}
- private void commitWithNewTransaction(final BatchedModifications modification) {
- ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID());
- modification.apply(tx.getSnapshot());
- try {
- snapshotCohort.syncCommitTransaction(tx);
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
- } catch (Exception e) {
- shardMBean.incrementFailedTransactionsCount();
- LOG.error("{}: Failed to commit", persistenceId(), e);
- }
- }
-
private void updateSchemaContext(final UpdateSchemaContext message) {
updateSchemaContext(message.getSchemaContext());
}
getContext().parent().tell(new ActorInitialized(), getSelf());
// Being paranoid here - this method should only be called once but just in case...
- if(txCommitTimeoutCheckSchedule == null) {
+ if (txCommitTimeoutCheckSchedule == null) {
// Schedule a message to be periodically sent to check if the current in-progress
// transaction should be expired and aborted.
FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
if (clientActor == null) {
// No clientActor indicates a replica coming from the leader
try {
- store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
+ store.applyStateFromLeader(identifier, (DataTreeCandidateSupplier)data);
} catch (DataValidationFailedException | IOException e) {
LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
}
} else {
// Replication consensus reached, proceed to commit
- finishCommit(clientActor, identifier);
+ store.payloadReplicationComplete(identifier, (DataTreeCandidateSupplier)data);
}
} else {
LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
}
}
- private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) {
- if(modification == null) {
- LOG.error(
- "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
- } else if(clientActor == null) {
- // There's no clientActor to which to send a commit reply so we must be applying
- // replicated state from the leader.
-
- // The only implementation we know of is BatchedModifications, which also carries a transaction
- // identifier -- which we really need that.
- Preconditions.checkArgument(modification instanceof BatchedModifications);
- commitWithNewTransaction((BatchedModifications)modification);
- } else {
- // This must be the OK to commit after replication consensus.
- finishCommit(clientActor, identifier);
- }
- }
-
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
// If this actor is no longer the leader close all the transaction chains
if (!isLeader) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug(
"{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
persistenceId(), getId());
store.closeAllTransactionChains();
}
- if(hasLeader && !isIsolatedLeader()) {
+ if (hasLeader && !isIsolatedLeader()) {
messageRetrySupport.retryMessages();
}
}
@Override
- protected void onLeaderChanged(String oldLeader, String newLeader) {
+ protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
boolean hasLeader = hasLeader();
- if(hasLeader && !isLeader()) {
+ if (hasLeader && !isLeader()) {
// Another leader was elected. If we were the previous leader and had pending transactions, convert
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
- if(leader != null) {
- Collection<Object> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
- datastoreContext.getShardBatchedModificationCount());
+ if (leader != null) {
+ Collection<?> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+ datastoreContext.getShardBatchedModificationCount());
- if(!messagesToForward.isEmpty()) {
+ if (!messagesToForward.isEmpty()) {
LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
messagesToForward.size(), leader);
- for(Object message: messagesToForward) {
+ for (Object message : messagesToForward) {
leader.tell(message, self());
}
}
}
}
- if(hasLeader && !isIsolatedLeader()) {
+ if (hasLeader && !isIsolatedLeader()) {
messageRetrySupport.retryMessages();
}
}
@Override
- protected void pauseLeader(Runnable operation) {
+ protected void pauseLeader(final Runnable operation) {
LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
- commitCoordinator.setRunOnPendingTransactionsComplete(operation);
+ store.setRunOnPendingTransactionsComplete(operation);
}
@Override
private DatastoreContext datastoreContext;
private SchemaContext schemaContext;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
+ private TipProducingDataTree dataTree;
private volatile boolean sealed;
- protected AbstractBuilder(Class<S> shardClass) {
+ protected AbstractBuilder(final Class<S> shardClass) {
this.shardClass = shardClass;
}
return (T) this;
}
- public T id(ShardIdentifier id) {
+ public T id(final ShardIdentifier id) {
checkSealed();
this.id = id;
return self();
}
- public T peerAddresses(Map<String, String> peerAddresses) {
+ public T peerAddresses(final Map<String, String> peerAddresses) {
checkSealed();
this.peerAddresses = peerAddresses;
return self();
}
- public T datastoreContext(DatastoreContext datastoreContext) {
+ public T datastoreContext(final DatastoreContext datastoreContext) {
checkSealed();
this.datastoreContext = datastoreContext;
return self();
}
- public T schemaContext(SchemaContext schemaContext) {
+ public T schemaContext(final SchemaContext schemaContext) {
checkSealed();
this.schemaContext = schemaContext;
return self();
}
- public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+ public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
checkSealed();
this.restoreFromSnapshot = restoreFromSnapshot;
return self();
}
+ public T dataTree(final TipProducingDataTree dataTree) {
+ checkSealed();
+ this.dataTree = dataTree;
+ return self();
+ }
+
public ShardIdentifier getId() {
return id;
}
return restoreFromSnapshot;
}
+ public TipProducingDataTree getDataTree() {
+ return dataTree;
+ }
+
public TreeType getTreeType() {
switch (datastoreContext.getLogicalStoreType()) {
case CONFIGURATION:
super(Shard.class);
}
}
+
+ Ticker ticker() {
+ return Ticker.systemTicker();
+ }
}
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
/**
*/
final class ShardCommitCoordinator {
- // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+ // Interface hook for unit tests to replace or decorate the ShardDataTreeCohorts.
+ @VisibleForTesting
public interface CohortDecorator {
ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
}
private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
- private CohortEntry currentCohortEntry;
-
private final ShardDataTree dataTree;
- private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
-
- // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
- // since this should only be accessed on the shard's dispatcher.
- private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
-
- private int queueCapacity;
-
private final Logger log;
private final String name;
- private final long cacheExpiryTimeoutInMillis;
-
- // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+ // This is a hook for unit tests to replace or decorate the ShardDataTreeCohorts.
+ @VisibleForTesting
private CohortDecorator cohortDecorator;
private ReadyTransactionReply readyTransactionReply;
- private Runnable runOnPendingTransactionsComplete;
-
- ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
- String name) {
-
- this.queueCapacity = queueCapacity;
+ ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) {
this.log = log;
this.name = name;
this.dataTree = Preconditions.checkNotNull(dataTree);
- this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
- }
-
- int getQueueSize() {
- return queuedCohortEntries.size();
}
int getCohortCacheSize() {
return cohortCache.size();
}
- void setQueueCapacity(int queueCapacity) {
- this.queueCapacity = queueCapacity;
+ private String persistenceId() {
+ return dataTree.logContext();
}
- private ReadyTransactionReply readyTransactionReply(Shard shard) {
- if(readyTransactionReply == null) {
- readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
+ private ReadyTransactionReply readyTransactionReply(final ActorRef cohort) {
+ if (readyTransactionReply == null) {
+ readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(cohort));
}
return readyTransactionReply;
}
- private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
- if(queuedCohortEntries.size() < queueCapacity) {
- queuedCohortEntries.offer(cohortEntry);
-
- log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
- queuedCohortEntries.size());
-
- return true;
- } else {
- cohortCache.remove(cohortEntry.getTransactionID());
-
- final RuntimeException ex = new RuntimeException(
- String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
- " capacity %d has been reached.",
- name, cohortEntry.getTransactionID(), queueCapacity));
- log.error(ex.getMessage());
- sender.tell(new Failure(ex), shard.self());
- return false;
- }
- }
-
/**
* This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
* the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
* @param shard the transaction's shard actor
* @param schema
*/
- void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
- SchemaContext schema) {
+ void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender,
+ final Shard shard) {
log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionID(), ready.getTxnClientVersion());
final ShardDataTreeCohort cohort = ready.getTransaction().ready();
- final CohortEntry cohortEntry = CohortEntry.createReady(ready.getTransactionID(), cohort, cohortRegistry,
- schema, ready.getTxnClientVersion());
+ final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
- if(!queueCohortEntry(cohortEntry, sender, shard)) {
- return;
- }
-
- if(ready.isDoImmediateCommit()) {
+ if (ready.isDoImmediateCommit()) {
cohortEntry.setDoImmediateCommit(true);
cohortEntry.setReplySender(sender);
cohortEntry.setShard(shard);
} else {
// The caller does not want immediate commit - the 3-phase commit will be coordinated by the
// front-end so send back a ReadyTransactionReply with our actor path.
- sender.tell(readyTransactionReply(shard), shard.self());
+ sender.tell(readyTransactionReply(shard.self()), shard.self());
}
}
* @param batched the BatchedModifications message to process
* @param sender the sender of the message
*/
- void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
+ void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
- if(cohortEntry == null) {
- cohortEntry = CohortEntry.createOpen(batched.getTransactionID(),
- dataTree.newReadWriteTransaction(batched.getTransactionID()),
- cohortRegistry, dataTree.getSchemaContext(), batched.getVersion());
+ if (cohortEntry == null) {
+ cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionID()),
+ batched.getVersion());
cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
}
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("{}: Applying {} batched modifications for Tx {}", name,
batched.getModifications().size(), batched.getTransactionID());
}
cohortEntry.applyModifications(batched.getModifications());
- if(batched.isReady()) {
- if(cohortEntry.getLastBatchedModificationsException() != null) {
+ if (batched.isReady()) {
+ if (cohortEntry.getLastBatchedModificationsException() != null) {
cohortCache.remove(cohortEntry.getTransactionID());
throw cohortEntry.getLastBatchedModificationsException();
}
- if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+ if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
cohortCache.remove(cohortEntry.getTransactionID());
throw new IllegalStateException(String.format(
"The total number of batched messages received %d does not match the number sent %d",
cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
}
- if(!queueCohortEntry(cohortEntry, sender, shard)) {
- return;
- }
-
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("{}: Readying Tx {}, client version {}", name,
batched.getTransactionID(), batched.getVersion());
}
- cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
+ cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
+ cohortEntry.ready(cohortDecorator);
- if(batched.isDoCommitOnReady()) {
+ if (batched.isDoCommitOnReady()) {
cohortEntry.setReplySender(sender);
cohortEntry.setShard(shard);
handleCanCommit(cohortEntry);
} else {
- sender.tell(readyTransactionReply(shard), shard.self());
+ sender.tell(readyTransactionReply(shard.self()), shard.self());
}
} else {
sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
* @param sender the sender of the message
* @param shard the transaction's shard actor
*/
- void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
- final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
- message.getTransactionID());
- final CohortEntry cohortEntry = CohortEntry.createReady(message.getTransactionID(), cohort, cohortRegistry,
- dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION);
+ void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
+ final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionID(),
+ message.getModification());
+ final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
- if(!queueCohortEntry(cohortEntry, sender, shard)) {
- return;
- }
-
log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
if (message.isDoCommitOnReady()) {
cohortEntry.setShard(shard);
handleCanCommit(cohortEntry);
} else {
- sender.tell(readyTransactionReply(shard), shard.self());
+ sender.tell(readyTransactionReply(shard.self()), shard.self());
}
}
Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
final int maxModificationsPerBatch) {
- CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
- if(cohortEntry == null || cohortEntry.getTransaction() == null) {
+ CohortEntry cohortEntry = cohortCache.remove(from.getTransactionID());
+ if (cohortEntry == null || cohortEntry.getTransaction() == null) {
return Collections.singletonList(from);
}
cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
@Override
protected BatchedModifications getModifications() {
- if(newModifications.isEmpty() ||
+ if (newModifications.isEmpty() ||
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
}
return newModifications;
}
- private void handleCanCommit(CohortEntry cohortEntry) {
- cohortEntry.updateLastAccessTime();
-
- if(currentCohortEntry != null) {
- // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
- // queue and will get processed after all prior entries complete.
+ private void handleCanCommit(final CohortEntry cohortEntry) {
+ cohortEntry.canCommit(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionID());
- if(log.isDebugEnabled()) {
- log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
- name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
+ if (cohortEntry.isDoImmediateCommit()) {
+ doCommit(cohortEntry);
+ } else {
+ cohortEntry.getReplySender().tell(
+ CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable(),
+ cohortEntry.getShard().self());
+ }
}
- return;
- }
+ @Override
+ public void onFailure(final Throwable t) {
+ log.debug("{}: An exception occurred during canCommit for {}: {}", name,
+ cohortEntry.getTransactionID(), t);
- // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
- // it the current entry and proceed with canCommit.
- // Purposely checking reference equality here.
- if(queuedCohortEntries.peek() == cohortEntry) {
- currentCohortEntry = queuedCohortEntries.poll();
- doCanCommit(currentCohortEntry);
- } else {
- if(log.isDebugEnabled()) {
- log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
- queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
- cohortEntry.getTransactionID());
+ cohortCache.remove(cohortEntry.getTransactionID());
+ cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self());
}
- }
+ });
}
/**
* @param sender the actor to which to send the response
* @param shard the transaction's shard actor
*/
- void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
+ void handleCanCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
// Lookup the cohort entry that was cached previously (or should have been) by
// transactionReady (via the ForwardedReadyTransaction message).
final CohortEntry cohortEntry = cohortCache.get(transactionID);
- if(cohortEntry == null) {
- // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
- // between canCommit and ready and the entry was expired from the cache.
+ if (cohortEntry == null) {
+ // Either canCommit was invoked before ready (shouldn't happen) or a long time passed
+ // between canCommit and ready and the entry was expired from the cache or it was aborted.
IllegalStateException ex = new IllegalStateException(
- String.format("%s: No cohort entry found for transaction %s", name, transactionID));
+ String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
log.error(ex.getMessage());
sender.tell(new Failure(ex), shard.self());
return;
handleCanCommit(cohortEntry);
}
- private void doCanCommit(final CohortEntry cohortEntry) {
- boolean canCommit = false;
- try {
- canCommit = cohortEntry.canCommit();
-
- log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
-
- if(cohortEntry.isDoImmediateCommit()) {
- if(canCommit) {
- doCommit(cohortEntry);
- } else {
- cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
- "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
- }
- } else {
- cohortEntry.getReplySender().tell(
- canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
- CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
- cohortEntry.getShard().self());
- }
- } catch (Exception e) {
- log.debug("{}: An exception occurred during canCommit", name, e);
-
- Throwable failure = e;
- if(e instanceof ExecutionException) {
- failure = e.getCause();
- }
-
- cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
- } finally {
- if(!canCommit) {
- // Remove the entry from the cache now.
- currentTransactionComplete(cohortEntry.getTransactionID(), true);
- }
- }
- }
-
- private boolean doCommit(CohortEntry cohortEntry) {
+ private void doCommit(final CohortEntry cohortEntry) {
log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
- boolean success = false;
-
// We perform the preCommit phase here atomically with the commit phase. This is an
// optimization to eliminate the overhead of an extra preCommit message. We lose front-end
// coordination of preCommit across shards in case of failure but preCommit should not
// normally fail since we ensure only one concurrent 3-phase commit.
+ cohortEntry.preCommit(new FutureCallback<DataTreeCandidate>() {
+ @Override
+ public void onSuccess(final DataTreeCandidate candidate) {
+ finishCommit(cohortEntry.getReplySender(), cohortEntry);
+ }
- try {
- cohortEntry.preCommit();
+ @Override
+ public void onFailure(final Throwable t) {
+ log.error("{} An exception occurred while preCommitting transaction {}", name,
+ cohortEntry.getTransactionID(), t);
+
+ cohortCache.remove(cohortEntry.getTransactionID());
+ cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self());
+ }
+ });
+ }
- cohortEntry.getShard().continueCommit(cohortEntry);
+ private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+ log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
- cohortEntry.updateLastAccessTime();
+ cohortEntry.commit(new FutureCallback<UnsignedLong>() {
+ @Override
+ public void onSuccess(final UnsignedLong result) {
+ final TransactionIdentifier txId = cohortEntry.getTransactionID();
+ log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
+ sender);
- success = true;
- } catch (Exception e) {
- log.error("{} An exception occurred while preCommitting transaction {}",
- name, cohortEntry.getTransactionID(), e);
- cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
+ cohortCache.remove(cohortEntry.getTransactionID());
+ sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
+ cohortEntry.getShard().self());
+ }
- currentTransactionComplete(cohortEntry.getTransactionID(), true);
- }
+ @Override
+ public void onFailure(final Throwable t) {
+ log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+ cohortEntry.getTransactionID(), t);
- return success;
+ cohortCache.remove(cohortEntry.getTransactionID());
+ sender.tell(new Failure(t), cohortEntry.getShard().self());
+ }
+ });
}
/**
* @param transactionID the ID of the transaction to commit
* @param sender the actor to which to send the response
* @param shard the transaction's shard actor
- * @return true if the transaction was successfully prepared, false otherwise.
*/
- boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
- // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
- // this transaction.
- final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
- if(cohortEntry == null) {
- // We're not the current Tx - the Tx was likely expired b/c it took too long in
- // between the canCommit and commit messages.
+ void handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
+ final CohortEntry cohortEntry = cohortCache.get(transactionID);
+ if (cohortEntry == null) {
+ // Either a long time passed between canCommit and commit and the entry was expired from the cache
+ // or it was aborted.
IllegalStateException ex = new IllegalStateException(
- String.format("%s: Cannot commit transaction %s - it is not the current transaction",
- name, transactionID));
+ String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
log.error(ex.getMessage());
sender.tell(new Failure(ex), shard.self());
- return false;
+ return;
}
cohortEntry.setReplySender(sender);
- return doCommit(cohortEntry);
+ doCommit(cohortEntry);
}
void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
- CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
- if(cohortEntry != null) {
- // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
- // aborted during replication in which case we may still commit locally if replication
- // succeeds.
- currentTransactionComplete(transactionID, false);
- } else {
- cohortEntry = getAndRemoveCohortEntry(transactionID);
- }
-
- if(cohortEntry == null) {
+ CohortEntry cohortEntry = cohortCache.remove(transactionID);
+ if (cohortEntry == null) {
return;
}
shard.getShardMBean().incrementAbortTransactionsCount();
- if(sender != null) {
+ if (sender != null) {
sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
}
} catch (Exception e) {
log.error("{}: An exception happened during abort", name, e);
- if(sender != null) {
+ if (sender != null) {
sender.tell(new Failure(e), self);
}
}
}
void checkForExpiredTransactions(final long timeout, final Shard shard) {
- CohortEntry cohortEntry = getCurrentCohortEntry();
- if(cohortEntry != null) {
- if(cohortEntry.isExpired(timeout)) {
- log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
- name, cohortEntry.getTransactionID(), timeout);
-
- handleAbort(cohortEntry.getTransactionID(), null, shard);
+ Iterator<CohortEntry> iter = cohortCache.values().iterator();
+ while (iter.hasNext()) {
+ CohortEntry cohortEntry = iter.next();
+ if(cohortEntry.isFailed()) {
+ iter.remove();
}
}
-
- cleanupExpiredCohortEntries();
}
void abortPendingTransactions(final String reason, final Shard shard) {
- if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
- return;
- }
-
- List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+ final Failure failure = new Failure(new RuntimeException(reason));
+ Collection<ShardDataTreeCohort> pending = dataTree.getAndClearPendingTransactions();
- log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
+ log.debug("{}: Aborting {} pending queued transactions", name, pending.size());
- for(CohortEntry cohortEntry: cohortEntries) {
- if(cohortEntry.getReplySender() != null) {
- cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+ for (ShardDataTreeCohort cohort : pending) {
+ CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
+ if (cohortEntry == null) {
+ continue;
}
- }
- }
-
- private List<CohortEntry> getAndClearPendingCohortEntries() {
- List<CohortEntry> cohortEntries = new ArrayList<>();
-
- if(currentCohortEntry != null) {
- cohortEntries.add(currentCohortEntry);
- cohortCache.remove(currentCohortEntry.getTransactionID());
- currentCohortEntry = null;
- }
- for(CohortEntry cohortEntry: queuedCohortEntries) {
- cohortEntries.add(cohortEntry);
- cohortCache.remove(cohortEntry.getTransactionID());
+ if (cohortEntry.getReplySender() != null) {
+ cohortEntry.getReplySender().tell(failure, shard.self());
+ }
}
- queuedCohortEntries.clear();
- return cohortEntries;
+ cohortCache.clear();
}
- Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
- if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
- return Collections.emptyList();
- }
-
- Collection<Object> messages = new ArrayList<>();
- List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
- for(CohortEntry cohortEntry: cohortEntries) {
- if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
+ Collection<?> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
+ final Collection<VersionedExternalizableMessage> messages = new ArrayList<>();
+ for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) {
+ CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
+ if (cohortEntry == null) {
continue;
}
- final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+ final Deque<BatchedModifications> newMessages = new ArrayDeque<>();
cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
@Override
protected BatchedModifications getModifications() {
- if(newModifications.isEmpty() ||
- newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
- newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
- cohortEntry.getClientVersion()));
- }
+ final BatchedModifications lastBatch = newMessages.peekLast();
- return newModifications.getLast();
+ if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) {
+ return lastBatch;
+ }
+
+ // Allocate a new message
+ final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion());
+ newMessages.add(ret);
+ return ret;
}
});
- if(!newModifications.isEmpty()) {
- BatchedModifications last = newModifications.getLast();
- last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
+ final BatchedModifications last = newMessages.peekLast();
+ if (last != null) {
+ final boolean immediate = cohortEntry.isDoImmediateCommit();
+ last.setDoCommitOnReady(immediate);
last.setReady(true);
- last.setTotalMessagesSent(newModifications.size());
- messages.addAll(newModifications);
-
- if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
- messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
- cohortEntry.getClientVersion()));
- }
+ last.setTotalMessagesSent(newMessages.size());
- if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
- messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
- cohortEntry.getClientVersion()));
- }
- }
- }
+ messages.addAll(newMessages);
- return messages;
- }
-
- /**
- * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
- * matches the current entry.
- *
- * @param transactionID the ID of the transaction
- * @return the current CohortEntry or null if the given transaction ID does not match the
- * current entry.
- */
- CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
- if(isCurrentTransaction(transactionID)) {
- return currentCohortEntry;
- }
-
- return null;
- }
-
- CohortEntry getCurrentCohortEntry() {
- return currentCohortEntry;
- }
-
- CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
- return cohortCache.remove(transactionID);
- }
-
- boolean isCurrentTransaction(Identifier transactionID) {
- return currentCohortEntry != null &&
- currentCohortEntry.getTransactionID().equals(transactionID);
- }
-
- /**
- * This method is called when a transaction is complete, successful or not. If the given
- * given transaction ID matches the current in-progress transaction, the next cohort entry,
- * if any, is dequeued and processed.
- *
- * @param transactionID the ID of the completed transaction
- * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
- * the cache.
- */
- void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
- if(removeCohortEntry) {
- cohortCache.remove(transactionID);
- }
-
- if(isCurrentTransaction(transactionID)) {
- currentCohortEntry = null;
-
- log.debug("{}: currentTransactionComplete: {}", name, transactionID);
-
- maybeProcessNextCohortEntry();
- }
- }
-
- private void maybeProcessNextCohortEntry() {
- // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
- // clean out expired entries.
- final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
- while(iter.hasNext()) {
- final CohortEntry next = iter.next();
- if(next.isReadyToCommit()) {
- if(currentCohortEntry == null) {
- if(log.isDebugEnabled()) {
- log.debug("{}: Next entry to canCommit {}", name, next);
+ if (!immediate) {
+ switch (cohort.getState()) {
+ case CAN_COMMIT_COMPLETE:
+ case CAN_COMMIT_PENDING:
+ messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion()));
+ break;
+ case PRE_COMMIT_COMPLETE:
+ case PRE_COMMIT_PENDING:
+ messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion()));
+ break;
+ default:
+ break;
}
-
- iter.remove();
- currentCohortEntry = next;
- currentCohortEntry.updateLastAccessTime();
- doCanCommit(currentCohortEntry);
}
-
- break;
- } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
- log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
- name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
- } else if(!next.isAborted()) {
- break;
}
-
- iter.remove();
- cohortCache.remove(next.getTransactionID());
}
- maybeRunOperationOnPendingTransactionsComplete();
- }
-
- void cleanupExpiredCohortEntries() {
- maybeProcessNextCohortEntry();
- }
-
- void setRunOnPendingTransactionsComplete(Runnable operation) {
- runOnPendingTransactionsComplete = operation;
- maybeRunOperationOnPendingTransactionsComplete();
- }
-
- private void maybeRunOperationOnPendingTransactionsComplete() {
- if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
- log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
-
- runOnPendingTransactionsComplete.run();
- runOnPendingTransactionsComplete = null;
- }
+ return messages;
}
@VisibleForTesting
- void setCohortDecorator(CohortDecorator cohortDecorator) {
+ void setCohortDecorator(final CohortDecorator cohortDecorator) {
this.cohortDecorator = cohortDecorator;
}
-
- void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
- cohortRegistry.process(sender, message);
- }
}
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
+import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.primitives.UnsignedLong;
+import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
/**
* Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
*/
@NotThreadSafe
public class ShardDataTree extends ShardDataTreeTransactionParent {
+ private static final class CommitEntry {
+ final SimpleShardDataTreeCohort cohort;
+ long lastAccess;
+
+ CommitEntry(final SimpleShardDataTreeCohort cohort, final long now) {
+ this.cohort = Preconditions.checkNotNull(cohort);
+ lastAccess = now;
+ }
+ }
+
+ private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+ private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
+ private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
private final TipProducingDataTree dataTree;
private final String logContext;
+ private final Shard shard;
+ private Runnable runOnPendingTransactionsComplete;
+
private SchemaContext schemaContext;
- public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType,
+ public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TipProducingDataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
- dataTree = InMemoryDataTreeFactory.getInstance().create(treeType);
+ this.dataTree = dataTree;
updateSchemaContext(schemaContext);
+ this.shard = Preconditions.checkNotNull(shard);
this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
this.logContext = Preconditions.checkNotNull(logContext);
}
- public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType) {
- this(schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
+ public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
+ final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
+ final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
+ this(shard, schemaContext, InMemoryDataTreeFactory.getInstance().create(treeType),
+ treeChangeListenerPublisher, dataChangeListenerPublisher, logContext);
+ }
+
+ @VisibleForTesting
+ public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
+ this(shard, schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
new DefaultShardDataChangeListenerPublisher(), "");
}
+ String logContext() {
+ return logContext;
+ }
+
public TipProducingDataTree getDataTree() {
return dataTree;
}
}
void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException {
+ // FIXME: purge any outstanding transactions
+
final DataTreeModification snapshot = transaction.getSnapshot();
snapshot.ready();
return new SimpleEntry<>(reg, readCurrentData());
}
+ int getQueueSize() {
+ return pendingTransactions.size();
+ }
+
void applyForeignCandidate(final Identifier identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
final DataTreeModification snapshot = transaction.getSnapshot();
snapshot.ready();
- return new SimpleShardDataTreeCohort(this, snapshot, transaction.getId());
+
+ return createReadyCohort(transaction.getId(), snapshot);
}
public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
dataTree.commit(candidate);
return candidate;
}
+
+ public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
+ Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
+ for(CommitEntry entry: pendingTransactions) {
+ ret.add(entry.cohort);
+ }
+
+ pendingTransactions.clear();
+ return ret;
+ }
+
+ private void processNextTransaction() {
+ while (!pendingTransactions.isEmpty()) {
+ final CommitEntry entry = pendingTransactions.peek();
+ final SimpleShardDataTreeCohort cohort = entry.cohort;
+ final DataTreeModification modification = cohort.getDataTreeModification();
+
+ if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+ break;
+ }
+
+ LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
+ Exception cause;
+ try {
+ dataTree.validate(modification);
+ LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
+ cohort.successfulCanCommit();
+ entry.lastAccess = shard.ticker().read();
+ return;
+ } catch (ConflictingModificationAppliedException e) {
+ LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
+ e.getPath());
+ cause = new OptimisticLockFailedException("Optimistic lock failed.", e);
+ } catch (DataValidationFailedException e) {
+ LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
+ e.getPath(), e);
+
+ // For debugging purposes, allow dumping of the modification. Coupled with the above
+ // precondition log, it should allow us to understand what went on.
+ LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+ cause = new TransactionCommitFailedException("Data did not pass validation.", e);
+ } catch (Exception e) {
+ LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
+ cause = e;
+ }
+
+ // Failure path: propagate the failure, remove the transaction from the queue and loop to the next one
+ pendingTransactions.poll().cohort.failedCanCommit(cause);
+ }
+
+ maybeRunOperationOnPendingTransactionsComplete();
+ }
+
+ void startCanCommit(final SimpleShardDataTreeCohort cohort) {
+ final SimpleShardDataTreeCohort current = pendingTransactions.peek().cohort;
+ if (!cohort.equals(current)) {
+ LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
+ return;
+ }
+
+ processNextTransaction();
+ }
+
+ private void failPreCommit(final Exception cause) {
+ shard.getShardMBean().incrementFailedTransactionsCount();
+ pendingTransactions.poll().cohort.failedPreCommit(cause);
+ processNextTransaction();
+ }
+
+ void startPreCommit(final SimpleShardDataTreeCohort cohort) {
+ final CommitEntry entry = pendingTransactions.peek();
+ Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
+
+ final SimpleShardDataTreeCohort current = entry.cohort;
+ Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
+ final DataTreeCandidateTip candidate;
+ try {
+ candidate = dataTree.prepare(cohort.getDataTreeModification());
+ } catch (Exception e) {
+ failPreCommit(e);
+ return;
+ }
+
+ try {
+ cohort.userPreCommit(candidate);
+ } catch (ExecutionException | TimeoutException e) {
+ failPreCommit(e);
+ return;
+ }
+
+ entry.lastAccess = shard.ticker().read();
+ cohort.successfulPreCommit(candidate);
+ }
+
+ private void failCommit(final Exception cause) {
+ shard.getShardMBean().incrementFailedTransactionsCount();
+ pendingTransactions.poll().cohort.failedCommit(cause);
+ processNextTransaction();
+ }
+
+ private void finishCommit(final SimpleShardDataTreeCohort cohort) {
+ final TransactionIdentifier txId = cohort.getIdentifier();
+ final DataTreeCandidate candidate = cohort.getCandidate();
+
+ LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
+
+ try {
+ try {
+ dataTree.commit(candidate);
+ } catch (IllegalStateException e) {
+ // We may get a "store tree and candidate base differ" IllegalStateException from commit under
+ // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
+ // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
+ // applying it to the state. We then become the leader and a second tx is pre-committed and
+ // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
+ // candidate via applyState prior to the second tx. Since the second tx has already been
+ // pre-committed, when it gets here to commit it will get an IllegalStateException.
+
+ // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
+ // solution will be forthcoming.
+
+ LOG.debug("{}: Commit failed for transaction {} - retrying as foreign candidate", logContext, txId, e);
+ applyForeignCandidate(txId, candidate);
+ }
+ } catch (Exception e) {
+ failCommit(e);
+ return;
+ }
+
+ shard.getShardMBean().incrementCommittedTransactionCount();
+ shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ // FIXME: propagate journal index
+
+ pendingTransactions.poll().cohort.successfulCommit(UnsignedLong.ZERO);
+
+ LOG.trace("{}: Transaction {} committed, proceeding to notify", logContext, txId);
+ notifyListeners(candidate);
+
+ processNextTransaction();
+ }
+
+ void startCommit(final SimpleShardDataTreeCohort cohort, final DataTreeCandidate candidate) {
+ final CommitEntry entry = pendingTransactions.peek();
+ Preconditions.checkState(entry != null, "Attempted to start commit of %s when no transactions pending", cohort);
+
+ final SimpleShardDataTreeCohort current = entry.cohort;
+ Verify.verify(cohort.equals(current), "Attempted to commit %s while %s is pending", cohort, current);
+
+ if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
+ LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
+ finishCommit(cohort);
+ return;
+ }
+
+ final TransactionIdentifier txId = cohort.getIdentifier();
+ final Payload payload;
+ try {
+ payload = CommitTransactionPayload.create(txId, candidate);
+ } catch (IOException e) {
+ LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
+ pendingTransactions.poll().cohort.failedCommit(e);
+ return;
+ }
+
+ // Once completed, we will continue via payloadReplicationComplete
+ entry.lastAccess = shard.ticker().read();
+ shard.persistPayload(txId, payload);
+ LOG.debug("{}: Transaction {} submitted to persistence", logContext, txId);
+ }
+
+ private void payloadReplicationComplete(final TransactionIdentifier txId, final DataTreeCandidateSupplier payload) {
+ final CommitEntry current = pendingTransactions.peek();
+ if (current == null) {
+ LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+ return;
+ }
+
+ if (!current.cohort.getIdentifier().equals(txId)) {
+ LOG.warn("{}: Head of queue is {}, ignoring consensus on transaction {}", logContext,
+ current.cohort.getIdentifier(), txId);
+ return;
+ }
+
+ finishCommit(current.cohort);
+ }
+
+ void payloadReplicationComplete(final Identifier identifier, final DataTreeCandidateSupplier payload) {
+ // For now we do not care about anything else but transactions
+ Verify.verify(identifier instanceof TransactionIdentifier);
+ payloadReplicationComplete((TransactionIdentifier)identifier, payload);
+ }
+
+ void processCohortRegistryCommand(final ActorRef sender, final CohortRegistryCommand message) {
+ cohortRegistry.process(sender, message);
+ }
+
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
+ final DataTreeModification modification) {
+ SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
+ cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
+ pendingTransactions.add(new CommitEntry(cohort, shard.ticker().read()));
+ return cohort;
+ }
+
+ void applyStateFromLeader(final Identifier identifier, final DataTreeCandidateSupplier payload)
+ throws DataValidationFailedException, IOException {
+ applyForeignCandidate(identifier, payload.getCandidate().getValue());
+ }
+
+ void checkForExpiredTransactions(final long transactionCommitTimeoutMillis) {
+ final long timeout = TimeUnit.MILLISECONDS.toNanos(transactionCommitTimeoutMillis);
+ final long now = shard.ticker().read();
+ final CommitEntry currentTx = pendingTransactions.peek();
+ if (currentTx != null && currentTx.lastAccess + timeout < now) {
+ LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
+ currentTx.cohort.getIdentifier(), transactionCommitTimeoutMillis, currentTx.cohort.getState());
+ boolean processNext = true;
+ switch (currentTx.cohort.getState()) {
+ case CAN_COMMIT_PENDING:
+ pendingTransactions.poll().cohort.failedCanCommit(new TimeoutException());
+ break;
+ case CAN_COMMIT_COMPLETE:
+ pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+ break;
+ case PRE_COMMIT_PENDING:
+ pendingTransactions.poll().cohort.failedPreCommit(new TimeoutException());
+ break;
+ case PRE_COMMIT_COMPLETE:
+ // FIXME: this is a legacy behavior problem. Three-phase commit protocol specifies that after we
+ // are ready we should commit the transaction, not abort it. Our current software stack does
+ // not allow us to do that consistently, because we persist at the time of commit, hence
+ // we can end up in a state where we have pre-committed a transaction, then a leader failover
+ // occurred ... the new leader does not see the pre-committed transaction and does not have
+ // a running timer. To fix this we really need two persistence events.
+ //
+ // The first one, done at pre-commit time will hold the transaction payload. When consensus
+ // is reached, we exit the pre-commit phase and start the pre-commit timer. Followers do not
+ // apply the state in this event.
+ //
+ // The second one, done at commit (or abort) time holds only the transaction identifier and
+ // signals to followers that the state should (or should not) be applied.
+ //
+ // In order to make the pre-commit timer working across failovers, though, we need
+ // a per-shard cluster-wide monotonic time, so a follower becoming the leader can accurately
+ // restart the timer.
+ pendingTransactions.poll().cohort.reportFailure(new TimeoutException());
+ break;
+ case COMMIT_PENDING:
+ LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
+ currentTx.cohort.getIdentifier());
+ currentTx.lastAccess = now;
+ processNext = false;
+ return;
+ case ABORTED:
+ case COMMITTED:
+ case FAILED:
+ case READY:
+ default:
+ pendingTransactions.poll();
+ }
+
+ if (processNext) {
+ processNextTransaction();
+ }
+ }
+ }
+
+ void startAbort(final SimpleShardDataTreeCohort cohort) {
+ final Iterator<CommitEntry> it = pendingTransactions.iterator();
+ if (!it.hasNext()) {
+ LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
+ return;
+ }
+
+ // First entry is special, as it may already be committing
+ final CommitEntry first = it.next();
+ if (cohort.equals(first.cohort)) {
+ if (cohort.getState() != State.COMMIT_PENDING) {
+ LOG.debug("{}: aborted head of queue {} in state {}", logContext, cohort.getIdentifier(),
+ cohort.getIdentifier());
+ pendingTransactions.poll();
+ processNextTransaction();
+ } else {
+ LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+ }
+
+ return;
+ }
+
+ while (it.hasNext()) {
+ final CommitEntry e = it.next();
+ if (cohort.equals(e.cohort)) {
+ LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+ it.remove();
+ return;
+ }
+ }
+
+ LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+ }
+
+ void setRunOnPendingTransactionsComplete(final Runnable operation) {
+ runOnPendingTransactionsComplete = operation;
+ maybeRunOperationOnPendingTransactionsComplete();
+ }
+
+ private void maybeRunOperationOnPendingTransactionsComplete() {
+ if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+ LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+ runOnPendingTransactionsComplete);
+
+ runOnPendingTransactionsComplete.run();
+ runOnPendingTransactionsComplete = null;
+ }
+ }
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-public abstract class ShardDataTreeCohort {
+public abstract class ShardDataTreeCohort implements Identifiable<TransactionIdentifier> {
+ public enum State {
+ READY,
+ CAN_COMMIT_PENDING,
+ CAN_COMMIT_COMPLETE,
+ PRE_COMMIT_PENDING,
+ PRE_COMMIT_COMPLETE,
+ COMMIT_PENDING,
+
+ ABORTED,
+ COMMITTED,
+ FAILED,
+ }
+
ShardDataTreeCohort() {
// Prevent foreign instantiation
}
// FIXME: This leaks internal state generated in preCommit,
// should be result of canCommit
abstract DataTreeCandidateTip getCandidate();
+
abstract DataTreeModification getDataTreeModification();
// FIXME: Should return rebased DataTreeCandidateTip
@VisibleForTesting
- public abstract ListenableFuture<Boolean> canCommit();
+ public abstract void canCommit(FutureCallback<Void> callback);
+
@VisibleForTesting
- public abstract ListenableFuture<Void> preCommit();
+ public abstract void preCommit(FutureCallback<DataTreeCandidate> callback);
+
@VisibleForTesting
public abstract ListenableFuture<Void> abort();
+
@VisibleForTesting
- public abstract ListenableFuture<Void> commit();
+ public abstract void commit(FutureCallback<UnsignedLong> callback);
+
+ public abstract boolean isFailed();
+
+ public abstract State getState();
}
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendType;
}
- void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction)
- throws ExecutionException, InterruptedException {
- ShardDataTreeCohort commitCohort = store.finishTransaction(transaction);
- commitCohort.preCommit().get();
- commitCohort.commit().get();
- }
-
@Override
public void applySnapshot(final byte[] snapshotBytes) {
// Since this will be done only on Recovery or when this actor is a Follower
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
-final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+final class SimpleShardDataTreeCohort extends ShardDataTreeCohort implements Identifiable<TransactionIdentifier> {
private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
- private static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
private final DataTreeModification transaction;
private final ShardDataTree dataTree;
private final TransactionIdentifier transactionId;
+ private final CompositeDataTreeCohort userCohorts;
+
+ private State state = State.READY;
private DataTreeCandidateTip candidate;
+ private FutureCallback<?> callback;
+ private Exception nextFailure;
SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
- final TransactionIdentifier transactionId) {
+ final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
this.dataTree = Preconditions.checkNotNull(dataTree);
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionId = Preconditions.checkNotNull(transactionId);
+ this.userCohorts = Preconditions.checkNotNull(userCohorts);
+ }
+
+ @Override
+ public TransactionIdentifier getIdentifier() {
+ return transactionId;
}
@Override
}
@Override
- public ListenableFuture<Boolean> canCommit() {
- DataTreeModification modification = getDataTreeModification();
- try {
- dataTree.getDataTree().validate(modification);
- LOG.trace("Transaction {} validated", transaction);
- return TRUE_FUTURE;
- }
- catch (ConflictingModificationAppliedException e) {
- LOG.warn("Store Tx {}: Conflicting modification for path {}.", transactionId, e.getPath());
- return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
- } catch (DataValidationFailedException e) {
- LOG.warn("Store Tx {}: Data validation failed for path {}.", transactionId, e.getPath(), e);
-
- // For debugging purposes, allow dumping of the modification. Coupled with the above
- // precondition log, it should allow us to understand what went on.
- LOG.debug("Store Tx {}: modifications: {} tree: {}", transactionId, modification, dataTree.getDataTree());
-
- return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
- } catch (Exception e) {
- LOG.warn("Unexpected failure in validation phase", e);
- return Futures.immediateFailedFuture(e);
+ public DataTreeModification getDataTreeModification() {
+ DataTreeModification dataTreeModification = transaction;
+ if (transaction instanceof PruningDataTreeModification){
+ dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification();
}
+ return dataTreeModification;
+ }
+
+ private void checkState(State expected) {
+ Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
}
@Override
- public ListenableFuture<Void> preCommit() {
- try {
- candidate = dataTree.getDataTree().prepare(getDataTreeModification());
- /*
- * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
- * persist on the candidate (which gives us a Future).
- */
- LOG.trace("Transaction {} prepared candidate {}", transaction, candidate);
- return VOID_FUTURE;
- } catch (Exception e) {
- if(LOG.isTraceEnabled()) {
- LOG.trace("Transaction {} failed to prepare", transaction, e);
- } else {
- LOG.error("Transaction failed to prepare", e);
- }
- return Futures.immediateFailedFuture(e);
+ public void canCommit(final FutureCallback<Void> callback) {
+ if(state == State.CAN_COMMIT_PENDING) {
+ return;
}
+
+ checkState(State.READY);
+ this.callback = Preconditions.checkNotNull(callback);
+ state = State.CAN_COMMIT_PENDING;
+ dataTree.startCanCommit(this);
}
@Override
- DataTreeModification getDataTreeModification() {
- DataTreeModification dataTreeModification = transaction;
- if(transaction instanceof PruningDataTreeModification){
- dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification();
+ public void preCommit(final FutureCallback<DataTreeCandidate> callback) {
+ checkState(State.CAN_COMMIT_COMPLETE);
+ this.callback = Preconditions.checkNotNull(callback);
+ state = State.PRE_COMMIT_PENDING;
+
+ if (nextFailure == null) {
+ dataTree.startPreCommit(this);
+ } else {
+ failedPreCommit(nextFailure);
}
- return dataTreeModification;
}
@Override
public ListenableFuture<Void> abort() {
- // No-op, really
- return VOID_FUTURE;
+ dataTree.startAbort(this);
+ state = State.ABORTED;
+
+ final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
+ if (!maybeAborts.isPresent()) {
+ return VOID_FUTURE;
+ }
+
+ final Future<Iterable<Object>> aborts = maybeAborts.get();
+ if (aborts.isCompleted()) {
+ return VOID_FUTURE;
+ }
+
+ final SettableFuture<Void> ret = SettableFuture.create();
+ aborts.onComplete(new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(final Throwable failure, final Iterable<Object> objs) {
+ if (failure != null) {
+ ret.setException(failure);
+ } else {
+ ret.set(null);
+ }
+ }
+ }, ExecutionContexts.global());
+
+ return ret;
}
@Override
- public ListenableFuture<Void> commit() {
+ public void commit(final FutureCallback<UnsignedLong> callback) {
+ checkState(State.PRE_COMMIT_COMPLETE);
+ this.callback = Preconditions.checkNotNull(callback);
+ state = State.COMMIT_PENDING;
+ dataTree.startCommit(this, candidate);
+ }
+
+ private <T> FutureCallback<T> switchState(final State newState) {
+ @SuppressWarnings("unchecked")
+ final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
+ this.callback = null;
+ LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
+ this.state = newState;
+ return ret;
+ }
+
+ void successfulCanCommit() {
+ switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
+ }
+
+ void failedCanCommit(final Exception cause) {
+ switchState(State.FAILED).onFailure(cause);
+ }
+
+ /**
+ * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
+ * any failure to validate is propagated before we record the transaction.
+ *
+ * @param candidate {@link DataTreeCandidate} under consideration
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ // FIXME: this should be asynchronous
+ void userPreCommit(final DataTreeCandidate candidate) throws ExecutionException, TimeoutException {
+ userCohorts.canCommit(candidate);
+ userCohorts.preCommit();
+ }
+
+ void successfulPreCommit(final DataTreeCandidateTip candidate) {
+ LOG.trace("Transaction {} prepared candidate {}", transaction, candidate);
+ this.candidate = Verify.verifyNotNull(candidate);
+ switchState(State.PRE_COMMIT_COMPLETE).onSuccess(candidate);
+ }
+
+ void failedPreCommit(final Exception cause) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Transaction {} failed to prepare", transaction, cause);
+ } else {
+ LOG.error("Transaction {} failed to prepare", transactionId, cause);
+ }
+
+ userCohorts.abort();
+ switchState(State.FAILED).onFailure(cause);
+ }
+
+ void successfulCommit(final UnsignedLong journalIndex) {
try {
- dataTree.getDataTree().commit(candidate);
- } catch (Exception e) {
- if(LOG.isTraceEnabled()) {
- LOG.trace("Transaction {} failed to commit", transaction, e);
- } else {
- LOG.error("Transaction failed to commit", e);
- }
- return Futures.immediateFailedFuture(e);
+ userCohorts.commit();
+ } catch (TimeoutException | ExecutionException e) {
+ // We are probably dead, depending on what the cohorts end up doing
+ LOG.error("User cohorts failed to commit", e);
}
- LOG.trace("Transaction {} committed, proceeding to notify", transaction);
- dataTree.notifyListeners(candidate);
- return VOID_FUTURE;
+ switchState(State.COMMITTED).onSuccess(journalIndex);
+ }
+
+ void failedCommit(final Exception cause) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Transaction {} failed to commit", transaction, cause);
+ } else {
+ LOG.error("Transaction failed to commit", cause);
+ }
+
+ userCohorts.abort();
+ switchState(State.FAILED).onFailure(cause);
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ void reportFailure(final Exception cause) {
+ this.nextFailure = Preconditions.checkNotNull(cause);
+ }
+
+ @Override
+ public boolean isFailed() {
+ return state == State.FAILED || nextFailure != null;
}
}
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCanCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulPreCommit;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied));
}
- protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
- final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
- final MutableCompositeModification modification) {
- return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
- }
-
- protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
- final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
- final MutableCompositeModification modification,
- final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
-
- final ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction(nextTransactionId());
- tx.getSnapshot().write(path, data);
- final ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
-
- modification.addModification(new WriteModification(path, data));
-
- return cohort;
- }
-
- protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
- final ShardDataTreeCohort actual) {
- return createDelegatingMockCohort(cohortName, actual, null);
- }
+ protected TipProducingDataTree createDelegatingMockDataTree() throws Exception {
+ TipProducingDataTree actual = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
+ final TipProducingDataTree mock = mock(TipProducingDataTree.class);
- protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
- final ShardDataTreeCohort actual,
- final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName);
+ doAnswer(invocation -> {
+ actual.validate(invocation.getArgumentAt(0, DataTreeModification.class));
+ return null;
+ }).when(mock).validate(any(DataTreeModification.class));
- doAnswer(new Answer<ListenableFuture<Boolean>>() {
- @Override
- public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
- return actual.canCommit();
- }
- }).when(cohort).canCommit();
+ doAnswer(invocation -> {
+ return actual.prepare(invocation.getArgumentAt(0, DataTreeModification.class));
+ }).when(mock).prepare(any(DataTreeModification.class));
- doAnswer(new Answer<ListenableFuture<Void>>() {
- @Override
- public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
- if(preCommit != null) {
- return preCommit.apply(actual);
- } else {
- return actual.preCommit();
- }
- }
- }).when(cohort).preCommit();
+ doAnswer(invocation -> {
+ actual.commit(invocation.getArgumentAt(0, DataTreeCandidate.class));
+ return null;
+ }).when(mock).commit(any(DataTreeCandidate.class));
- doAnswer(new Answer<ListenableFuture<Void>>() {
- @Override
- public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
- return actual.commit();
- }
- }).when(cohort).commit();
+ doAnswer(invocation -> {
+ actual.setSchemaContext(invocation.getArgumentAt(0, SchemaContext.class));
+ return null;
+ }).when(mock).setSchemaContext(any(SchemaContext.class));
- doAnswer(new Answer<ListenableFuture<Void>>() {
- @Override
- public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
- return actual.abort();
- }
- }).when(cohort).abort();
+ doAnswer(invocation -> {
+ return actual.takeSnapshot();
+ }).when(mock).takeSnapshot();
- doAnswer(new Answer<DataTreeCandidateTip>() {
- @Override
- public DataTreeCandidateTip answer(final InvocationOnMock invocation) {
- return actual.getCandidate();
- }
- }).when(cohort).getCandidate();
+ doAnswer(invocation -> {
+ return actual.getRootPath();
+ }).when(mock).getRootPath();
- return cohort;
- }
-
- protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort,
- TransactionIdentifier transactionID, MutableCompositeModification modification,
- boolean doCommitOnReady) {
- if(remoteReadWriteTransaction){
- return prepareForwardedReadyTransaction(cohort, transactionID, CURRENT_VERSION,
- doCommitOnReady);
- } else {
- setupCohortDecorator(shard, cohort);
- return prepareBatchedModifications(transactionID, modification, doCommitOnReady);
- }
+ return mock;
}
protected ShardDataTreeCohort mockShardDataTreeCohort() {
ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class);
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).commit();
- doReturn(mockCandidate("candidate")).when(cohort).getCandidate();
+ DataTreeCandidate candidate = mockCandidate("candidate");
+ successfulCanCommit(cohort);
+ successfulPreCommit(cohort, candidate);
+ successfulCommit(cohort);
+ doReturn(candidate).when(cohort).getCandidate();
return cohort;
}
- static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) {
- ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class);
- doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class));
- doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class));
- return mockParent;
- }
-
- protected ForwardedReadyTransaction prepareForwardedReadyTransaction(ShardDataTreeCohort cohort,
- TransactionIdentifier transactionID, short version, boolean doCommitOnReady) {
- return new ForwardedReadyTransaction(transactionID, version,
- new ReadWriteShardDataTreeTransaction(newShardDataTreeTransactionParent(cohort), transactionID,
- mock(DataTreeModification.class)), doCommitOnReady);
- }
-
- protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort,
- TransactionIdentifier transactionID, MutableCompositeModification modification) {
- return prepareReadyTransactionMessage(remoteReadWriteTransaction, shard, cohort, transactionID, modification, false);
- }
+ protected Map<TransactionIdentifier, CapturingShardDataTreeCohort> setupCohortDecorator(final Shard shard,
+ final TransactionIdentifier... transactionIDs) {
+ final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = new HashMap<>();
+ for(TransactionIdentifier id: transactionIDs) {
+ cohortMap.put(id, new CapturingShardDataTreeCohort());
+ }
- protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) {
shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() {
@Override
- public ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual) {
+ public ShardDataTreeCohort decorate(final Identifier transactionID, final ShardDataTreeCohort actual) {
+ CapturingShardDataTreeCohort cohort = cohortMap.get(transactionID);
+ cohort.setDelegate(actual);
return cohort;
}
});
+
+ return cohortMap;
}
- protected BatchedModifications prepareBatchedModifications(TransactionIdentifier transactionID,
- MutableCompositeModification modification) {
+ protected BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID,
+ final MutableCompositeModification modification) {
return prepareBatchedModifications(transactionID, modification, false);
}
- private static BatchedModifications prepareBatchedModifications(TransactionIdentifier transactionID,
- MutableCompositeModification modification,
- boolean doCommitOnReady) {
+ protected static BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID,
+ final MutableCompositeModification modification,
+ final boolean doCommitOnReady) {
final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION);
batchedModifications.addModification(modification);
batchedModifications.setReady(true);
return batchedModifications;
}
+ protected static BatchedModifications prepareBatchedModifications(final TransactionIdentifier transactionID,
+ final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean doCommitOnReady) {
+ final MutableCompositeModification modification = new MutableCompositeModification();
+ modification.addModification(new WriteModification(path, data));
+ return prepareBatchedModifications(transactionID, modification, doCommitOnReady);
+ }
+
+ protected static ForwardedReadyTransaction prepareForwardedReadyTransaction(final TestActorRef<Shard> shard,
+ final TransactionIdentifier transactionID, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data, final boolean doCommitOnReady) {
+ ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
+ newReadWriteTransaction(transactionID);
+ rwTx.getSnapshot().write(path, data);
+ return new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, doCommitOnReady);
+ }
public static NormalizedNode<?,?> readStore(final TestActorRef<? extends Shard> shard, final YangInstanceIdentifier id)
throws ExecutionException, InterruptedException {
transaction.getSnapshot().write(id, node);
final ShardDataTreeCohort cohort = transaction.ready();
- cohort.canCommit().get();
- cohort.preCommit().get();
- cohort.commit();
+ immediateCanCommit(cohort);
+ immediatePreCommit(cohort);
+ immediateCommit(cohort);
}
public void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
transaction.getSnapshot().merge(id, node);
final ShardDataTreeCohort cohort = transaction.ready();
- cohort.canCommit().get();
- cohort.preCommit().get();
- cohort.commit();
+ immediateCanCommit(cohort);
+ immediatePreCommit(cohort);
+ immediateCommit(cohort);
}
public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
return delegate.create();
}
}
+
+ public static class CapturingShardDataTreeCohort extends ShardDataTreeCohort {
+ private volatile ShardDataTreeCohort delegate;
+ private FutureCallback<Void> canCommit;
+ private FutureCallback<DataTreeCandidate> preCommit;
+ private FutureCallback<UnsignedLong> commit;
+
+ public void setDelegate(ShardDataTreeCohort delegate) {
+ this.delegate = delegate;
+ }
+
+ public FutureCallback<Void> getCanCommit() {
+ assertNotNull("canCommit was not invoked", canCommit);
+ return canCommit;
+ }
+
+ public FutureCallback<DataTreeCandidate> getPreCommit() {
+ assertNotNull("preCommit was not invoked", preCommit);
+ return preCommit;
+ }
+
+ public FutureCallback<UnsignedLong> getCommit() {
+ assertNotNull("commit was not invoked", commit);
+ return commit;
+ }
+
+ @Override
+ public TransactionIdentifier getIdentifier() {
+ return delegate.getIdentifier();
+ }
+
+ @Override
+ DataTreeCandidateTip getCandidate() {
+ return delegate.getCandidate();
+ }
+
+ @Override
+ DataTreeModification getDataTreeModification() {
+ return delegate.getDataTreeModification();
+ }
+
+ @Override
+ public void canCommit(FutureCallback<Void> callback) {
+ canCommit = mockFutureCallback(callback);
+ delegate.canCommit(canCommit);
+ }
+
+ @Override
+ public void preCommit(FutureCallback<DataTreeCandidate> callback) {
+ preCommit = mockFutureCallback(callback);
+ delegate.preCommit(preCommit);
+ }
+
+ @Override
+ public void commit(FutureCallback<UnsignedLong> callback) {
+ commit = mockFutureCallback(callback);
+ delegate.commit(commit);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> FutureCallback<T> mockFutureCallback(final FutureCallback<T> actual ) {
+ FutureCallback<T> mock = mock(FutureCallback.class);
+ doAnswer(invocation -> {
+ actual.onFailure(invocation.getArgumentAt(0, Throwable.class));
+ return null;
+ }).when(mock).onFailure(any(Throwable.class));
+
+ doAnswer(invocation -> {
+ actual.onSuccess((T) invocation.getArgumentAt(0, Throwable.class));
+ return null;
+ }).when(mock).onSuccess((T) any(Object.class));
+
+ return mock;
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegate.abort();
+ }
+
+ @Override
+ public boolean isFailed() {
+ return delegate.isFailed();
+ }
+
+ @Override
+ public State getState() {
+ return delegate.getState();
+ }
+ }
}
private DataChangeListenerSupport support;
@Before
- public void setup() {
+ public void setup() throws InterruptedException {
shard = createShard();
support = new DataChangeListenerSupport(shard);
}
listener.verifyCreatedData(0, innerEntryPath(2, "four"));
}
- private MockDataChangeListener registerChangeListener(YangInstanceIdentifier path, DataChangeScope scope,
- int expectedEvents, boolean isLeader) {
+ private MockDataChangeListener registerChangeListener(final YangInstanceIdentifier path, final DataChangeScope scope,
+ final int expectedEvents, final boolean isLeader) {
MockDataChangeListener listener = new MockDataChangeListener(expectedEvents);
ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener));
private Shard createShard() {
TestActorRef<Shard> actor = actorFactory.createTestActor(newShardProps());
+ ShardTestKit.waitUntilLeader(actor);
+
return actor.underlyingActor();
}
}
listener2.verifyNoNotifiedData(innerEntryPath(2, "three"), innerEntryPath(2, "four"));
}
- private MockDataTreeChangeListener registerChangeListener(YangInstanceIdentifier path,
- int expectedEvents, boolean isLeader) {
+ private MockDataTreeChangeListener registerChangeListener(final YangInstanceIdentifier path,
+ final int expectedEvents, final boolean isLeader) {
MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true);
private Shard createShard() {
TestActorRef<Shard> actor = actorFactory.createTestActor(newShardProps());
+ ShardTestKit.waitUntilLeader(actor);
return actor.underlyingActor();
}
}
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
-import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
public class DistributedDataStoreIntegrationTest {
}
@Test
- public void testRestoreFromDatastoreSnapshot() throws Exception{
+ public void testRestoreFromDatastoreSnapshot() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
String name = "transactionIntegrationTest";
CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
- ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
+ DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+ dataTree.setSchemaContext(SchemaContextHelper.full());
AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
- NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
- YangInstanceIdentifier.EMPTY);
+ NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
- Snapshot carsSnapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(root).serialize(),
+ Snapshot carsSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
NormalizedNode<?, ?> peopleNode = PeopleModel.create();
- dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL);
+ dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+ dataTree.setSchemaContext(SchemaContextHelper.full());
AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
- root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.EMPTY);
+ root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
- Snapshot peopleSnapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(root).serialize(),
+ Snapshot peopleSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+public final class ShardDataTreeMocking {
+
+ private ShardDataTreeMocking() {
+ throw new UnsupportedOperationException();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> FutureCallback<T> mockCallback() {
+ return mock(FutureCallback.class);
+ }
+
+ public static ShardDataTreeCohort immediateCanCommit(final ShardDataTreeCohort cohort) {
+ final FutureCallback<Void> callback = mockCallback();
+ doNothing().when(callback).onSuccess(null);
+ cohort.canCommit(callback);
+
+ verify(callback).onSuccess(null);
+ verifyNoMoreInteractions(callback);
+ return cohort;
+ }
+
+ public static ShardDataTreeCohort immediatePreCommit(final ShardDataTreeCohort cohort) {
+ final FutureCallback<DataTreeCandidate> callback = mockCallback();
+ doNothing().when(callback).onSuccess(any(DataTreeCandidate.class));
+ cohort.preCommit(callback);
+
+ verify(callback).onSuccess(any(DataTreeCandidate.class));
+ verifyNoMoreInteractions(callback);
+ return cohort;
+ }
+
+ public static ShardDataTreeCohort immediateCommit(final ShardDataTreeCohort cohort) {
+ final FutureCallback<UnsignedLong> callback = mockCallback();
+ doNothing().when(callback).onSuccess(any(UnsignedLong.class));
+ cohort.commit(callback);
+
+ verify(callback, timeout(5000)).onSuccess(any(UnsignedLong.class));
+ verifyNoMoreInteractions(callback);
+ return cohort;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Object invokeSuccess(final InvocationOnMock invocation, final T value) {
+ invocation.getArgumentAt(0, FutureCallback.class).onSuccess(value);
+ return null;
+ }
+
+ private static Object invokeFailure(final InvocationOnMock invocation) {
+ invocation.getArgumentAt(0, FutureCallback.class).onFailure(mock(Exception.class));
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ShardDataTreeCohort failedCanCommit(final ShardDataTreeCohort mock) {
+ doAnswer(invocation -> {
+ return invokeFailure(invocation);
+ }).when(mock).canCommit(any(FutureCallback.class));
+ return mock;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ShardDataTreeCohort failedPreCommit(final ShardDataTreeCohort mock) {
+ doAnswer(invocation -> {
+ return invokeFailure(invocation);
+ }).when(mock).preCommit(any(FutureCallback.class));
+ return mock;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ShardDataTreeCohort failedCommit(final ShardDataTreeCohort mock) {
+ doAnswer(invocation -> {
+ return invokeFailure(invocation);
+ }).when(mock).commit(any(FutureCallback.class));
+ return mock;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ShardDataTreeCohort successfulCanCommit(final ShardDataTreeCohort mock) {
+ doAnswer(invocation -> {
+ return invokeSuccess(invocation, null);
+ }).when(mock).canCommit(any(FutureCallback.class));
+
+ return mock;
+ }
+
+ public static ShardDataTreeCohort successfulPreCommit(final ShardDataTreeCohort mock) {
+ return successfulPreCommit(mock, mock(DataTreeCandidate.class));
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ShardDataTreeCohort successfulPreCommit(final ShardDataTreeCohort mock, final DataTreeCandidate candidate) {
+ doAnswer(invocation -> {
+ return invokeSuccess(invocation, candidate);
+ }).when(mock).preCommit(any(FutureCallback.class));
+
+ return mock;
+ }
+
+ public static ShardDataTreeCohort successfulCommit(final ShardDataTreeCohort mock) {
+ return successfulCommit(mock, UnsignedLong.ZERO);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ShardDataTreeCohort successfulCommit(final ShardDataTreeCohort mock, final UnsignedLong index) {
+ doAnswer(invocation -> {
+ return invokeSuccess(invocation, index);
+ }).when(mock).commit(any(FutureCallback.class));
+
+ return mock;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void assertSequencedCommit(final ShardDataTreeCohort mock) {
+ final InOrder inOrder = inOrder(mock);
+ inOrder.verify(mock).canCommit(any(FutureCallback.class));
+ inOrder.verify(mock).preCommit(any(FutureCallback.class));
+ inOrder.verify(mock).commit(any(FutureCallback.class));
+ }
+}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
public class ShardDataTreeTest extends AbstractTest {
- SchemaContext fullSchema;
+ private final Shard mockShard = Mockito.mock(Shard.class);
+
+
+ private SchemaContext fullSchema;
@Before
- public void setUp(){
+ public void setUp() {
+ doReturn(true).when(mockShard).canSkipPayload();
+ doReturn(Ticker.systemTicker()).when(mockShard).ticker();
+ doReturn(Mockito.mock(ShardStats.class)).when(mockShard).getShardMBean();
+
fullSchema = SchemaContextHelper.full();
}
@Test
public void testWrite() throws ExecutionException, InterruptedException {
- modify(new ShardDataTree(fullSchema, TreeType.OPERATIONAL), false, true, true);
+ modify(new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL), false, true, true);
}
@Test
public void testMerge() throws ExecutionException, InterruptedException {
- modify(new ShardDataTree(fullSchema, TreeType.OPERATIONAL), true, true, true);
+ modify(new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL), true, true, true);
}
- private void modify(ShardDataTree shardDataTree, boolean merge, boolean expectedCarsPresent, boolean expectedPeoplePresent) throws ExecutionException, InterruptedException {
+ private void modify(final ShardDataTree shardDataTree, final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) throws ExecutionException, InterruptedException {
assertEquals(fullSchema, shardDataTree.getSchemaContext());
- ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
+ final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
- DataTreeModification snapshot = transaction.getSnapshot();
+ final DataTreeModification snapshot = transaction.getSnapshot();
assertNotNull(snapshot);
snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
}
- ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
-
- cohort.preCommit().get();
- cohort.commit().get();
+ final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+ immediateCanCommit(cohort);
+ immediatePreCommit(cohort);
+ immediateCommit(cohort);
- ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
+ final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
- DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
+ final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
- Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
+ final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
assertEquals(expectedCarsPresent, optional.isPresent());
- Optional<NormalizedNode<?, ?>> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
+ final Optional<NormalizedNode<?, ?>> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
assertEquals(expectedPeoplePresent, optional1.isPresent());
@Test
public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException {
- ShardDataTree shardDataTree = new ShardDataTree(fullSchema, TreeType.OPERATIONAL);
+ final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
- List<DataTreeCandidateTip> candidates = new ArrayList<>();
+ final List<DataTreeCandidateTip> candidates = new ArrayList<>();
candidates.add(addCar(shardDataTree));
candidates.add(removeCar(shardDataTree));
- NormalizedNode<?, ?> expected = getCars(shardDataTree);
+ final NormalizedNode<?, ?> expected = getCars(shardDataTree);
applyCandidates(shardDataTree, candidates);
- NormalizedNode<?, ?> actual = getCars(shardDataTree);
+ final NormalizedNode<?, ?> actual = getCars(shardDataTree);
assertEquals(expected, actual);
}
@Test
public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException {
- ShardDataTree shardDataTree = new ShardDataTree(fullSchema, TreeType.OPERATIONAL);
+ final ShardDataTree shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
- List<DataTreeCandidateTip> candidates = new ArrayList<>();
+ final List<DataTreeCandidateTip> candidates = new ArrayList<>();
candidates.add(addCar(shardDataTree));
candidates.add(removeCar(shardDataTree));
candidates.add(addCar(shardDataTree));
candidates.add(removeCar(shardDataTree));
- NormalizedNode<?, ?> expected = getCars(shardDataTree);
+ final NormalizedNode<?, ?> expected = getCars(shardDataTree);
applyCandidates(shardDataTree, candidates);
- NormalizedNode<?, ?> actual = getCars(shardDataTree);
+ final NormalizedNode<?, ?> actual = getCars(shardDataTree);
assertEquals(expected, actual);
}
- private static NormalizedNode<?, ?> getCars(ShardDataTree shardDataTree) {
- ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
- DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
+ private static NormalizedNode<?, ?> getCars(final ShardDataTree shardDataTree) {
+ final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
+ final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
- Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
+ final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
assertEquals(true, optional.isPresent());
return optional.get();
}
- private static DataTreeCandidateTip addCar(ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
+ private static DataTreeCandidateTip addCar(final ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
return doTransaction(shardDataTree, snapshot -> {
snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
});
}
- private static DataTreeCandidateTip removeCar(ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
+ private static DataTreeCandidateTip removeCar(final ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
}
void execute(DataTreeModification snapshot);
}
- private static DataTreeCandidateTip doTransaction(ShardDataTree shardDataTree, DataTreeOperation operation)
+ private static DataTreeCandidateTip doTransaction(final ShardDataTree shardDataTree, final DataTreeOperation operation)
throws ExecutionException, InterruptedException {
- ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
- DataTreeModification snapshot = transaction.getSnapshot();
+ final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
+ final DataTreeModification snapshot = transaction.getSnapshot();
operation.execute(snapshot);
- ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+ final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
- cohort.canCommit().get();
- cohort.preCommit().get();
- DataTreeCandidateTip candidate = cohort.getCandidate();
- cohort.commit().get();
+ immediateCanCommit(cohort);
+ immediatePreCommit(cohort);
+ final DataTreeCandidateTip candidate = cohort.getCandidate();
+ immediateCommit(cohort);
return candidate;
}
- private static DataTreeCandidateTip applyCandidates(ShardDataTree shardDataTree, List<DataTreeCandidateTip> candidates)
+ private static DataTreeCandidateTip applyCandidates(final ShardDataTree shardDataTree, final List<DataTreeCandidateTip> candidates)
throws ExecutionException, InterruptedException {
- ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
- DataTreeModification snapshot = transaction.getSnapshot();
- for(DataTreeCandidateTip candidateTip : candidates){
+ final ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
+ final DataTreeModification snapshot = transaction.getSnapshot();
+ for(final DataTreeCandidateTip candidateTip : candidates){
DataTreeCandidates.applyToModification(snapshot, candidateTip);
}
- ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
+ final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
- cohort.canCommit().get();
- cohort.preCommit().get();
- DataTreeCandidateTip candidate = cohort.getCandidate();
- cohort.commit().get();
+ immediateCanCommit(cohort);
+ immediatePreCommit(cohort);
+ final DataTreeCandidateTip candidate = cohort.getCandidate();
+ immediateCommit(cohort);
return candidate;
}
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
peopleSchemaContext = SchemaContextHelper.select(SchemaContextHelper.PEOPLE_YANG);
carsSchemaContext = SchemaContextHelper.select(SchemaContextHelper.CARS_YANG);
- peopleDataTree = new ShardDataTree(peopleSchemaContext, TreeType.OPERATIONAL);
+ final Shard mockShard = Mockito.mock(Shard.class);
+
+ peopleDataTree = new ShardDataTree(mockShard, peopleSchemaContext, TreeType.OPERATIONAL);
}
@Deprecated
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.persistence.SaveSnapshotSuccess;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
-import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@Test
public void testApplySnapshot() throws Exception {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
ShardTestKit.waitUntilLeader(shard);
final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
final NormalizedNode<?,?> expected = readStore(store, root);
- final Snapshot snapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(expected).serialize(),
+ final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(),
Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
- shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
+ shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
- final NormalizedNode<?,?> actual = readStore(shard, root);
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
- assertEquals("Root node", expected, actual);
+ try {
+ assertEquals("Root node", expected, readStore(shard, root));
+ return;
+ } catch(AssertionError e) {
+ // try again
+ }
+ }
+
+ fail("Snapshot was not applied");
}
@Test
waitUntilLeader(shard);
- // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
-
final TransactionIdentifier transactionID3 = nextTransactionId();
- final MutableCompositeModification modification3 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification3);
+
+ Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
+ shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
+ final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
+ final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
+ final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
final long timeoutSec = 5;
final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(duration);
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.class));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ // Ready 2 more Tx's.
+
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
+ shard.tell(prepareBatchedModifications(transactionID3, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
assertEquals("Commits complete", true, done);
- final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
- inOrder.verify(cohort2).preCommit();
- inOrder.verify(cohort2).commit();
- inOrder.verify(cohort3).canCommit();
- inOrder.verify(cohort3).preCommit();
- inOrder.verify(cohort3).commit();
+ final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
+ cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
+ cohort3.getPreCommit(), cohort3.getCommit());
+ inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
+ inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+ inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
+ inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
+ inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+ inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
+ inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
+ inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+ inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
// Verify data in the data store.
final TransactionIdentifier transactionID = nextTransactionId();
final FiniteDuration duration = duration("5 seconds");
- final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
- final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
- if(mockCohort.get() == null) {
- mockCohort.set(createDelegatingMockCohort("cohort", actual));
- }
-
- return mockCohort.get();
- };
-
- shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
// Send a BatchedModifications to start a transaction.
shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- // Send the CanCommitTransaction message.
+ // Send the CommitTransaction message.
shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.class);
- final InOrder inOrder = inOrder(mockCohort.get());
- inOrder.verify(mockCohort.get()).canCommit();
- inOrder.verify(mockCohort.get()).preCommit();
- inOrder.verify(mockCohort.get()).commit();
-
// Verify data in the data store.
verifyOuterListEntry(shard, 1);
final TransactionIdentifier transactionID = nextTransactionId();
final FiniteDuration duration = duration("5 seconds");
- final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
- final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
- if(mockCohort.get() == null) {
- mockCohort.set(createDelegatingMockCohort("cohort", actual));
- }
-
- return mockCohort.get();
- };
-
- shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
// Send a BatchedModifications to start a transaction.
shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
expectMsgClass(duration, CommitTransactionReply.class);
- final InOrder inOrder = inOrder(mockCohort.get());
- inOrder.verify(mockCohort.get()).canCommit();
- inOrder.verify(mockCohort.get()).preCommit();
- inOrder.verify(mockCohort.get()).commit();
-
// Verify data in the data store.
verifyOuterListEntry(shard, 1);
Failure failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
- shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), txId,
- DataStoreVersions.CURRENT_VERSION, true), getRef());
+ shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
waitUntilLeader(shard);
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
-
- final FiniteDuration duration = duration("5 seconds");
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
-
- expectMsgClass(duration, CommitTransactionReply.class);
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
+ containerNode, true), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
+ }
- final InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
+ expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
@Test
public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
- testCommitWithPersistenceDisabled(true);
- }
-
- @Test
- public void testWriteOnlyCommitWithPersistenceDisabled() throws Throwable {
- testCommitWithPersistenceDisabled(true);
- }
-
- private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.persistent(false);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWithPersistenceDisabled-" + readWrite);
+ "testCommitWithPersistenceDisabled");
waitUntilLeader(shard);
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
// Setup a simulated transactions with a mock cohort.
- final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
-
final FiniteDuration duration = duration("5 seconds");
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ final TransactionIdentifier transactionID = nextTransactionId();
+ final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.class);
- final InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
-
final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
}};
private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
// Note that persistence is enabled which would normally result in the entry getting written to the journal
// but here that need not happen
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWhenTransactionHasNoModifications-" + readWrite);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasNoModifications-" + readWrite);
- waitUntilLeader(shard);
+ waitUntilLeader(shard);
- final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
- doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
+ final TransactionIdentifier transactionID = nextTransactionId();
- final FiniteDuration duration = duration("5 seconds");
+ final FiniteDuration duration = duration("5 seconds");
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
+ if(readWrite) {
+ ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
+ newReadWriteTransaction(transactionID);
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()), getRef());
+ }
- // Send the CanCommitTransaction message.
+ expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ // Send the CanCommitTransaction message.
- shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.class);
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
- final InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
- shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
- final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
- // Use MBean for verification
- // Committed transaction count should increase as usual
- assertEquals(1,shardStats.getCommittedTransactionsCount());
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shardStats.getCommittedTransactionsCount());
- // Commit index should not advance because this does not go into the journal
- assertEquals(-1, shardStats.getCommitIndex());
- }
- };
+ // Commit index should not advance because this does not go into the journal
+ assertEquals(-1, shardStats.getCommitIndex());
+ }};
}
@Test
- public void testReadWriteCommitWhenTransactionHasModifications() {
+ public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
testCommitWhenTransactionHasModifications(true);
}
@Test
- public void testWriteOnlyCommitWhenTransactionHasModifications() {
+ public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
testCommitWhenTransactionHasModifications(false);
}
- private void testCommitWhenTransactionHasModifications(final boolean readWrite){
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWhenTransactionHasModifications-" + readWrite);
+ private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasModifications-" + readWrite);
- waitUntilLeader(shard);
+ waitUntilLeader(shard);
- final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- modification.addModification(new DeleteModification(YangInstanceIdentifier.EMPTY));
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
- doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
+ final FiniteDuration duration = duration("5 seconds");
+ final TransactionIdentifier transactionID = nextTransactionId();
- final FiniteDuration duration = duration("5 seconds");
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ }
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
+ expectMsgClass(duration, ReadyTransactionReply.class);
- // Send the CanCommitTransaction message.
+ // Send the CanCommitTransaction message.
- shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
- shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.class);
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
- final InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
+ final InOrder inOrder = inOrder(dataTree);
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+ inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+ inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
- shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
- final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
- // Use MBean for verification
- // Committed transaction count should increase as usual
- assertEquals(1, shardStats.getCommittedTransactionsCount());
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1, shardStats.getCommittedTransactionsCount());
- // Commit index should advance as we do not have an empty modification
- assertEquals(0, shardStats.getCommitIndex());
- }
- };
+ // Commit index should advance as we do not have an empty modification
+ assertEquals(0, shardStats.getCommitIndex());
+ }};
}
@Test
public void testCommitPhaseFailure() throws Throwable {
- testCommitPhaseFailure(true);
- testCommitPhaseFailure(false);
- }
-
- private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitPhaseFailure-" + readWrite);
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitPhaseFailure");
waitUntilLeader(shard);
+ final FiniteDuration duration = duration("5 seconds");
+ final Timeout timeout = new Timeout(duration);
+
// Setup 2 simulated transactions with mock cohorts. The first one fails in the
// commit phase.
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
- doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
- doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
- final FiniteDuration duration = duration("5 seconds");
- final Timeout timeout = new Timeout(duration);
+ doThrow(new RuntimeException("mock commit failure")).when(dataTree).commit(any(DataTreeCandidate.class));
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
- final InOrder inOrder = inOrder(cohort1, cohort2);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
+ final InOrder inOrder = inOrder(dataTree);
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+ inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+ inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
}};
}
@Test
public void testPreCommitPhaseFailure() throws Throwable {
- testPreCommitPhaseFailure(true);
- testPreCommitPhaseFailure(false);
- }
-
- private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testPreCommitPhaseFailure-" + readWrite);
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testPreCommitPhaseFailure");
waitUntilLeader(shard);
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ doThrow(new RuntimeException("mock preCommit failure")).when(dataTree).prepare(any(DataTreeModification.class));
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
- final InOrder inOrder = inOrder(cohort1, cohort2);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort2).canCommit();
+ final InOrder inOrder = inOrder(dataTree);
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
+ inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
+ inOrder.verify(dataTree).validate(any(DataTreeModification.class));
}};
}
@Test
public void testCanCommitPhaseFailure() throws Throwable {
- testCanCommitPhaseFailure(true);
- testCanCommitPhaseFailure(false);
- }
-
- private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCanCommitPhaseFailure-" + readWrite);
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitPhaseFailure");
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
-
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
+ doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
+ doNothing().when(dataTree).validate(any(DataTreeModification.class));
+
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
// Send another can commit to ensure the failed one got cleaned up.
- reset(cohort);
-
final TransactionIdentifier transactionID2 = nextTransactionId();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
}};
}
- @Test
- public void testCanCommitPhaseFalseResponse() throws Throwable {
- testCanCommitPhaseFalseResponse(true);
- testCanCommitPhaseFalseResponse(false);
- }
-
- private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCanCommitPhaseFalseResponse-" + readWrite);
-
- waitUntilLeader(shard);
-
- final FiniteDuration duration = duration("5 seconds");
-
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- // Send the CanCommitTransaction message.
-
- shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
- CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(CanCommitTransactionReply.class));
- assertEquals("getCanCommit", false, reply.getCanCommit());
-
- // Send another can commit to ensure the failed one got cleaned up.
-
- reset(cohort);
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
- reply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(CanCommitTransactionReply.class));
- assertEquals("getCanCommit", true, reply.getCanCommit());
- }};
- }
-
@Test
public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
testImmediateCommitWithCanCommitPhaseFailure(true);
private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
waitUntilLeader(shard);
+ doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
+ doNothing().when(dataTree).validate(any(DataTreeModification.class));
+
final FiniteDuration duration = duration("5 seconds");
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ }
expectMsgClass(duration, akka.actor.Status.Failure.class);
// Send another can commit to ensure the failed one got cleaned up.
- reset(cohort);
-
final TransactionIdentifier transactionID2 = nextTransactionId();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).commit();
- final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
- final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
- doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
- doReturn(candidateRoot).when(candidate).getRootNode();
- doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
- doReturn(candidate).when(cohort).getCandidate();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ }
expectMsgClass(duration, CommitTransactionReply.class);
}};
}
+ @SuppressWarnings("serial")
@Test
- public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
- testImmediateCommitWithCanCommitPhaseFalseResponse(true);
- testImmediateCommitWithCanCommitPhaseFalseResponse(false);
- }
-
- private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
+ public void testAbortWithCommitPending() throws Throwable {
new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
-
- waitUntilLeader(shard);
-
- final FiniteDuration duration = duration("5 seconds");
-
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
-
- expectMsgClass(duration, akka.actor.Status.Failure.class);
-
- // Send another can commit to ensure the failed one got cleaned up.
-
- reset(cohort);
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort).commit();
- final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
- final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
- doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
- doReturn(candidateRoot).when(candidate).getRootNode();
- doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
- doReturn(candidate).when(cohort).getCandidate();
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
-
- expectMsgClass(duration, CommitTransactionReply.class);
- }};
- }
+ final Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(newShardBuilder()) {
+ @Override
+ void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
+ // Simulate an AbortTransaction message occurring during replication, after
+ // persisting and before finishing the commit to the in-memory store.
- @Test
- public void testAbortBeforeFinishCommit() throws Throwable {
- testAbortBeforeFinishCommit(true);
- testAbortBeforeFinishCommit(false);
- }
+ doAbortTransaction(transactionId, null);
+ super.persistPayload(transactionId, payload);
+ }
+ };
+ }
+ };
- private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
- new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortBeforeFinishCommit-" + readWrite);
+ Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testAbortWithCommitPending");
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
final TransactionIdentifier transactionID = nextTransactionId();
- final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
- cohort -> {
- final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-
- // Simulate an AbortTransaction message occurring during replication, after
- // persisting and before finishing the commit to the in-memory store.
- // We have no followers so due to optimizations in the RaftActor, it does not
- // attempt replication and thus we can't send an AbortTransaction message b/c
- // it would be processed too late after CommitTransaction completes. So we'll
- // simulate an AbortTransaction message occurring during replication by calling
- // the shard directly.
- //
- shard.underlyingActor().doAbortTransaction(transactionID, null);
-
- return preCommitFuture;
- };
-
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
- modification, preCommit);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
- expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.class);
@Test
public void testTransactionCommitTimeout() throws Throwable {
- testTransactionCommitTimeout(true);
- testTransactionCommitTimeout(false);
- }
-
- private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
-
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testTransactionCommitTimeout-" + readWrite);
+ "testTransactionCommitTimeout");
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
writeToStore(shard, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- // Create 1st Tx - will timeout
+ // Ready 2 Tx's - the first will timeout
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification1);
-
- // Create 2nd Tx
+ shard.tell(prepareBatchedModifications(transactionID1, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
- listNodePath,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
- modification2);
-
- // Ready the Tx's
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+ shard.tell(prepareBatchedModifications(transactionID2, listNodePath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// canCommit 1st Tx. We don't send the commit so it should timeout.
}};
}
- @Test
- public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
- dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
-
- new ShardTestKit(getSystem()) {{
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testTransactionCommitQueueCapacityExceeded");
-
- waitUntilLeader(shard);
-
- final FiniteDuration duration = duration("5 seconds");
-
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
-
- final TransactionIdentifier transactionID3 = nextTransactionId();
- final MutableCompositeModification modification3 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
-
- // Ready the Tx's
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- // The 3rd Tx should exceed queue capacity and fail.
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
- expectMsgClass(duration, akka.actor.Status.Failure.class);
-
- // canCommit 1st Tx.
-
- shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, CanCommitTransactionReply.class);
-
- // canCommit the 2nd Tx - it should get queued.
-
- shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
-
- // canCommit the 3rd Tx - should exceed queue capacity and fail.
-
- shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, akka.actor.Status.Failure.class);
- }};
- }
+// @Test
+// @Ignore
+// public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
+// dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
+//
+// new ShardTestKit(getSystem()) {{
+// final TestActorRef<Shard> shard = actorFactory.createTestActor(
+// newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+// "testTransactionCommitQueueCapacityExceeded");
+//
+// waitUntilLeader(shard);
+//
+// final FiniteDuration duration = duration("5 seconds");
+//
+// final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+//
+// final TransactionIdentifier transactionID1 = nextTransactionId();
+// final MutableCompositeModification modification1 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
+// modification1);
+//
+// final TransactionIdentifier transactionID2 = nextTransactionId();
+// final MutableCompositeModification modification2 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+// TestModel.OUTER_LIST_PATH,
+// ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
+// modification2);
+//
+// final TransactionIdentifier transactionID3 = nextTransactionId();
+// final MutableCompositeModification modification3 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
+// modification3);
+//
+// // Ready the Tx's
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+// expectMsgClass(duration, ReadyTransactionReply.class);
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+// expectMsgClass(duration, ReadyTransactionReply.class);
+//
+// // The 3rd Tx should exceed queue capacity and fail.
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
+// expectMsgClass(duration, akka.actor.Status.Failure.class);
+//
+// // canCommit 1st Tx.
+//
+// shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+// expectMsgClass(duration, CanCommitTransactionReply.class);
+//
+// // canCommit the 2nd Tx - it should get queued.
+//
+// shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+//
+// // canCommit the 3rd Tx - should exceed queue capacity and fail.
+//
+// shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+// expectMsgClass(duration, akka.actor.Status.Failure.class);
+// }};
+// }
@Test
public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
- dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
-
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
final FiniteDuration duration = duration("5 seconds");
- final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
-
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
final TransactionIdentifier transactionID3 = nextTransactionId();
- final MutableCompositeModification modification3 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
+ shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// All Tx's are readied. We'll send canCommit for the last one but not the others. The others
@Test
public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
- dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
-
+ dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- // CanCommit the first one so it's the current in-progress CohortEntry.
+ // CanCommit the first Tx so it's the current in-progress Tx.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, CanCommitTransactionReply.class);
// Ready the second Tx.
final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
-
- shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Ready the third Tx.
final DataTreeModification modification3 = dataStore.newModification();
new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
.apply(modification3);
- modification3.ready();
+ modification3.ready();
final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
-
shard.tell(readyMessage, getRef());
// Commit the first Tx. After completing, the second should expire from the queue and the third
}
@Test
- public void testAbortCurrentTransaction() throws Throwable {
- testAbortCurrentTransaction(true);
- testAbortCurrentTransaction(false);
- }
-
- private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
+ public void testAbortAfterCanCommit() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = actorFactory.createTestActor(
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testAbortCurrentTransaction-" + readWrite);
+ "testAbortAfterCanCommit");
waitUntilLeader(shard);
- // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
-
- final TransactionIdentifier transactionID1 = nextTransactionId();
- final MutableCompositeModification modification1 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
- doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
-
- final TransactionIdentifier transactionID2 = nextTransactionId();
- final MutableCompositeModification modification2 = new MutableCompositeModification();
- final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
- doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
-
final FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
+ // Ready 2 transactions - the first one will be aborted.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message for the first Tx.
shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
expectMsgClass(duration, CanCommitTransactionReply.class));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Wait for the 2nd Tx to complete the canCommit phase.
- Await.ready(canCommitFuture, duration);
-
- final InOrder inOrder = inOrder(cohort1, cohort2);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort2).canCommit();
+ canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
}};
}
@Test
- public void testAbortQueuedTransaction() throws Throwable {
- testAbortQueuedTransaction(true);
- testAbortQueuedTransaction(false);
- }
-
- private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
+ public void testAbortAfterReady() throws Throwable {
dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
new ShardTestKit(getSystem()) {{
- final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
- @SuppressWarnings("serial")
- final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
- @Override
- public void handleCommand(final Object message) {
- super.handleCommand(message);
- if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
- if(cleaupCheckLatch.get() != null) {
- cleaupCheckLatch.get().countDown();
- }
- }
- }
- };
-
final TestActorRef<Shard> shard = actorFactory.createTestActor(
- Props.create(new DelegatingShardCreator(creator)).withDispatcher(
- Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
waitUntilLeader(shard);
- final TransactionIdentifier transactionID = nextTransactionId();
- final MutableCompositeModification modification = new MutableCompositeModification();
- final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
- doReturn(Futures.immediateFuture(null)).when(cohort).abort();
-
final FiniteDuration duration = duration("5 seconds");
- // Ready the tx.
+ // Ready a tx.
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
-
// Send the AbortTransaction message.
- shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration, AbortTransactionReply.class);
- verify(cohort).abort();
-
- // Verify the tx cohort is removed from queue at the cleanup check interval.
-
- cleaupCheckLatch.set(new CountDownLatch(1));
- assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
- cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
-
assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
// Now send CanCommitTransaction - should fail.
- shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
-
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
assertTrue("Failure type", failure instanceof IllegalStateException);
+
+ // Ready and CanCommit another and verify success.
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+ }};
+ }
+
+ @Test
+ public void testAbortQueuedTransaction() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ // Ready 3 tx's.
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+ shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Abort the second tx while it's queued.
+
+ shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, AbortTransactionReply.class);
+
+ // Commit the other 2.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
}};
}
new ShardTestKit(getSystem()) {{
class TestShard extends Shard {
- protected TestShard(AbstractBuilder<?, ?> builder) {
+ protected TestShard(final AbstractBuilder<?, ?> builder) {
super(builder);
setPersistence(new TestPersistentDataProvider(super.persistence()));
}
awaitAndValidateSnapshot(expectedRoot);
}
- private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
+ private void awaitAndValidateSnapshot(final NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
waitUntilNoLeader(shard);
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
-
shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterDataTreeChangeListenerReply.class);
import akka.testkit.TestActorRef;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
private static final TransactionType RW = TransactionType.READ_WRITE;
private static final TransactionType WO = TransactionType.WRITE_ONLY;
- private static final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
+ private static final Shard mockShard = Mockito.mock(Shard.class);
+
+ private static final ShardDataTree store = new ShardDataTree(mockShard, testSchemaContext, TreeType.OPERATIONAL);
private static final ShardIdentifier SHARD_IDENTIFIER =
ShardIdentifier.create("inventory", MemberName.forName("member-1"), "operational");
import akka.actor.Props;
import akka.actor.Status.Failure;
import akka.actor.Terminated;
+import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import java.util.concurrent.TimeUnit;
-import org.junit.Ignore;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class ShardTransactionTest extends AbstractActorTest {
- private static final SchemaContext testSchemaContext = TestModel.createTestContext();
private static final TransactionType RO = TransactionType.READ_ONLY;
private static final TransactionType RW = TransactionType.READ_WRITE;
private static final TransactionType WO = TransactionType.WRITE_ONLY;
private static final ShardIdentifier SHARD_IDENTIFIER =
ShardIdentifier.create("inventory", MEMBER_NAME, "config");
+ private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
- private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
- private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+ private TestActorRef<Shard> shard;
+ private ShardDataTree store;
- private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
-
- private ActorRef createShard() {
- ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
- schemaContext(TestModel.createTestContext()).props());
+ @Before
+ public void setUp() {
+ shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
+ schemaContext(TestModel.createTestContext()).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
ShardTestKit.waitUntilLeader(shard);
- return shard;
- }
-
- private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
- return newTransactionActor(type, transaction, null, name);
+ store = shard.underlyingActor().getDataStore();
}
- private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
- Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
- datastoreContext, shardStats);
- return getSystem().actorOf(props, name);
+ private ActorRef newTransactionActor(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction, final String name) {
+ Props props = ShardTransaction.props(type, transaction, shard, datastoreContext, shard.underlyingActor().getShardMBean());
+ return actorFactory.createActor(props, name);
}
private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
+ testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
- testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
-
- testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
+ testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
}
private void testOnReceiveReadData(final ActorRef transaction) {
@Test
public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
-
testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
- RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
+ RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
- RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
+ RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
}
private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
@Test
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
-
- testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
+ testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(),
"testDataExistsPositiveRO"));
- testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
+ testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(),
"testDataExistsPositiveRW"));
}
@Test
public void testOnReceiveDataExistsNegative() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = createShard();
-
- testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
+ testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(),
"testDataExistsNegativeRO"));
- testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
+ testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(),
"testDataExistsNegativeRW"));
}
}};
}
- // Unknown operations are being logged
- @Ignore
- @Test(expected=UnknownMessageException.class)
- public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
- datastoreContext, shardStats);
- final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
-
- transaction.receive(new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION),
- ActorRef.noSender());
- }
-
@Test
public void testShardTransactionInactivity() {
*/
package org.opendaylight.controller.cluster.datastore;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.Collections;
+import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import scala.concurrent.Promise;
/**
* Unit tests for SimpleShardDataTreeCohort.
* @author Thomas Pantelis
*/
public class SimpleShardDataTreeCohortTest extends AbstractTest {
- @Mock
- private TipProducingDataTree mockDataTree;
-
@Mock
private ShardDataTree mockShardDataTree;
@Mock
private DataTreeModification mockModification;
+ @Mock
+ private CompositeDataTreeCohort mockUserCohorts;
+
+ @Mock
+ private FutureCallback<DataTreeCandidate> mockPreCallback;
+
private SimpleShardDataTreeCohort cohort;
@Before
- public void setup() {
+ public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
- doReturn(mockDataTree).when(mockShardDataTree).getDataTree();
+ doNothing().when(mockUserCohorts).commit();
+ doReturn(Optional.empty()).when(mockUserCohorts).abort();
- cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId());
+ cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(),
+ mockUserCohorts);
}
@Test
public void testCanCommitSuccess() throws Exception {
- ListenableFuture<Boolean> future = cohort.canCommit();
- assertNotNull("Future is null", future);
- assertEquals("Future", true, future.get(3, TimeUnit.SECONDS));
- verify(mockDataTree).validate(mockModification);
+ canCommitSuccess();
}
- @Test(expected=OptimisticLockFailedException.class)
- public void testCanCommitWithConflictingModEx() throws Throwable {
- doThrow(new ConflictingModificationAppliedException(YangInstanceIdentifier.EMPTY, "mock")).
- when(mockDataTree).validate(mockModification);
- try {
- cohort.canCommit().get();
- } catch (ExecutionException e) {
- throw e.getCause();
- }
+ private void canCommitSuccess() {
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCanCommit();
+ return null;
+ }).when(mockShardDataTree).startCanCommit(cohort);
+
+ @SuppressWarnings("unchecked")
+ final FutureCallback<Void> callback = mock(FutureCallback.class);
+ cohort.canCommit(callback);
+
+ verify(callback).onSuccess(null);
+ verifyNoMoreInteractions(callback);
}
- @Test(expected=TransactionCommitFailedException.class)
- public void testCanCommitWithDataValidationEx() throws Throwable {
- doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock")).
- when(mockDataTree).validate(mockModification);
- try {
- cohort.canCommit().get();
- } catch (ExecutionException e) {
- throw e.getCause();
- }
+ private void testValidatationPropagates(final Exception cause) throws DataValidationFailedException {
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).failedCanCommit(cause);
+ return null;
+ }).when(mockShardDataTree).startCanCommit(cohort);
+
+ @SuppressWarnings("unchecked")
+ final FutureCallback<Void> callback = mock(FutureCallback.class);
+ cohort.canCommit(callback);
+
+ verify(callback).onFailure(cause);
+ verifyNoMoreInteractions(callback);
}
- @Test(expected=IllegalArgumentException.class)
- public void testCanCommitWithIllegalArgumentEx() throws Throwable {
- doThrow(new IllegalArgumentException("mock")).when(mockDataTree).validate(mockModification);
- try {
- cohort.canCommit().get();
- } catch (ExecutionException e) {
- throw e.getCause();
- }
+ @Test
+ public void testCanCommitWithConflictingModEx() throws DataValidationFailedException {
+ testValidatationPropagates(new ConflictingModificationAppliedException(YangInstanceIdentifier.EMPTY, "mock"));
}
@Test
- public void testPreCommitAndCommitSuccess() throws Exception {
- DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class);
- doReturn(mockCandidate ).when(mockDataTree).prepare(mockModification);
+ public void testCanCommitWithDataValidationEx() throws DataValidationFailedException {
+ testValidatationPropagates(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock"));
+ }
- ListenableFuture<Void> future = cohort.preCommit();
- assertNotNull("Future is null", future);
- future.get();
- verify(mockDataTree).prepare(mockModification);
+ @Test
+ public void testCanCommitWithIllegalArgumentEx() throws DataValidationFailedException {
+ testValidatationPropagates(new IllegalArgumentException("mock"));
+ }
+
+ private DataTreeCandidateTip preCommitSuccess() {
+ final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class);
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulPreCommit(mockCandidate);
+ return null;
+ }).when(mockShardDataTree).startPreCommit(cohort);
+
+ @SuppressWarnings("unchecked")
+ final FutureCallback<DataTreeCandidate> callback = mock(FutureCallback.class);
+ cohort.preCommit(callback);
+
+ verify(callback).onSuccess(mockCandidate);
+ verifyNoMoreInteractions(callback);
assertSame("getCandidate", mockCandidate, cohort.getCandidate());
- future = cohort.commit();
- assertNotNull("Future is null", future);
- future.get();
- verify(mockDataTree).commit(mockCandidate);
+ return mockCandidate;
+ }
+
+ @Test
+ public void testPreCommitAndCommitSuccess() throws Exception {
+ canCommitSuccess();
+ final DataTreeCandidateTip candidate = preCommitSuccess();
+
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).successfulCommit(UnsignedLong.valueOf(0));
+ return null;
+ }).when(mockShardDataTree).startCommit(cohort, candidate);
+
+ @SuppressWarnings("unchecked")
+ final
+ FutureCallback<UnsignedLong> mockCommitCallback = mock(FutureCallback.class);
+ cohort.commit(mockCommitCallback);
+
+ verify(mockCommitCallback).onSuccess(any(UnsignedLong.class));
+ verifyNoMoreInteractions(mockCommitCallback);
+
+ verify(mockUserCohorts).commit();
}
- @Test(expected=IllegalArgumentException.class)
+ @Test
public void testPreCommitWithIllegalArgumentEx() throws Throwable {
- doThrow(new IllegalArgumentException("mock")).when(mockDataTree).prepare(mockModification);
- try {
- cohort.preCommit().get();
- } catch (ExecutionException e) {
- throw e.getCause();
- }
+ canCommitSuccess();
+
+ final Exception cause = new IllegalArgumentException("mock");
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).failedPreCommit(cause);
+ return null;
+ }).when(mockShardDataTree).startPreCommit(cohort);
+
+ @SuppressWarnings("unchecked")
+ final FutureCallback<DataTreeCandidate> callback = mock(FutureCallback.class);
+ cohort.preCommit(callback);
+
+ verify(callback).onFailure(cause);
+ verifyNoMoreInteractions(callback);
+
+ verify(mockUserCohorts).abort();
}
- @Test(expected=IllegalArgumentException.class)
- public void testCommitWithIllegalArgumentEx() throws Throwable {
- doThrow(new IllegalArgumentException("mock")).when(mockDataTree).commit(any(DataTreeCandidateTip.class));
- try {
- cohort.commit().get();
- } catch (ExecutionException e) {
- throw e.getCause();
- }
+ @Test
+ public void testPreCommitWithReportedFailure() throws Throwable {
+ canCommitSuccess();
+
+ final Exception cause = new IllegalArgumentException("mock");
+ cohort.reportFailure(cause);
+
+ @SuppressWarnings("unchecked")
+ final FutureCallback<DataTreeCandidate> callback = mock(FutureCallback.class);
+ cohort.preCommit(callback);
+
+ verify(callback).onFailure(cause);
+ verifyNoMoreInteractions(callback);
+
+ verify(mockShardDataTree, never()).startPreCommit(cohort);
+ }
+
+ @Test
+ public void testCommitWithIllegalArgumentEx() {
+ canCommitSuccess();
+ final DataTreeCandidateTip candidate = preCommitSuccess();
+
+ final Exception cause = new IllegalArgumentException("mock");
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, SimpleShardDataTreeCohort.class).failedCommit(cause);
+ return null;
+ }).when(mockShardDataTree).startCommit(cohort, candidate);
+
+ @SuppressWarnings("unchecked")
+ final FutureCallback<UnsignedLong> callback = mock(FutureCallback.class);
+ cohort.commit(callback);
+
+ verify(callback).onFailure(cause);
+ verifyNoMoreInteractions(callback);
+
+ verify(mockUserCohorts).abort();
}
@Test
public void testAbort() throws Exception {
+ doNothing().when(mockShardDataTree).startAbort(cohort);
+
cohort.abort().get();
+
+ verify(mockShardDataTree).startAbort(cohort);
+ }
+
+ @Test
+ public void testAbortWithCohorts() throws Exception {
+ doNothing().when(mockShardDataTree).startAbort(cohort);
+
+ final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
+ doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();
+
+ final ListenableFuture<Void> abortFuture = cohort.abort();
+
+ cohortFuture.success(Collections.emptyList());
+
+ abortFuture.get();
+ verify(mockShardDataTree).startAbort(cohort);
}
}
import akka.testkit.JavaTestKit;
import com.google.common.collect.ImmutableSet;
import java.util.concurrent.TimeUnit;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
private static final YangInstanceIdentifier ENTITY_ID2 =
YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
- private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(),
- TreeType.OPERATIONAL);
+ private ShardDataTree shardDataTree;
+
+ @Mock
+ private Shard mockShard;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(), TreeType.OPERATIONAL);
+ }
@Test
public void testOnDataTreeChanged() throws Exception {
ImmutableSet.copyOf(candidateRemoved.getRemainingCandidates()));
}
- private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
+ private void writeNode(final YangInstanceIdentifier path, final NormalizedNode<?, ?> node) throws DataValidationFailedException {
AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
}
- private void deleteNode(YangInstanceIdentifier path) throws DataValidationFailedException {
+ private void deleteNode(final YangInstanceIdentifier path) throws DataValidationFailedException {
AbstractEntityOwnershipTest.deleteNode(path, shardDataTree);
}
}
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
DistributedEntityOwnershipService service = spy(DistributedEntityOwnershipService.start(
dataStore.getActorContext(), EntityOwnerSelectionStrategyConfig.newBuilder().build()));
- ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(), TreeType.OPERATIONAL);
+ final Shard mockShard = Mockito.mock(Shard.class);
+ ShardDataTree shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(),
+ TreeType.OPERATIONAL);
when(service.getLocalEntityOwnershipShardDataTree()).thenReturn(shardDataTree.getDataTree());
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
private static final DOMEntity ENTITY1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
private static final DOMEntity ENTITY2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
- private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(),
+ private final Shard mockShard = Mockito.mock(Shard.class);
+
+ private final ShardDataTree shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(),
TreeType.OPERATIONAL);
private final EntityOwnershipListenerSupport mockListenerSupport = mock(EntityOwnershipListenerSupport.class);
private EntityOwnerChangeListener listener;
anyBoolean(), anyBoolean());
}
- private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
+ private void writeNode(final YangInstanceIdentifier path, final NormalizedNode<?, ?> node) throws DataValidationFailedException {
AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
}
}
TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
- LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
+ leaderId.getMemberName().getName(), EntityOwnerSelectionStrategyConfig.newBuilder().build())
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
leader.tell(TimeoutNow.INSTANCE, leader);
ShardTestKit.waitUntilLeader(leader);
TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
DatastoreContext datastoreContext) {
super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext).
- schemaContext(SCHEMA_CONTEXT).localMemberName(MemberName.forName(LOCAL_MEMBER_NAME)));
+ schemaContext(SCHEMA_CONTEXT).localMemberName(name.getMemberName()));
}
@Override
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.yangtools.yang.common.QName;
private static final YangInstanceIdentifier ENTITY_ID2 =
YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
- private final ShardDataTree shardDataTree = new ShardDataTree(SchemaContextHelper.entityOwners(),
+ private final Shard mockShard = Mockito.mock(Shard.class);
+
+ private final ShardDataTree shardDataTree = new ShardDataTree(mockShard, SchemaContextHelper.entityOwners(),
TreeType.OPERATIONAL);
private EntityOwnershipStatistics ownershipStatistics;
}
- private static void assertStatistics(Map<String, Map<String, Long>> statistics, String memberName, long val) {
+ private static void assertStatistics(final Map<String, Map<String, Long>> statistics, final String memberName, final long val) {
assertEquals(val, statistics.get(ENTITY_TYPE).get(memberName).longValue());
}
- private void writeNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node) throws DataValidationFailedException {
+ private void writeNode(final YangInstanceIdentifier path, final NormalizedNode<?, ?> node) throws DataValidationFailedException {
AbstractEntityOwnershipTest.writeNode(path, node, shardDataTree);
}
}
\ No newline at end of file
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
realModification = dataTree.takeSnapshot().newModification();
proxyModification = Reflection.newProxy(DataTreeModification.class, new InvocationHandler() {
@Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
try {
method.invoke(mockModification, args);
return method.invoke(realModification, args);
}
@Test
- public void testWriteRootNodeWithInvalidChild() throws Exception{
- ShardDataTree shardDataTree = new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION);
+ public void testWriteRootNodeWithInvalidChild() throws Exception {
+ final Shard mockShard = Mockito.mock(Shard.class);
+
+ ShardDataTree shardDataTree = new ShardDataTree(mockShard, SCHEMA_CONTEXT, TreeType.CONFIGURATION);
NormalizedNode<?, ?> root = shardDataTree.readNode(YangInstanceIdentifier.EMPTY).get();
NormalizedNode<?, ?> normalizedNode = ImmutableContainerNodeBuilder.create().withNodeIdentifier(