import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
/**
@NotThreadSafe
abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot> {
private final T snapshot;
- private final String id;
+ private final TransactionIdentifier id;
private boolean closed;
- protected AbstractShardDataTreeTransaction(final String id, final T snapshot) {
+ protected AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) {
this.snapshot = Preconditions.checkNotNull(snapshot);
this.id = Preconditions.checkNotNull(id);
}
- String getId() {
+ final TransactionIdentifier getId() {
return id;
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private final ReadWriteShardDataTreeTransaction transaction;
- private final StringIdentifier transactionID;
+ private final TransactionIdentifier transactionID;
private final CompositeDataTreeCohort userCohorts;
private final short clientVersion;
private ActorRef replySender;
private Shard shard;
- CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
+ CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionID = new StringIdentifier(transactionID);
+ this.transactionID = Preconditions.checkNotNull(transactionID);
this.clientVersion = clientVersion;
this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
- CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
+ CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
SchemaContext schema, short clientVersion) {
- this.transactionID = new StringIdentifier(transactionID);
+ this.transactionID = Preconditions.checkNotNull(transactionID);
this.cohort = cohort;
this.transaction = null;
this.clientVersion = clientVersion;
lastAccessTimer.start();
}
- StringIdentifier getTransactionID() {
+ TransactionIdentifier getTransactionID() {
return transactionID;
}
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
private final DataTreeCohortActorRegistry registry;
- private final String txId;
+ private final TransactionIdentifier txId;
private final SchemaContext schema;
private final Timeout timeout;
private Iterable<Success> successfulFromPrevious;
private State state = State.IDLE;
- CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, String txId, SchemaContext schema, Timeout timeout) {
+ CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, TransactionIdentifier transactionID,
+ SchemaContext schema, Timeout timeout) {
this.registry = Preconditions.checkNotNull(registry);
- this.txId = Preconditions.checkNotNull(txId);
+ this.txId = Preconditions.checkNotNull(transactionID);
this.schema = Preconditions.checkNotNull(schema);
this.timeout = Preconditions.checkNotNull(timeout);
}
import akka.actor.Props;
import akka.actor.Status;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.mdsal.common.api.PostCanCommitStep;
import org.opendaylight.mdsal.common.api.PostPreCommitStep;
*/
static abstract class CommitProtocolCommand<R extends CommitReply> {
- private final String txId;
+ private final TransactionIdentifier txId;
- final String getTxId() {
+ final TransactionIdentifier getTxId() {
return txId;
}
- protected CommitProtocolCommand(String txId) {
+ protected CommitProtocolCommand(TransactionIdentifier txId) {
this.txId = Preconditions.checkNotNull(txId);
}
}
private final ActorRef cohort;
private final SchemaContext schema;
- CanCommit(String txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
+ CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
super(txId);
this.cohort = Preconditions.checkNotNull(cohort);
this.candidate = Preconditions.checkNotNull(candidate);
static abstract class CommitReply {
private final ActorRef cohortRef;
- private final String txId;
+ private final TransactionIdentifier txId;
- protected CommitReply(ActorRef cohortRef, String txId) {
+ protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
this.cohortRef = Preconditions.checkNotNull(cohortRef);
this.txId = Preconditions.checkNotNull(txId);
}
return cohortRef;
}
- final String getTxId() {
+ final TransactionIdentifier getTxId() {
return txId;
}
-
}
static final class Success extends CommitReply {
- public Success(ActorRef cohortRef, String txId) {
+ public Success(ActorRef cohortRef, TransactionIdentifier txId) {
super(cohortRef, txId);
}
static final class PreCommit extends CommitProtocolCommand<Success> {
- public PreCommit(String txId) {
+ public PreCommit(TransactionIdentifier txId) {
super(txId);
}
}
static final class Abort extends CommitProtocolCommand<Success> {
- public Abort(String txId) {
+ public Abort(TransactionIdentifier txId) {
super(txId);
}
}
static final class Commit extends CommitProtocolCommand<Success> {
- public Commit(String txId) {
+ public Commit(TransactionIdentifier txId) {
super(txId);
}
}
extends CohortBehaviour<M> {
private final S step;
- private final String txId;
+ private final TransactionIdentifier txId;
- CohortStateWithStep(String txId, S step) {
+ CohortStateWithStep(TransactionIdentifier txId, S step) {
this.txId = Preconditions.checkNotNull(txId);
this.step = Preconditions.checkNotNull(step);
}
return step;
}
- final String getTxId() {
+ final TransactionIdentifier getTxId() {
return txId;
}
private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
- PostCanCommit(String txId, PostCanCommitStep nextStep) {
+ PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
super(txId, nextStep);
}
private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
- PostPreCommit(String txId, PostPreCommitStep step) {
+ PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
super(txId, step);
}
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.md.sal.dom.spi.AbstractRegistrationTree;
import org.opendaylight.controller.md.sal.dom.spi.RegistrationTreeNode;
cohort.tell(PoisonPill.getInstance(), cohort);
}
- Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(String txId, DataTreeCandidate candidate,
- SchemaContext schema) {
+ Collection<DataTreeCohortActor.CanCommit> createCanCommitMessages(TransactionIdentifier txId,
+ DataTreeCandidate candidate, SchemaContext schema) {
try (RegistrationTreeSnapshot<ActorRef> cohorts = takeSnapshot()) {
return new CanCommitMessageBuilder(txId, candidate, schema).perform(cohorts.getRootNode());
}
private static class CanCommitMessageBuilder {
- private final String txId;
+ private final TransactionIdentifier txId;
private final DataTreeCandidate candidate;
private final SchemaContext schema;
private final Collection<DataTreeCohortActor.CanCommit> messages =
new ArrayList<>();
- CanCommitMessageBuilder(String txId, DataTreeCandidate candidate, SchemaContext schema) {
+ CanCommitMessageBuilder(TransactionIdentifier txId, DataTreeCandidate candidate, SchemaContext schema) {
this.txId = Preconditions.checkNotNull(txId);
this.candidate = Preconditions.checkNotNull(candidate);
this.schema = schema;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
return Futures.failed(operationError);
}
- final ReadyLocalTransaction message = new ReadyLocalTransaction(
- TransactionIdentifierUtils.actorNameFor(transaction.getIdentifier()), modification, immediate);
+ final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier(),
+ modification, immediate);
return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
}
*/
package org.opendaylight.controller.cluster.datastore;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
final class ReadOnlyShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeSnapshot> {
- ReadOnlyShardDataTreeTransaction(final String id, final DataTreeSnapshot snapshot) {
+ ReadOnlyShardDataTreeTransaction(final TransactionIdentifier id, final DataTreeSnapshot snapshot) {
super(id, snapshot);
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
private final ShardDataTreeTransactionParent parent;
- protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final String id, final DataTreeModification modification) {
+ protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent,
+ final TransactionIdentifier id, final DataTreeModification modification) {
super(id, modification);
this.parent = Preconditions.checkNotNull(parent);
}
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
private BatchedModifications newBatchedModifications() {
- return new BatchedModifications(TransactionIdentifierUtils.actorNameFor(getIdentifier()),
- getTransactionVersion(), RemoteTransactionContextSupport.compatTransactionChainId(getIdentifier()));
+ return new BatchedModifications(getIdentifier(), getTransactionVersion());
}
private void batchModification(Modification modification) {
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
}
/**
- * @deprecated Temporary utility for extracting transaction chain ID from a {@link TransactionIdentifier}
- */
- @Deprecated
- static String compatTransactionChainId(final TransactionIdentifier txId) {
- final long historyId = txId.getHistoryId().getHistoryId();
- return historyId == 0 ? "" : Long.toUnsignedString(historyId);
- }
-
- /**
- * Performs a CreateTransaction try async.
+ Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
if(LOG.isDebugEnabled()) {
primaryShardInfo.getPrimaryShardActor());
}
- Object serializedCreateMessage = new CreateTransaction(TransactionIdentifierUtils.actorNameFor(getIdentifier()),
- getTransactionType().ordinal(), compatTransactionChainId(getIdentifier()),
+ Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(),
primaryShardInfo.getPrimaryShardVersion()).toSerializable();
Future<Object> createTxFuture = getActorContext().executeOperationAsync(
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
new Dispatchers(context().system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
- snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+ snapshotCohort = new ShardSnapshotCohort(builder.getId().getMemberName(), transactionActorFactory, store,
+ LOG, this.name);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
}
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
- if(!commitCoordinator.handleCommit(new StringIdentifier(commit.getTransactionID()), getSender(), this)) {
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
shardMBean.incrementFailedTransactionsCount();
}
} else {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(new StringIdentifier(canCommit.getTransactionID()), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
doAbortTransaction(abort.getTransactionID(), getSender());
}
- void doAbortTransaction(final String transactionID, final ActorRef sender) {
- commitCoordinator.handleAbort(new StringIdentifier(transactionID), sender, this);
+ void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
+ commitCoordinator.handleAbort(transactionID, sender, this);
}
private void handleCreateTransaction(final Object message) {
}
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+ createTransaction.getTransactionId());
getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
}
}
- private ActorRef createTransaction(int transactionType, String transactionId, String transactionChainId) {
+ private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) {
LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
- transactionId, transactionChainId);
+ transactionId);
}
private void commitWithNewTransaction(final BatchedModifications modification) {
- ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID(),
- modification.getTransactionChainID());
+ ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID());
modification.apply(tx.getSnapshot());
try {
snapshotCohort.syncCommitTransaction(tx);
import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
* @param shard the transaction's shard actor
*/
void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
- CohortEntry cohortEntry = cohortCache.get(new StringIdentifier(batched.getTransactionID()));
+ CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
- dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
+ dataTree.newReadWriteTransaction(batched.getTransactionID()),
cohortRegistry, schema, batched.getVersion());
cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
}
Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
final int maxModificationsPerBatch) {
- CohortEntry cohortEntry = getAndRemoveCohortEntry(new StringIdentifier(from.getTransactionID()));
+ CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
if(cohortEntry == null || cohortEntry.getTransaction() == null) {
return Collections.singletonList(from);
}
protected BatchedModifications getModifications() {
if(newModifications.isEmpty() ||
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
- newModifications.add(new BatchedModifications(from.getTransactionID(),
- from.getVersion(), from.getTransactionChainID()));
+ newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
}
return newModifications.getLast();
protected BatchedModifications getModifications() {
if(newModifications.isEmpty() ||
newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
- newModifications.add(new BatchedModifications(cohortEntry.getTransactionID().getString(),
- cohortEntry.getClientVersion(), ""));
+ newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+ cohortEntry.getClientVersion()));
}
return newModifications.getLast();
messages.addAll(newModifications);
if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
- messages.add(new CanCommitTransaction(cohortEntry.getTransactionID().getString(),
+ messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
cohortEntry.getClientVersion()));
}
if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
- messages.add(new CommitTransaction(cohortEntry.getTransactionID().getString(),
+ messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
cohortEntry.getClientVersion()));
}
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
- private final Map<String, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+ private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
private final TipProducingDataTree dataTree;
dataTree.setSchemaContext(schemaContext);
}
- private ShardDataTreeTransactionChain ensureTransactionChain(final String chainId) {
- ShardDataTreeTransactionChain chain = transactionChains.get(chainId);
+ private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+ ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
if (chain == null) {
- chain = new ShardDataTreeTransactionChain(chainId, this);
- transactionChains.put(chainId, chain);
+ chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
+ transactionChains.put(localHistoryIdentifier, chain);
}
return chain;
}
- ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId, final String chainId) {
- if (Strings.isNullOrEmpty(chainId)) {
+ ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
+ if (txId.getHistoryId().getHistoryId() == 0) {
return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
}
- return ensureTransactionChain(chainId).newReadOnlyTransaction(txId);
+ return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
}
- ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId, final String chainId) {
- if (Strings.isNullOrEmpty(chainId)) {
+ ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
+ if (txId.getHistoryId().getHistoryId() == 0) {
return new ReadWriteShardDataTreeTransaction(ShardDataTree.this, txId, dataTree.takeSnapshot()
.newModification());
}
- return ensureTransactionChain(chainId).newReadWriteTransaction(txId);
+ return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
}
public void notifyListeners(final DataTreeCandidate candidate) {
transactionChains.clear();
}
- void closeTransactionChain(final String transactionChainId) {
+ void closeTransactionChain(final LocalHistoryIdentifier transactionChainId) {
final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
if (chain != null) {
chain.close();
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent {
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
private final ShardDataTree dataTree;
- private final String chainId;
+ private final LocalHistoryIdentifier chainId;
private ReadWriteShardDataTreeTransaction previousTx;
private ReadWriteShardDataTreeTransaction openTransaction;
private boolean closed;
- ShardDataTreeTransactionChain(final String chainId, final ShardDataTree dataTree) {
+ ShardDataTreeTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier, final ShardDataTree dataTree) {
this.dataTree = Preconditions.checkNotNull(dataTree);
- this.chainId = Preconditions.checkNotNull(chainId);
+ this.chainId = Preconditions.checkNotNull(localHistoryIdentifier);
}
private DataTreeSnapshot getSnapshot() {
}
}
- ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId) {
+ ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
final DataTreeSnapshot snapshot = getSnapshot();
LOG.debug("Allocated read-only transaction {} snapshot {}", txId, snapshot);
return new ReadOnlyShardDataTreeTransaction(txId, snapshot);
}
- ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId) {
+ ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
final DataTreeSnapshot snapshot = getSnapshot();
LOG.debug("Allocated read-write transaction {} snapshot {}", txId, snapshot);
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
public ShardReadTransaction(AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
ShardStats shardStats) {
super(shardActor, shardStats, transaction.getId());
- this.transaction = transaction;
+ this.transaction = Preconditions.checkNotNull(transaction);
}
@Override
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
* @author Thomas Pantelis
*/
class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+ private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
+ private static final FrontendType SNAPSHOT_READ = FrontendType.forName("snapshot-read");
+
private final ShardTransactionActorFactory transactionActorFactory;
+ private final LocalHistoryIdentifier applyHistoryId;
+ private final LocalHistoryIdentifier readHistoryId;
private final ShardDataTree store;
private final String logId;
private final Logger log;
- private int createSnapshotTransactionCounter;
+ private long applyCounter;
+ private long readCounter;
- ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
+ ShardSnapshotCohort(MemberName memberName, ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
Logger log, String logId) {
- this.transactionActorFactory = transactionActorFactory;
+ this.transactionActorFactory = Preconditions.checkNotNull(transactionActorFactory);
this.store = Preconditions.checkNotNull(store);
this.log = log;
this.logId = logId;
+
+ this.applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+ FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
+ this.readHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+ FrontendIdentifier.create(memberName, SNAPSHOT_READ), 0), 0);
}
@Override
// after processing the CreateSnapshot message.
ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
- TransactionType.READ_ONLY, "createSnapshot" + ++createSnapshotTransactionCounter, "");
+ TransactionType.READ_ONLY, new TransactionIdentifier(readHistoryId, readCounter++));
createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
}
log.info("{}: Applying snapshot", logId);
try {
- ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null);
+ ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(
+ new TransactionIdentifier(applyHistoryId, applyCounter++));
NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
import akka.japi.Creator;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
private final ActorRef shardActor;
private final ShardStats shardStats;
- private final String transactionID;
+ private final TransactionIdentifier transactionID;
- protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID) {
+ protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, TransactionIdentifier transactionID) {
super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
this.shardStats = shardStats;
return shardActor;
}
- protected String getTransactionID() {
+ protected final TransactionIdentifier getTransactionID() {
return transactionID;
}
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
/**
this.shardActor = shardActor;
}
- ActorRef newShardTransaction(TransactionType type, String transactionID, String transactionChainID) {
+ private static String actorNameFor(final TransactionIdentifier txId) {
+ final LocalHistoryIdentifier historyId = txId.getHistoryId();
+ final ClientIdentifier clientId = historyId.getClientId();
+ final FrontendIdentifier frontendId = clientId.getFrontendId();
+
+ final StringBuilder sb = new StringBuilder("shard-");
+ sb.append(frontendId.getMemberName().getName()).append(':');
+ sb.append(frontendId.getClientType().getName()).append('@');
+ sb.append(clientId.getGeneration()).append(':');
+ if (historyId.getHistoryId() != 0) {
+ sb.append(historyId.getHistoryId()).append('-');
+ }
+
+ return sb.append(txId.getTransactionId()).toString();
+ }
+
+ ActorRef newShardTransaction(TransactionType type, TransactionIdentifier transactionID) {
final AbstractShardDataTreeTransaction<?> transaction;
switch (type) {
case READ_ONLY:
- transaction = dataTree.newReadOnlyTransaction(transactionID, transactionChainID);
+ transaction = dataTree.newReadOnlyTransaction(transactionID);
shardMBean.incrementReadOnlyTransactionCount();
break;
case READ_WRITE:
- transaction = dataTree.newReadWriteTransaction(transactionID, transactionChainID);
+ transaction = dataTree.newReadWriteTransaction(transactionID);
shardMBean.incrementReadWriteTransactionCount();
break;
case WRITE_ONLY:
- transaction = dataTree.newReadWriteTransaction(transactionID, transactionChainID);
+ transaction = dataTree.newReadWriteTransaction(transactionID);
shardMBean.incrementWriteOnlyTransactionCount();
break;
default:
}
return actorContext.actorOf(ShardTransaction.props(type, transaction, shardActor, datastoreContext, shardMBean)
- .withDispatcher(txnDispatcherPath), "shard-" + transactionID);
+ .withDispatcher(txnDispatcherPath), actorNameFor(transactionID));
}
}
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
}
private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit, short clientTxVersion) {
- String transactionID = getTransactionID();
+ TransactionIdentifier transactionID = getTransactionID();
LOG.debug("readyTransaction : {}", transactionID);
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
private final DataTreeModification transaction;
private final ShardDataTree dataTree;
- private final String transactionId;
+ private final TransactionIdentifier transactionId;
private DataTreeCandidateTip candidate;
SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
- final String transactionId) {
+ final TransactionIdentifier transactionId) {
this.dataTree = Preconditions.checkNotNull(dataTree);
this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionId = transactionId;
+ this.transactionId = Preconditions.checkNotNull(transactionId);
}
@Override
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
@Override
- public Object newMessage(String transactionId, short version) {
+ public Object newMessage(TransactionIdentifier transactionId, short version) {
return new CommitTransaction(transactionId, version).toSerializable();
}
private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
@Override
- public Object newMessage(String transactionId, short version) {
+ public Object newMessage(TransactionIdentifier transactionId, short version) {
return new AbortTransaction(transactionId, version).toSerializable();
}
private final ActorContext actorContext;
private final List<CohortInfo> cohorts;
private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
- private final String transactionId;
+ private final TransactionIdentifier transactionId;
private volatile OperationCallback commitOperationCallback;
- public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts, String transactionId) {
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts,
+ TransactionIdentifier transactionId) {
this.actorContext = actorContext;
this.cohorts = cohorts;
- this.transactionId = transactionId;
+ this.transactionId = Preconditions.checkNotNull(transactionId);
if(cohorts.isEmpty()) {
cohortsResolvedFuture.set(null);
}
private interface MessageSupplier {
- Object newMessage(String transactionId, short version);
+ Object newMessage(TransactionIdentifier transactionId, short version);
boolean isSerializedReplyType(Object reply);
}
}
this.parent = parent;
}
- public String getTransactionChainId() {
- return getHistoryId().toString();
- }
-
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
currentState.checkReady();
getActorContext().broadcast(new Function<Short, Object>() {
@Override
public Object apply(Short version) {
- return new CloseTransactionChain(getHistoryId().toString(), version).toSerializable();
+ return new CloseTransactionChain(getHistoryId(), version).toSerializable();
}
});
}
LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId);
} else {
previousTransactionId = "";
- LOG.debug("Waiting for ready futures on chain {}", getTransactionChainId());
+ LOG.debug("Waiting for ready futures on chain {}", getHistoryId());
}
previous = combineFutureWithPossiblePriorReadOnlyTxFutures(previous, txId);
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
-import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier));
}
- return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts,
- TransactionIdentifierUtils.actorNameFor(getIdentifier()));
+ return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts, getIdentifier());
}
private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
}
BatchedModifications newBatchedModifications() {
- BatchedModifications modifications = new BatchedModifications(TransactionIdentifierUtils.actorNameFor(
- new TransactionIdentifier(historyId, ++transactionIDCounter)), DataStoreVersions.CURRENT_VERSION, "");
+ BatchedModifications modifications = new BatchedModifications(
+ new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION);
modifications.setDoCommitOnReady(true);
modifications.setReady(true);
modifications.setTotalMessagesSent(1);
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
public class AbortTransaction extends AbstractThreePhaseCommitMessage {
private static final long serialVersionUID = 1L;
public AbortTransaction() {
}
- public AbortTransaction(String transactionID, final short version) {
+ public AbortTransaction(TransactionIdentifier transactionID, final short version) {
super(transactionID, version);
}
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
/**
* Base class for a 3PC message.
public abstract class AbstractThreePhaseCommitMessage extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
- private String transactionID;
+ private TransactionIdentifier transactionID;
protected AbstractThreePhaseCommitMessage() {
}
- protected AbstractThreePhaseCommitMessage(final String transactionID, final short version) {
+ protected AbstractThreePhaseCommitMessage(final TransactionIdentifier transactionID, final short version) {
super(version);
this.transactionID = Preconditions.checkNotNull(transactionID);
}
- public String getTransactionID() {
+ public TransactionIdentifier getTransactionID() {
return transactionID;
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- transactionID = in.readUTF();
+ transactionID = TransactionIdentifier.readFrom(in);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeUTF(transactionID);
+ transactionID.writeTo(out);
}
@Override
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
/**
private boolean ready;
private boolean doCommitOnReady;
private int totalMessagesSent;
- private String transactionID;
- private String transactionChainID;
+ private TransactionIdentifier transactionID;
public BatchedModifications() {
}
- public BatchedModifications(String transactionID, short version, String transactionChainID) {
+ public BatchedModifications(TransactionIdentifier transactionID, short version) {
super(version);
this.transactionID = Preconditions.checkNotNull(transactionID, "transactionID can't be null");
- this.transactionChainID = transactionChainID != null ? transactionChainID : "";
}
public boolean isReady() {
this.totalMessagesSent = totalMessagesSent;
}
- public String getTransactionID() {
+ public TransactionIdentifier getTransactionID() {
return transactionID;
}
- public String getTransactionChainID() {
- return transactionChainID;
- }
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- transactionID = in.readUTF();
- transactionChainID = in.readUTF();
+ transactionID = TransactionIdentifier.readFrom(in);
ready = in.readBoolean();
totalMessagesSent = in.readInt();
doCommitOnReady = in.readBoolean();
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeUTF(transactionID);
- out.writeUTF(transactionChainID);
+ transactionID.writeTo(out);
out.writeBoolean(ready);
out.writeInt(totalMessagesSent);
out.writeBoolean(doCommitOnReady);
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("BatchedModifications [transactionID=").append(transactionID).append(", transactionChainID=")
- .append(transactionChainID).append(", ready=").append(ready).append(", totalMessagesSent=")
- .append(totalMessagesSent).append(", modifications size=").append(getModifications().size())
- .append("]");
+ builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
+ .append(", totalMessagesSent=").append(totalMessagesSent).append(", modifications size=")
+ .append(getModifications().size()).append("]");
return builder.toString();
}
}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
public class CanCommitTransaction extends AbstractThreePhaseCommitMessage {
private static final long serialVersionUID = 1L;
public CanCommitTransaction() {
}
- public CanCommitTransaction(String transactionID, final short version) {
+ public CanCommitTransaction(TransactionIdentifier transactionID, final short version) {
super(transactionID, version);
}
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
public class CloseTransactionChain extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
- private String transactionChainId;
+ private LocalHistoryIdentifier transactionChainId;
public CloseTransactionChain() {
}
- public CloseTransactionChain(final String transactionChainId, final short version) {
+ public CloseTransactionChain(final LocalHistoryIdentifier transactionChainId, final short version) {
super(version);
- this.transactionChainId = transactionChainId;
+ this.transactionChainId = Preconditions.checkNotNull(transactionChainId);
}
- public String getTransactionChainId() {
+ public LocalHistoryIdentifier getTransactionChainId() {
return transactionChainId;
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- transactionChainId = in.readUTF();
+ transactionChainId = LocalHistoryIdentifier.readFrom(in);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeUTF(transactionChainId);
+ transactionChainId.writeTo(out);
}
public static CloseTransactionChain fromSerializable(final Object serializable){
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
public class CommitTransaction extends AbstractThreePhaseCommitMessage {
private static final long serialVersionUID = 1L;
public CommitTransaction() {
}
- public CommitTransaction(String transactionID, final short version) {
+ public CommitTransaction(TransactionIdentifier transactionID, final short version) {
super(transactionID, version);
}
public static CommitTransaction fromSerializable(Object serializable) {
Preconditions.checkArgument(serializable instanceof CommitTransaction);
return (CommitTransaction)serializable;
-
}
public static boolean isSerializedType(Object message) {
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
public class CreateTransaction extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
- private String transactionId;
+ private TransactionIdentifier transactionId;
private int transactionType;
- private String transactionChainId;
public CreateTransaction() {
}
- public CreateTransaction(String transactionId, int transactionType, String transactionChainId,
- short version) {
+ public CreateTransaction(TransactionIdentifier transactionId, int transactionType, short version) {
super(version);
this.transactionId = Preconditions.checkNotNull(transactionId);
this.transactionType = transactionType;
- this.transactionChainId = transactionChainId != null ? transactionChainId : "";
}
- public String getTransactionId() {
+ public TransactionIdentifier getTransactionId() {
return transactionId;
}
return transactionType;
}
- public String getTransactionChainId() {
- return transactionChainId;
- }
-
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- transactionId = in.readUTF();
+ transactionId = TransactionIdentifier.readFrom(in);
transactionType = in.readInt();
- transactionChainId = in.readUTF();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeUTF(transactionId);
+ transactionId.writeTo(out);
out.writeInt(transactionType);
- out.writeUTF(transactionChainId);
}
@Override
public String toString() {
- return "CreateTransaction [transactionId=" + transactionId + ", transactionType=" + transactionType
- + ", transactionChainId=" + transactionChainId + "]";
+ return "CreateTransaction [transactionId=" + transactionId + ", transactionType=" + transactionType + "]";
}
public static CreateTransaction fromSerializable(Object message) {
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
public class CreateTransactionReply extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
private String transactionPath;
- private String transactionId;
+ private TransactionIdentifier transactionId;
public CreateTransactionReply() {
}
- public CreateTransactionReply(final String transactionPath, final String transactionId, final short version) {
+ public CreateTransactionReply(final String transactionPath, final TransactionIdentifier transactionId,
+ final short version) {
super(version);
- this.transactionPath = transactionPath;
- this.transactionId = transactionId;
+ this.transactionPath = Preconditions.checkNotNull(transactionPath);
+ this.transactionId = Preconditions.checkNotNull(transactionId);
}
public String getTransactionPath() {
return transactionPath;
}
- public String getTransactionId() {
+ public TransactionIdentifier getTransactionId() {
return transactionId;
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- transactionId = in.readUTF();
+ transactionId = TransactionIdentifier.readFrom(in);
transactionPath = in.readUTF();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeUTF(transactionId);
+ transactionId.writeTo(out);
out.writeUTF(transactionPath);
}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ReadWriteShardDataTreeTransaction;
/**
* @author Thomas Pantelis
*/
public class ForwardedReadyTransaction {
- private final String transactionID;
+ private final TransactionIdentifier transactionID;
private final ReadWriteShardDataTreeTransaction transaction;
private final boolean doImmediateCommit;
private final short txnClientVersion;
- public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
+ public ForwardedReadyTransaction(TransactionIdentifier transactionID, short txnClientVersion,
ReadWriteShardDataTreeTransaction transaction, boolean doImmediateCommit) {
this.transactionID = Preconditions.checkNotNull(transactionID);
this.transaction = Preconditions.checkNotNull(transaction);
this.doImmediateCommit = doImmediateCommit;
}
- public String getTransactionID() {
+ public TransactionIdentifier getTransactionID() {
return transactionID;
}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
*/
public final class ReadyLocalTransaction {
private final DataTreeModification modification;
- private final String transactionID;
+ private final TransactionIdentifier transactionID;
private final boolean doCommitOnReady;
// The version of the remote system used only when needing to convert to BatchedModifications.
private short remoteVersion = DataStoreVersions.CURRENT_VERSION;
- public ReadyLocalTransaction(final String transactionID, final DataTreeModification modification, final boolean doCommitOnReady) {
+ public ReadyLocalTransaction(final TransactionIdentifier transactionID, final DataTreeModification modification,
+ final boolean doCommitOnReady) {
this.transactionID = Preconditions.checkNotNull(transactionID);
this.modification = Preconditions.checkNotNull(modification);
this.doCommitOnReady = doCommitOnReady;
}
- public String getTransactionID() {
+ public TransactionIdentifier getTransactionID() {
return transactionID;
}
Preconditions.checkArgument(obj instanceof ReadyLocalTransaction, "Unsupported object type %s", obj.getClass());
final ReadyLocalTransaction readyLocal = (ReadyLocalTransaction) obj;
final BatchedModifications batched = new BatchedModifications(readyLocal.getTransactionID(),
- readyLocal.getRemoteVersion(), "");
+ readyLocal.getRemoteVersion());
batched.setDoCommitOnReady(readyLocal.isDoCommitOnReady());
batched.setTotalMessagesSent(1);
batched.setReady(true);
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.utils;
-
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-
-public final class TransactionIdentifierUtils {
- private TransactionIdentifierUtils() {
- throw new UnsupportedOperationException();
- }
-
- public static String actorNameFor(final TransactionIdentifier txId) {
- final LocalHistoryIdentifier historyId = txId.getHistoryId();
- final ClientIdentifier clientId = historyId.getClientId();
- final FrontendIdentifier frontendId = clientId.getFrontendId();
-
- final StringBuilder sb = new StringBuilder();
- sb.append(frontendId.getMemberName().getName()).append(':');
- sb.append(frontendId.getClientType().getName()).append('@');
- sb.append(clientId.getGeneration()).append(':');
- if (historyId.getHistoryId() != 0) {
- sb.append(historyId.getHistoryId()).append('-');
- }
-
- return sb.append(txId.getTransactionId()).toString();
- }
-}
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
-
+import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import java.io.IOException;
-
-public abstract class AbstractActorTest {
+public abstract class AbstractActorTest extends AbstractTest {
private static ActorSystem system;
@BeforeClass
public static void setUpClass() throws IOException {
-
System.setProperty("shard.persistent", "false");
system = ActorSystem.create("test");
}
system = null;
}
- protected ActorSystem getSystem() {
+ protected static ActorSystem getSystem() {
return system;
}
}
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
final MutableCompositeModification modification,
final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
- final ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null);
+ final ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction(nextTransactionId());
tx.getSnapshot().write(path, data);
final ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
}
protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort,
- String transactionID,
- MutableCompositeModification modification,
- boolean doCommitOnReady) {
+ TransactionIdentifier transactionID, MutableCompositeModification modification,
+ boolean doCommitOnReady) {
if(remoteReadWriteTransaction){
return prepareForwardedReadyTransaction(cohort, transactionID, CURRENT_VERSION,
doCommitOnReady);
}
protected ForwardedReadyTransaction prepareForwardedReadyTransaction(ShardDataTreeCohort cohort,
- String transactionID, short version, boolean doCommitOnReady) {
+ TransactionIdentifier transactionID, short version, boolean doCommitOnReady) {
return new ForwardedReadyTransaction(transactionID, version,
new ReadWriteShardDataTreeTransaction(newShardDataTreeTransactionParent(cohort), transactionID,
mock(DataTreeModification.class)), doCommitOnReady);
}
protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort,
- String transactionID,
- MutableCompositeModification modification) {
+ TransactionIdentifier transactionID, MutableCompositeModification modification) {
return prepareReadyTransactionMessage(remoteReadWriteTransaction, shard, cohort, transactionID, modification, false);
}
});
}
- protected BatchedModifications prepareBatchedModifications(String transactionID,
+ protected BatchedModifications prepareBatchedModifications(TransactionIdentifier transactionID,
MutableCompositeModification modification) {
return prepareBatchedModifications(transactionID, modification, false);
}
- private static BatchedModifications prepareBatchedModifications(String transactionID,
+ private static BatchedModifications prepareBatchedModifications(TransactionIdentifier transactionID,
MutableCompositeModification modification,
boolean doCommitOnReady) {
- final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION, null);
+ final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION);
batchedModifications.addModification(modification);
batchedModifications.setReady(true);
batchedModifications.setDoCommitOnReady(doCommitOnReady);
return store.takeSnapshot().readNode(id).orNull();
}
- public static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
+ public void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
- Future<Object> future = Patterns.ask(shard, newBatchedModifications("tx", id, node, true, true, 1),
+ Future<Object> future = Patterns.ask(shard, newBatchedModifications(nextTransactionId(), id, node, true, true, 1),
new Timeout(5, TimeUnit.SECONDS));
try {
Await.ready(future, Duration.create(5, TimeUnit.SECONDS));
public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
- final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
+ final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(nextTransactionId());
transaction.getSnapshot().write(id, node);
final ShardDataTreeCohort cohort = transaction.ready();
cohort.commit();
}
- public static void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
+ public void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
- final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
+ final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(nextTransactionId());
transaction.getSnapshot().merge(id, node);
final ShardDataTreeCohort cohort = transaction.ready();
return DataTreeCandidatePayload.create(candidate);
}
- static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
- final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
- return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
- }
-
- static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
+ static BatchedModifications newBatchedModifications(final TransactionIdentifier transactionID,
final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
final int messagesSent) {
- final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
+ final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
batched.addModification(new WriteModification(path, data));
batched.setReady(ready);
batched.setDoCommitOnReady(doCommitOnReady);
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+public abstract class AbstractTest {
+ protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
+ private static final FrontendType FRONTEND_TYPE = FrontendType.forName(ShardTransactionTest.class.getSimpleName());
+
+ protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
+
+ private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
+ private static final LocalHistoryIdentifier HISTORY_ID = new LocalHistoryIdentifier(CLIENT_ID, 0);
+ private static final AtomicLong HISTORY_COUNTER = new AtomicLong();
+ private static final AtomicLong TX_COUNTER = new AtomicLong();
+
+ protected static TransactionIdentifier nextTransactionId() {
+ return new TransactionIdentifier(HISTORY_ID, TX_COUNTER.getAndIncrement());
+ }
+
+ protected static LocalHistoryIdentifier nextHistoryId() {
+ return new LocalHistoryIdentifier(CLIENT_ID, HISTORY_COUNTER.incrementAndGet());
+ }
+}
*
* @author Thomas Pantelis
*/
-public abstract class AbstractTransactionProxyTest {
+public abstract class AbstractTransactionProxyTest extends AbstractTest {
protected final Logger log = LoggerFactory.getLogger(getClass());
private static ActorSystem system;
public boolean matches(Object argument) {
if(CreateTransaction.class.equals(argument.getClass())) {
CreateTransaction obj = CreateTransaction.fromSerializable(argument);
- return obj.getTransactionId().startsWith(memberName + ':') &&
+ return obj.getTransactionId().getHistoryId().getClientId().getFrontendId().getMemberName().getName().equals(memberName) &&
obj.getTransactionType() == type.ordinal();
}
}
protected CreateTransactionReply createTransactionReply(ActorRef actorRef, short transactionVersion){
- return new CreateTransactionReply(actorRef.path().toString(), "txn-1", transactionVersion);
+ return new CreateTransactionReply(actorRef.path().toString(), nextTransactionId(), transactionVersion);
}
protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
*
* @author Thomas Pantelis
*/
-public class DistributedDataStoreRemotingIntegrationTest {
+public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
private static final String[] CARS = {"cars"};
private final DatastoreContext.Builder followerDatastoreContextBuilder =
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+ private final TransactionIdentifier tx1 = nextTransactionId();
+ private final TransactionIdentifier tx2 = nextTransactionId();
private DistributedDataStore followerDistributedDataStore;
private DistributedDataStore leaderDistributedDataStore;
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
modification.ready();
- ReadyLocalTransaction readyLocal = new ReadyLocalTransaction("tx-1" , modification, true);
+ ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true);
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
modification.ready();
- readyLocal = new ReadyLocalTransaction("tx-2" , modification, false);
+ readyLocal = new ReadyLocalTransaction(tx2 , modification, false);
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
leaderDistributedDataStore.getActorContext(), Arrays.asList(
- new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
- ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1",
+ ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true);
+ Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
- forwardedReady = new ForwardedReadyTransaction("tx-2",
+ forwardedReady = new ForwardedReadyTransaction(tx2,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), false);
+ Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false);
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
leaderDistributedDataStore.getActorContext(), Arrays.asList(
- new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
+ new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
cohort.commit().get(5, TimeUnit.SECONDS);
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class ShardDataTreeTest {
+public class ShardDataTreeTest extends AbstractTest {
SchemaContext fullSchema;
assertEquals(fullSchema, shardDataTree.getSchemaContext());
- ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction("txn-1", null);
+ ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
DataTreeModification snapshot = transaction.getSnapshot();
cohort.commit().get();
- ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction("txn-2", null);
+ ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
}
private static NormalizedNode<?, ?> getCars(ShardDataTree shardDataTree) {
- ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction("txn-2", null);
+ ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction = shardDataTree.newReadOnlyTransaction(nextTransactionId());
DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
assertEquals(true, optional.isPresent());
- System.out.println(optional.get());
-
return optional.get();
}
private static DataTreeCandidateTip addCar(ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
- return doTransaction(shardDataTree, new DataTreeOperation() {
- @Override
- public void execute(DataTreeModification snapshot) {
+ return doTransaction(shardDataTree, snapshot -> {
snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
snapshot.write(CarsModel.newCarPath("altima"), CarsModel.newCarEntry("altima", new BigInteger("100")));
- }
- });
+ });
}
private static DataTreeCandidateTip removeCar(ShardDataTree shardDataTree) throws ExecutionException, InterruptedException {
- return doTransaction(shardDataTree, new DataTreeOperation() {
- @Override
- public void execute(DataTreeModification snapshot) {
- snapshot.delete(CarsModel.newCarPath("altima"));
- }
- });
+ return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
}
- private abstract static class DataTreeOperation {
- public abstract void execute(DataTreeModification snapshot);
+ @FunctionalInterface
+ private static interface DataTreeOperation {
+ void execute(DataTreeModification snapshot);
}
private static DataTreeCandidateTip doTransaction(ShardDataTree shardDataTree, DataTreeOperation operation)
throws ExecutionException, InterruptedException {
- ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction("txn-1", null);
+ ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
DataTreeModification snapshot = transaction.getSnapshot();
operation.execute(snapshot);
ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
private static DataTreeCandidateTip applyCandidates(ShardDataTree shardDataTree, List<DataTreeCandidateTip> candidates)
throws ExecutionException, InterruptedException {
- ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction("txn-1", null);
+ ReadWriteShardDataTreeTransaction transaction = shardDataTree.newReadWriteTransaction(nextTransactionId());
DataTreeModification snapshot = transaction.getSnapshot();
for(DataTreeCandidateTip candidateTip : candidates){
DataTreeCandidates.applyToModification(snapshot, candidateTip);
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
+ shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
final String path = reply.getTransactionPath().toString();
assertTrue("Unexpected transaction path " + path,
- path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
+ path.startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
}};
}
waitUntilLeader(shard);
- shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar",
+ shard.tell(new CreateTransaction(nextTransactionId(),TransactionType.READ_ONLY.ordinal(),
DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
final String path = reply.getTransactionPath().toString();
assertTrue("Unexpected transaction path " + path,
- path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
+ path.startsWith("akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
}};
}
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification1 = new MutableCompositeModification();
final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
final MutableCompositeModification modification2 = new MutableCompositeModification();
final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
modification2);
- final String transactionID3 = "tx3";
+ final TransactionIdentifier transactionID3 = nextTransactionId();
final MutableCompositeModification modification3 = new MutableCompositeModification();
final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
}
class OnCanCommitFutureComplete extends OnFutureComplete {
- private final String transactionID;
+ private final TransactionIdentifier transactionID;
- OnCanCommitFutureComplete(final String transactionID) {
+ OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
super(CanCommitTransactionReply.class);
this.transactionID = transactionID;
}
waitUntilLeader(shard);
- final String transactionID = "tx";
+ final TransactionIdentifier transactionID = nextTransactionId();
final FiniteDuration duration = duration("5 seconds");
final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
waitUntilLeader(shard);
- final String transactionID = "tx";
+ final TransactionIdentifier transactionID = nextTransactionId();
final FiniteDuration duration = duration("5 seconds");
final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
waitUntilLeader(shard);
- final String transactionID = "tx1";
- final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
+ final TransactionIdentifier transactionID = nextTransactionId();
+ final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
batched.setReady(true);
batched.setTotalMessagesSent(2);
// Test merge with invalid data. An exception should occur when the merge is applied. Note that
// write will not validate the children for performance reasons.
- String transactionID = "tx1";
+ TransactionIdentifier transactionID = nextTransactionId();
ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
- BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
+ BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
shard.tell(batched, getRef());
Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
Throwable cause = failure.cause();
- batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
+ batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
batched.setReady(true);
batched.setTotalMessagesSent(2);
waitUntilLeader(shard);
- final String transactionChainID = "txChain";
- final String transactionID1 = "tx1";
- final String transactionID2 = "tx2";
+ final LocalHistoryIdentifier historyId = nextHistoryId();
+ final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
+ final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
final FiniteDuration duration = duration("5 seconds");
final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final YangInstanceIdentifier path = TestModel.TEST_PATH;
- shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
- containerNode, true, false, 1), getRef());
+ shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Create a read Tx on the same chain.
shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
- transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
+ DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
overrideLeaderCalls.set(true);
- final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
+ final BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
shard.tell(batched, ActorRef.noSender());
waitUntilNoLeader(shard);
- shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
+ final TransactionIdentifier txId = nextTransactionId();
+ shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
Failure failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
- shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
+ shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), txId,
DataStoreVersions.CURRENT_VERSION, true), getRef());
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
- shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
+ shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
failure = expectMsgClass(Failure.class);
assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
}};
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final String transactionID = "tx1";
+ final TransactionIdentifier transactionID = nextTransactionId();
final MutableCompositeModification modification = new MutableCompositeModification();
final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
- final String txId = "tx1";
+ final TransactionIdentifier txId = nextTransactionId();
modification.ready();
final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
- final String txId = "tx1";
- modification.ready();
+ final TransactionIdentifier txId = nextTransactionId();
+ modification.ready();
final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
shard.tell(readyMessage, getRef());
// Setup a simulated transactions with a mock cohort.
- final String transactionID = "tx";
+ final TransactionIdentifier transactionID = nextTransactionId();
final MutableCompositeModification modification = new MutableCompositeModification();
final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
waitUntilLeader(shard);
- final String transactionID = "tx1";
+ final TransactionIdentifier transactionID = nextTransactionId();
final MutableCompositeModification modification = new MutableCompositeModification();
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
waitUntilLeader(shard);
- final String transactionID = "tx1";
+ final TransactionIdentifier transactionID = nextTransactionId();
final MutableCompositeModification modification = new MutableCompositeModification();
modification.addModification(new DeleteModification(YangInstanceIdentifier.EMPTY));
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// Setup 2 simulated transactions with mock cohorts. The first one fails in the
// commit phase.
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification1 = new MutableCompositeModification();
final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
final MutableCompositeModification modification2 = new MutableCompositeModification();
final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
waitUntilLeader(shard);
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification1 = new MutableCompositeModification();
final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
final MutableCompositeModification modification2 = new MutableCompositeModification();
final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
final FiniteDuration duration = duration("5 seconds");
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification = new MutableCompositeModification();
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
reset(cohort);
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
final FiniteDuration duration = duration("5 seconds");
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification = new MutableCompositeModification();
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
reset(cohort);
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
final FiniteDuration duration = duration("5 seconds");
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification = new MutableCompositeModification();
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
reset(cohort);
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
doReturn(Futures.immediateFuture(null)).when(cohort).commit();
final FiniteDuration duration = duration("5 seconds");
- final String transactionID = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification = new MutableCompositeModification();
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
- shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
+ shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
reset(cohort);
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
doReturn(Futures.immediateFuture(null)).when(cohort).commit();
final FiniteDuration duration = duration("5 seconds");
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final String transactionID = "tx1";
+ final TransactionIdentifier transactionID = nextTransactionId();
final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
cohort -> {
final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
// Create 1st Tx - will timeout
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification1 = new MutableCompositeModification();
final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
// Create 2nd Tx
- final String transactionID2 = "tx3";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
final MutableCompositeModification modification2 = new MutableCompositeModification();
final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification1 = new MutableCompositeModification();
final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
final MutableCompositeModification modification2 = new MutableCompositeModification();
final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
modification2);
- final String transactionID3 = "tx3";
+ final TransactionIdentifier transactionID3 = nextTransactionId();
final MutableCompositeModification modification3 = new MutableCompositeModification();
final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification1 = new MutableCompositeModification();
final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
final MutableCompositeModification modification2 = new MutableCompositeModification();
final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
- final String transactionID3 = "tx3";
+ final TransactionIdentifier transactionID3 = nextTransactionId();
final MutableCompositeModification modification3 = new MutableCompositeModification();
final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification1 = new MutableCompositeModification();
final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// Ready the second Tx.
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
final MutableCompositeModification modification2 = new MutableCompositeModification();
final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
// Ready the third Tx.
- final String transactionID3 = "tx3";
+ final TransactionIdentifier transactionID3 = nextTransactionId();
final DataTreeModification modification3 = dataStore.newModification();
new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
.apply(modification3);
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testCanCommitBeforeReadyFailure");
- shard.tell(new CanCommitTransaction("tx", CURRENT_VERSION).toSerializable(), getRef());
+ shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
}};
}
// Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
- final String transactionID1 = "tx1";
+ final TransactionIdentifier transactionID1 = nextTransactionId();
final MutableCompositeModification modification1 = new MutableCompositeModification();
final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
- final String transactionID2 = "tx2";
+ final TransactionIdentifier transactionID2 = nextTransactionId();
final MutableCompositeModification modification2 = new MutableCompositeModification();
final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
waitUntilLeader(shard);
- final String transactionID = "tx1";
-
+ final TransactionIdentifier transactionID = nextTransactionId();
final MutableCompositeModification modification = new MutableCompositeModification();
final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
doReturn(Futures.immediateFuture(null)).when(cohort).abort();
public void testNegativeReadWithReadOnlyTransactionClosed() throws Throwable {
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(RO, store.newReadOnlyTransaction("test-txn", null), shard,
+ final Props props = ShardTransaction.props(RO, store.newReadOnlyTransaction(nextTransactionId()), shard,
datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef.create(getSystem(), props,
public void testNegativeReadWithReadWriteTransactionClosed() throws Throwable {
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
+ final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction(nextTransactionId()), shard,
datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef.create(getSystem(), props,
public void testNegativeExistsWithReadWriteTransactionClosed() throws Throwable {
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
+ final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction(nextTransactionId()), shard,
datastoreContext, shardStats);
final TestActorRef<ShardTransaction> subject = TestActorRef.create(getSystem(), props,
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
private static final TransactionType WO = TransactionType.WRITE_ONLY;
private static final ShardIdentifier SHARD_IDENTIFIER =
- ShardIdentifier.create("inventory", MemberName.forName("member-1"), "config");
+ ShardIdentifier.create("inventory", MEMBER_NAME, "config");
+
private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
- private int txCounter = 0;
-
private ActorRef createShard() {
ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
schemaContext(TestModel.createTestContext()).props());
}
private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
- return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
+ return store.newReadOnlyTransaction(nextTransactionId());
}
private ReadWriteShardDataTreeTransaction readWriteTransaction() {
- return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
+ return store.newReadWriteTransaction(nextTransactionId());
}
@Test
ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
- ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
+ ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
+ nextTransactionId(), mockModification);
final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
YangInstanceIdentifier writePath = TestModel.TEST_PATH;
YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
- BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
batched.addModification(new WriteModification(writePath, writeData));
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ final TransactionIdentifier tx1 = nextTransactionId();
+ BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
batched.addModification(new WriteModification(writePath, writeData));
transaction.tell(batched, getRef());
BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
assertEquals("getNumBatched", 1, reply.getNumBatched());
- batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
batched.setReady(true);
batched.setTotalMessagesSent(2);
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
batched.addModification(new WriteModification(writePath, writeData));
batched.setReady(true);
batched.setDoCommitOnReady(true);
ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
- ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
+ ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
+ nextTransactionId(), mockModification);
final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
"testOnReceiveBatchedModificationsFailure");
doThrow(new TestException()).when(mockModification).write(path, node);
- BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ final TransactionIdentifier tx1 = nextTransactionId();
+ BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
batched.addModification(new WriteModification(path, node));
transaction.tell(batched, getRef());
expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
- batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
batched.setReady(true);
batched.setTotalMessagesSent(2);
JavaTestKit watcher = new JavaTestKit(getSystem());
watcher.watch(transaction);
- BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
batched.setReady(true);
batched.setTotalMessagesSent(2);
datastoreContext, shardStats);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
- transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
+ transaction.receive(new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION),
ActorRef.noSender());
}
*
* @author Thomas Pantelis
*/
-public class SimpleShardDataTreeCohortTest {
+public class SimpleShardDataTreeCohortTest extends AbstractTest {
@Mock
private TipProducingDataTree mockDataTree;
doReturn(mockDataTree).when(mockShardDataTree).getDataTree();
- cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, "tx");
+ cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId());
}
@Test
import akka.testkit.TestActorRef;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
+ private final TransactionIdentifier tx = nextTransactionId();
+
@Before
public void setUp() {
@Test
public void testCanCommitYesWithOneCohort() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
- CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx);
verifyCanCommit(proxy.canCommit(), true);
verifyCohortActors();
@Test
public void testCanCommitNoWithOneCohort() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
- CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.no(CURRENT_VERSION)))), tx);
verifyCanCommit(proxy.canCommit(), false);
verifyCohortActors();
@Test
public void testCanCommitYesWithTwoCohorts() throws Exception {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
CanCommitTransactionReply.yes(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
CanCommitTransactionReply.yes(CURRENT_VERSION))));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifyCanCommit(proxy.canCommit(), true);
verifyCohortActors();
@Test
public void testCanCommitNoWithThreeCohorts() throws Exception {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
CanCommitTransactionReply.yes(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
CanCommitTransactionReply.no(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1")));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx)));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifyCanCommit(proxy.canCommit(), false);
verifyCohortActors();
@Test(expected = TestException.class)
public void testCanCommitWithExceptionFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
propagateExecutionExceptionCause(proxy.canCommit());
}
@Test(expected = IllegalArgumentException.class)
public void testCanCommitWithInvalidResponseType() throws Throwable {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
propagateExecutionExceptionCause(proxy.canCommit());
}
@Test(expected = TestException.class)
public void testCanCommitWithFailedCohortFuture() throws Throwable {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1")),
+ newCohortInfo(new CohortActor.Builder(tx)),
newCohortInfoWithFailedFuture(new TestException()),
- newCohortInfo(new CohortActor.Builder("txn-1")));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx)));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
propagateExecutionExceptionCause(proxy.canCommit());
}
@Test
public void testAllThreePhasesSuccessful() throws Exception {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
@Test(expected = TestException.class)
public void testCommitWithExceptionFailure() throws Throwable {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
expectCommit(new TestException())));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
@Test(expected = IllegalArgumentException.class)
public void testCommitWithInvalidResponseType() throws Throwable {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").
+ newCohortInfo(new CohortActor.Builder(tx).
expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
- expectCommit("invalid"))), "txn-1");
+ expectCommit("invalid"))), tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
@Test
public void testAbort() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
- AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectAbort(
+ AbortTransactionReply.instance(CURRENT_VERSION)))), tx);
verifySuccessfulFuture(proxy.abort());
verifyCohortActors();
@Test
public void testAbortWithFailure() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
+ newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
// The exception should not get propagated.
verifySuccessfulFuture(proxy.abort());
@Test
public void testAbortWithFailedCohortFuture() throws Throwable {
List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfoWithFailedFuture(new TestException()),
- newCohortInfo(new CohortActor.Builder("txn-1")));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx)));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
verifySuccessfulFuture(proxy.abort());
verifyCohortActors();
@Test
public void testWithNoCohorts() throws Exception {
ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
- Collections.<CohortInfo>emptyList(), "txn-1");
+ Collections.<CohortInfo>emptyList(), tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
private Object canCommitReply;
private Object commitReply;
private Object abortReply;
- private final String transactionId;
+ private final TransactionIdentifier transactionId;
- Builder(String transactionId) {
- this.transactionId = transactionId;
+ Builder(TransactionIdentifier transactionId) {
+ this.transactionId = Preconditions.checkNotNull(transactionId);
}
Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {
ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
String actorPath = txActorRef.path().toString();
- CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, "txn-1",
+ CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
DataStoreVersions.CURRENT_VERSION);
doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
}
private static BatchedModifications newBatchedModifications() {
- BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
+ BatchedModifications modifications = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
modifications.setDoCommitOnReady(true);
modifications.setReady(true);
modifications.setTotalMessagesSent(1);
volatile boolean dropAppendEntries;
private final String myId;
+ @SuppressWarnings("unused")
public MockFollower(String myId) {
this(myId, true);
}
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.MockIdentifiers;
/**
* Unit tests for AbortTransaction.
@Test
public void testSerialization() {
- AbortTransaction expected = new AbortTransaction("txId", DataStoreVersions.CURRENT_VERSION);
+ AbortTransaction expected = new AbortTransaction(
+ MockIdentifiers.transactionIdentifier(AbortTransactionTest.class, "mock"), DataStoreVersions.CURRENT_VERSION);
Object serialized = expected.toSerializable();
assertEquals("Serialized type", AbortTransaction.class, serialized.getClass());
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
*
* @author Thomas Pantelis
*/
-public class BatchedModificationsTest {
+public class BatchedModificationsTest extends AbstractTest {
@Test
public void testSerialization() {
YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
- BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, "txChain");
+ final TransactionIdentifier tx1 = nextTransactionId();
+ BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
batched.addModification(new WriteModification(writePath, writeData));
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
(Serializable) batched.toSerializable());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
- assertEquals("getTransactionID", "tx1", clone.getTransactionID());
- assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
+ assertEquals("getTransactionID", tx1, clone.getTransactionID());
assertEquals("isReady", true, clone.isReady());
assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent());
assertEquals("getPath", deletePath, delete.getPath());
// Test with different params.
-
- batched = new BatchedModifications("tx2", (short)10000, null);
+ final TransactionIdentifier tx2 = nextTransactionId();
+ batched = new BatchedModifications(tx2, (short)10000);
clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
- assertEquals("getTransactionID", "tx2", clone.getTransactionID());
- assertEquals("getTransactionChainID", "", clone.getTransactionChainID());
+ assertEquals("getTransactionID", tx2, clone.getTransactionID());
assertEquals("isReady", false, clone.isReady());
assertEquals("getModifications size", 0, clone.getModifications().size());
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
/**
*
* @author Thomas Pantelis
*/
-public class CanCommitTransactionTest {
+public class CanCommitTransactionTest extends AbstractTest {
@Test
public void testSerialization() {
- CanCommitTransaction expected = new CanCommitTransaction("txId", DataStoreVersions.CURRENT_VERSION);
+ CanCommitTransaction expected = new CanCommitTransaction(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
Object serialized = expected.toSerializable();
assertEquals("Serialized type", CanCommitTransaction.class, serialized.getClass());
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
/**
*
* @author Thomas Pantelis
*/
-public class CloseTransactionChainTest {
+public class CloseTransactionChainTest extends AbstractTest {
@Test
public void testSerialization() {
- CloseTransactionChain expected = new CloseTransactionChain("txId", DataStoreVersions.CURRENT_VERSION);
+ CloseTransactionChain expected = new CloseTransactionChain(nextHistoryId(), DataStoreVersions.CURRENT_VERSION);
Object serialized = expected.toSerializable();
assertEquals("Serialized type", CloseTransactionChain.class, serialized.getClass());
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
/**
*
* @author Thomas Pantelis
*/
-public class CommitTransactionTest {
+public class CommitTransactionTest extends AbstractTest {
@Test
public void testSerialization() {
- CommitTransaction expected = new CommitTransaction("txId", DataStoreVersions.CURRENT_VERSION);
+ CommitTransaction expected = new CommitTransaction(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
Object serialized = expected.toSerializable();
assertEquals("Serialized type", CommitTransaction.class, serialized.getClass());
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
/**
*
* @author Thomas Pantelis
*/
-public class CreateTransactionReplyTest {
+public class CreateTransactionReplyTest extends AbstractTest {
@Test
public void testSerialization() {
- CreateTransactionReply expected = new CreateTransactionReply("txPath", "txId", DataStoreVersions.CURRENT_VERSION);
+ CreateTransactionReply expected = new CreateTransactionReply("txPath", nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
Object serialized = expected.toSerializable();
assertEquals("Serialized type", CreateTransactionReply.class, serialized.getClass());
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
/**
*
* @author Thomas Pantelis
*/
-public class CreateTransactionTest {
+public class CreateTransactionTest extends AbstractTest {
@Test
public void testSerialization() {
- CreateTransaction expected = new CreateTransaction("txId", 2, "chainId", DataStoreVersions.CURRENT_VERSION);
+ CreateTransaction expected = new CreateTransaction(nextTransactionId(), 2, DataStoreVersions.CURRENT_VERSION);
Object serialized = expected.toSerializable();
assertEquals("Serialized type", CreateTransaction.class, serialized.getClass());
SerializationUtils.clone((Serializable) serialized));
assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
assertEquals("getTransactionType", expected.getTransactionType(), actual.getTransactionType());
- assertEquals("getTransactionChainId", expected.getTransactionChainId(), actual.getTransactionChainId());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
}
@Test
public void testSerializationWithNewerVersion() {
short newerVersion = DataStoreVersions.CURRENT_VERSION + (short)1;
- CreateTransaction expected = new CreateTransaction("txId", 2, "chainId", newerVersion);
+ CreateTransaction expected = new CreateTransaction(nextTransactionId(), 2, newerVersion);
Object serialized = expected.toSerializable();
assertEquals("Serialized type", CreateTransaction.class, serialized.getClass());
SerializationUtils.clone((Serializable) serialized));
assertEquals("getTransactionId", expected.getTransactionId(), actual.getTransactionId());
assertEquals("getTransactionType", expected.getTransactionType(), actual.getTransactionType());
- assertEquals("getTransactionChainId", expected.getTransactionChainId(), actual.getTransactionChainId());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
}
import static org.junit.Assert.assertNotNull;
import java.util.List;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
*
* @author Thomas Pantelis
*/
-public class ReadyLocalTransactionSerializerTest {
+public class ReadyLocalTransactionSerializerTest extends AbstractTest {
@Test
public void testToAndFromBinary() {
MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
- String txId = "tx-id";
+ TransactionIdentifier txId = nextTransactionId();
ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
ReadyLocalTransactionSerializer serializer = new ReadyLocalTransactionSerializer();