+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.List;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.apache.pekko.actor.ActorRef;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-
-@Deprecated(since = "9.0.0", forRemoval = true)
-final class CohortEntry {
- private final ReadWriteShardDataTreeTransaction transaction;
- private final TransactionIdentifier transactionId;
- private final short clientVersion;
-
- private RuntimeException lastBatchedModificationsException;
- private int totalBatchedModificationsReceived;
- private int totalOperationsProcessed;
- private ShardDataTreeCohort cohort;
- private boolean doImmediateCommit;
- private ActorRef replySender;
- private Shard shard;
-
- private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
- cohort = null;
- this.transaction = requireNonNull(transaction);
- transactionId = transaction.getIdentifier();
- this.clientVersion = clientVersion;
- }
-
- private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) {
- this.cohort = requireNonNull(cohort);
- transactionId = cohort.transactionId();
- transaction = null;
- this.clientVersion = clientVersion;
- }
-
- static CohortEntry createOpen(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
- return new CohortEntry(transaction, clientVersion);
- }
-
- static CohortEntry createReady(final ShardDataTreeCohort cohort, final short clientVersion) {
- return new CohortEntry(cohort, clientVersion);
- }
-
- TransactionIdentifier getTransactionId() {
- return transactionId;
- }
-
- short getClientVersion() {
- return clientVersion;
- }
-
- boolean isFailed() {
- return cohort != null && cohort.isFailed();
- }
-
- DataTreeModification getDataTreeModification() {
- return cohort.getDataTreeModification();
- }
-
- ReadWriteShardDataTreeTransaction getTransaction() {
- return transaction;
- }
-
- int getTotalBatchedModificationsReceived() {
- return totalBatchedModificationsReceived;
- }
-
- int getTotalOperationsProcessed() {
- return totalOperationsProcessed;
- }
-
- RuntimeException getLastBatchedModificationsException() {
- return lastBatchedModificationsException;
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Re-thrown")
- void applyModifications(final List<Modification> modifications) {
- totalBatchedModificationsReceived++;
- if (lastBatchedModificationsException == null) {
- totalOperationsProcessed += modifications.size();
- for (Modification modification : modifications) {
- try {
- modification.apply(transaction.getSnapshot());
- } catch (RuntimeException e) {
- lastBatchedModificationsException = e;
- throw e;
- }
- }
- }
- }
-
- void canCommit(final FutureCallback<Empty> callback) {
- cohort.canCommit(callback);
- }
-
- void preCommit(final FutureCallback<DataTreeCandidate> callback) {
- cohort.preCommit(callback);
- }
-
- void commit(final FutureCallback<UnsignedLong> callback) {
- cohort.commit(callback);
- }
-
- void abort(final FutureCallback<Empty> callback) {
- cohort.abort(callback);
- }
-
- void ready(final Optional<SortedSet<String>> participatingShardNames, final CohortDecorator cohortDecorator) {
- checkState(cohort == null, "cohort was already set");
-
- cohort = transaction.ready(participatingShardNames);
-
- if (cohortDecorator != null) {
- // Call the hook for unit tests.
- cohort = cohortDecorator.decorate(transactionId, cohort);
- }
- }
-
- boolean isSealed() {
- return cohort != null;
- }
-
- Optional<SortedSet<String>> getParticipatingShardNames() {
- return cohort != null ? cohort.getParticipatingShardNames() : Optional.empty();
- }
-
- boolean isDoImmediateCommit() {
- return doImmediateCommit;
- }
-
- void setDoImmediateCommit(final boolean doImmediateCommit) {
- this.doImmediateCommit = doImmediateCommit;
- }
-
- ActorRef getReplySender() {
- return replySender;
- }
-
- void setReplySender(final ActorRef replySender) {
- this.replySender = replySender;
- }
-
- Shard getShard() {
- return shard;
- }
-
- void setShard(final Shard shard) {
- this.shard = shard;
- }
-
- @Override
- public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("CohortEntry [transactionId=").append(transactionId).append(", doImmediateCommit=")
- .append(doImmediateCommit).append("]");
- return builder.toString();
- }
-}
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
/**
public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
- private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
-
private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
@Override
public int getTxCohortCacheSize() {
- return shard != null ? shard.getCohortCacheSize() : -1;
+ // FIXME: deprecate this?
+ return shard != null ? 0 : -1;
}
@Override
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import org.apache.pekko.persistence.RecoveryCompleted;
import org.apache.pekko.persistence.SnapshotOffer;
import org.apache.pekko.serialization.JavaSerializer;
-import org.apache.pekko.serialization.Serialization;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStatsMXBean;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
private DatastoreContext datastoreContext;
- @Deprecated(since = "9.0.0", forRemoval = true)
- private final ShardCommitCoordinator commitCoordinator;
-
private long transactionCommitTimeout;
private Cancellable txCommitTimeoutCheckSchedule;
private final MessageTracker appendEntriesReplyTracker;
- @Deprecated(since = "9.0.0", forRemoval = true)
- private final ShardTransactionActorFactory transactionActorFactory;
-
private final ShardSnapshotCohort snapshotCohort;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private ShardSnapshot restoreFromSnapshot;
- @Deprecated(since = "9.0.0", forRemoval = true)
- private final ShardTransactionMessageRetrySupport messageRetrySupport;
-
@VisibleForTesting
final FrontendMetadata frontendMetadata;
getContext().become(new MeteringBehavior(this));
}
- commitCoordinator = new ShardCommitCoordinator(store, LOG, name);
-
setTransactionCommitTimeout();
// create a notifier actor for each cluster member
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
dispatchers = new Dispatchers(context().system().dispatchers());
- transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
- dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction),
- self(), getContext(), shardMBean.shardStats(), builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
name, datastoreContext);
- messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
-
responseMessageSlicer = MessageSlicer.builder().logContext(name)
.messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
super.postStop();
- messageRetrySupport.close();
-
if (txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
- commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
-
shardMBean.unregisterMBean();
listenerInfoMXBean.unregister();
}
} else if (GetKnownClients.INSTANCE.equals(message)) {
handleGetKnownClients();
} else if (!responseMessageSlicer.handleMessage(message)) {
- // Ask-based protocol messages
- if (CreateTransaction.isSerializedType(message)) {
- handleCreateTransaction(message);
- } else if (message instanceof BatchedModifications request) {
- handleBatchedModifications(request);
- } else if (message instanceof ForwardedReadyTransaction request) {
- handleForwardedReadyTransaction(request);
- } else if (message instanceof ReadyLocalTransaction request) {
- handleReadyLocalTransaction(request);
- } else if (CanCommitTransaction.isSerializedType(message)) {
- handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if (CommitTransaction.isSerializedType(message)) {
- handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if (AbortTransaction.isSerializedType(message)) {
- handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (CloseTransactionChain.isSerializedType(message)) {
- closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
- messageRetrySupport.onTimerMessage(message);
- } else {
- super.handleNonRaftCommand(message);
- }
+ super.handleNonRaftCommand(message);
}
}
}
private void commitTimeoutCheck() {
store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
- commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
requestMessageAssembler.checkExpiredAssembledMessageState();
}
return store.getQueueSize();
}
- final int getCohortCacheSize() {
- return commitCoordinator.getCohortCacheSize();
- }
-
@Override
protected final ActorRef roleChangeNotifier() {
return roleChangeNotifier;
}
}
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void handleCommitTransaction(final CommitTransaction commit) {
- final var txId = commit.getTransactionId();
- if (isLeader()) {
- askProtocolEncountered(txId);
- commitCoordinator.handleCommit(txId, getSender(), this);
- } else {
- final var leader = getLeader();
- if (leader == null) {
- messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId);
- } else {
- LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
- leader.forward(commit, getContext());
- }
- }
- }
-
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
- final var txId = canCommit.getTransactionId();
- LOG.debug("{}: Can committing transaction {}", persistenceId(), txId);
-
- if (isLeader()) {
- askProtocolEncountered(txId);
- commitCoordinator.handleCanCommit(txId, getSender(), this);
- } else {
- final var leader = getLeader();
- if (leader == null) {
- messageRetrySupport.addMessageToRetry(canCommit, getSender(),
- "Could not canCommit transaction " + txId);
- } else {
- LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
- leader.forward(canCommit, getContext());
- }
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
- askProtocolEncountered(batched.getTransactionId());
-
- try {
- commitCoordinator.handleBatchedModifications(batched, sender, this);
- } catch (Exception e) {
- LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
- batched.getTransactionId(), e);
- sender.tell(new Failure(e), getSelf());
- }
- }
-
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void handleBatchedModifications(final BatchedModifications batched) {
- // This message is sent to prepare the modifications transaction directly on the Shard as an
- // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
- // BatchedModifications message, the caller sets the ready flag in the message indicating
- // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
- // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
- // ReadyTransaction message.
-
- // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
- // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
- // the primary/leader shard. However with timing and caching on the front-end, there's a small
- // window where it could have a stale leader during leadership transitions.
- //
- boolean isLeaderActive = isLeaderActive();
- if (isLeader() && isLeaderActive) {
- handleBatchedModificationsLocal(batched, getSender());
- } else {
- final var leader = getLeader();
- if (!isLeaderActive || leader == null) {
- messageRetrySupport.addMessageToRetry(batched, getSender(),
- "Could not process BatchedModifications " + batched.getTransactionId());
- } else {
- // If this is not the first batch and leadership changed in between batched messages,
- // we need to reconstruct previous BatchedModifications from the transaction
- // DataTreeModification, honoring the max batched modification count, and forward all the
- // previous BatchedModifications to the new leader.
- final var newModifications = commitCoordinator.createForwardedBatchedModifications(batched,
- datastoreContext.getShardBatchedModificationCount());
-
- LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
- newModifications.size(), leader);
-
- for (BatchedModifications bm : newModifications) {
- leader.forward(bm, getContext());
- }
- }
- }
- }
-
- private boolean failIfIsolatedLeader(final ActorRef sender) {
- if (isIsolatedLeader()) {
- sender.tell(new Failure(new NoShardLeaderException(String.format(
- "Shard %s was the leader but has lost contact with all of its followers. Either all"
- + " other follower nodes are down or this node is isolated by a network partition.",
- persistenceId()))), getSelf());
- return true;
- }
-
- return false;
- }
-
protected boolean isIsolatedLeader() {
return getRaftState() == RaftState.IsolatedLeader;
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
- final var txId = message.getTransactionId();
- LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), txId);
-
- final var isLeaderActive = isLeaderActive();
- if (isLeader() && isLeaderActive) {
- askProtocolEncountered(txId);
- try {
- commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
- } catch (Exception e) {
- LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), txId, e);
- getSender().tell(new Failure(e), getSelf());
- }
- } else {
- final var leader = getLeader();
- if (!isLeaderActive || leader == null) {
- messageRetrySupport.addMessageToRetry(message, getSender(),
- "Could not process ready local transaction " + txId);
- } else {
- LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
- message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
- leader.forward(message, getContext());
- }
- }
- }
-
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void handleForwardedReadyTransaction(final ForwardedReadyTransaction forwardedReady) {
- LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionId());
-
- final var isLeaderActive = isLeaderActive();
- if (isLeader() && isLeaderActive) {
- askProtocolEncountered(forwardedReady.getTransactionId());
- commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
- } else {
- final var leader = getLeader();
- if (!isLeaderActive || leader == null) {
- messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
- "Could not process forwarded ready transaction " + forwardedReady.getTransactionId());
- } else {
- LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
-
- final var readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
- forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
- forwardedReady.getParticipatingShardNames());
- readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
- leader.forward(readyLocal, getContext());
- }
- }
- }
-
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void handleAbortTransaction(final AbortTransaction abort) {
- final var transactionId = abort.getTransactionId();
- askProtocolEncountered(transactionId);
- doAbortTransaction(transactionId, getSender());
- }
-
- final void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
- commitCoordinator.handleAbort(transactionID, sender, this);
- }
-
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void handleCreateTransaction(final Object message) {
- if (isLeader()) {
- createTransaction(CreateTransaction.fromSerializable(message));
- } else if (getLeader() != null) {
- getLeader().forward(message, getContext());
- } else {
- getSender().tell(new Failure(new NoShardLeaderException(
- "Could not create a shard transaction", persistenceId())), getSelf());
- }
- }
-
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- if (isLeader()) {
- final var id = closeTransactionChain.getIdentifier();
- askProtocolEncountered(id.getClientId());
- store.closeTransactionChain(id);
- } else if (getLeader() != null) {
- getLeader().forward(closeTransactionChain, getContext());
- } else {
- LOG.warn("{}: Could not close transaction {}", persistenceId(), closeTransactionChain.getIdentifier());
- }
- }
-
- @Deprecated(since = "9.0.0", forRemoval = true)
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void createTransaction(final CreateTransaction createTransaction) {
- askProtocolEncountered(createTransaction.getTransactionId());
-
- try {
- if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
- && failIfIsolatedLeader(getSender())) {
- return;
- }
-
- final var transactionActor = createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId());
-
- getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
- createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
- } catch (Exception e) {
- getSender().tell(new Failure(e), getSelf());
- }
- }
-
- @Deprecated(since = "9.0.0", forRemoval = true)
- private ActorRef createTransaction(final int transactionType, final TransactionIdentifier transactionId) {
- LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
- return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
- transactionId);
- }
-
- // Called on leader only
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void askProtocolEncountered(final TransactionIdentifier transactionId) {
- askProtocolEncountered(transactionId.getHistoryId().getClientId());
- }
-
- // Called on leader only
- @Deprecated(since = "9.0.0", forRemoval = true)
- private void askProtocolEncountered(final ClientIdentifier clientId) {
- final var frontend = clientId.getFrontendId();
- final var state = knownFrontends.get(frontend);
- if (!(state instanceof LeaderFrontendState.Disabled)) {
- LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
- if (knownFrontends.isEmpty()) {
- knownFrontends = new HashMap<>();
- }
- knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore()));
-
- persistPayload(clientId, DisableTrackingPayload.create(clientId,
- datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
- }
- }
-
private void updateSchemaContext(final UpdateSchemaContext message) {
updateSchemaContext(message.modelContext());
}
paused = false;
store.purgeLeaderState();
}
-
- if (hasLeader && !isIsolatedLeader()) {
- messageRetrySupport.retryMessages();
- }
}
@Override
}
requestMessageAssembler.close();
-
- if (!hasLeader()) {
- // No leader anywhere, nothing else to do
- return;
- }
-
- // Another leader was elected. If we were the previous leader and had pending transactions, convert
- // them to transaction messages and send to the new leader.
- ActorSelection leader = getLeader();
- if (leader != null) {
- // Clears all pending transactions and converts them to messages to be forwarded to a new leader.
- Collection<?> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
- datastoreContext.getShardBatchedModificationCount());
-
- if (!messagesToForward.isEmpty()) {
- LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
- messagesToForward.size(), leader);
-
- for (Object message : messagesToForward) {
- LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
-
- leader.tell(message, self());
- }
- }
- } else {
- commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
- + "change and the leader address isn't available.", this);
- }
} else {
// We have become the leader, we need to reconstruct frontend state
knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
}
-
- if (!isIsolatedLeader()) {
- messageRetrySupport.retryMessages();
- }
}
@Override
return super.journalPluginId();
}
- @VisibleForTesting
- final ShardCommitCoordinator getCommitCoordinator() {
- return commitCoordinator;
- }
-
// non-final for mocking
DatastoreContext getDatastoreContext() {
return datastoreContext;
+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.Status.Failure;
-import org.apache.pekko.serialization.Serialization;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
-import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
-import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
-import org.slf4j.Logger;
-
-/**
- * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
- *
- * @author Thomas Pantelis
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-final class ShardCommitCoordinator {
-
- // Interface hook for unit tests to replace or decorate the ShardDataTreeCohorts.
- @VisibleForTesting
- public interface CohortDecorator {
- ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
- }
-
- private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
-
- private final ShardDataTree dataTree;
-
- private final Logger log;
-
- private final String name;
-
- // This is a hook for unit tests to replace or decorate the ShardDataTreeCohorts.
- @VisibleForTesting
- private CohortDecorator cohortDecorator;
-
- private ReadyTransactionReply readyTransactionReply;
-
- ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) {
- this.log = log;
- this.name = name;
- this.dataTree = requireNonNull(dataTree);
- }
-
- int getCohortCacheSize() {
- return cohortCache.size();
- }
-
- private String persistenceId() {
- return dataTree.logContext();
- }
-
- private ReadyTransactionReply readyTransactionReply(final ActorRef cohort) {
- if (readyTransactionReply == null) {
- readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(cohort));
- }
-
- return readyTransactionReply;
- }
-
- /**
- * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
- * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
- *
- * @param ready the ForwardedReadyTransaction message to process
- * @param sender the sender of the message
- * @param shard the transaction's shard actor
- */
- void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender,
- final Shard shard) {
- log.debug("{}: Readying transaction {}, client version {}", name,
- ready.getTransactionId(), ready.getTxnClientVersion());
-
- final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
- final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
- cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
-
- if (ready.isDoImmediateCommit()) {
- cohortEntry.setDoImmediateCommit(true);
- cohortEntry.setReplySender(sender);
- cohortEntry.setShard(shard);
- handleCanCommit(cohortEntry);
- } else {
- // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
- // front-end so send back a ReadyTransactionReply with our actor path.
- sender.tell(readyTransactionReply(shard.self()), shard.self());
- }
- }
-
- /**
- * This method handles a BatchedModifications message for a transaction being prepared directly on the
- * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
- * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
- * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
- *
- * @param batched the BatchedModifications message to process
- * @param sender the sender of the message
- */
- @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Replay of captured failure")
- void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
- CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
- if (cohortEntry == null || cohortEntry.isSealed()) {
- cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
- batched.getVersion());
- cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("{}: Applying {} batched modifications for Tx {}", name,
- batched.getModifications().size(), batched.getTransactionId());
- }
-
- cohortEntry.applyModifications(batched.getModifications());
-
- if (batched.isReady()) {
- if (cohortEntry.getLastBatchedModificationsException() != null) {
- cohortCache.remove(cohortEntry.getTransactionId());
- throw cohortEntry.getLastBatchedModificationsException();
- }
-
- if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
- cohortCache.remove(cohortEntry.getTransactionId());
- throw new IllegalStateException(String.format(
- "The total number of batched messages received %d does not match the number sent %d",
- cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
- }
-
- if (log.isDebugEnabled()) {
- log.debug("{}: Readying Tx {} of {} operations, client version {}", name,
- batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion());
- }
-
- cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
- cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
-
- if (batched.isDoCommitOnReady()) {
- cohortEntry.setReplySender(sender);
- cohortEntry.setShard(shard);
- handleCanCommit(cohortEntry);
- } else {
- sender.tell(readyTransactionReply(shard.self()), shard.self());
- }
- } else {
- sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
- }
- }
-
- /**
- * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
- * been prepared beforehand by the sender and we just need to drive them through into the
- * dataTree.
- *
- * @param message the ReadyLocalTransaction message to process
- * @param sender the sender of the message
- * @param shard the transaction's shard actor
- */
- void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
- final TransactionIdentifier txId = message.getTransactionId();
- final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
- message.getParticipatingShardNames());
- final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
- cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
- cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
-
- log.debug("{}: Applying local modifications for Tx {}", name, txId);
-
- if (message.isDoCommitOnReady()) {
- cohortEntry.setReplySender(sender);
- cohortEntry.setShard(shard);
- handleCanCommit(cohortEntry);
- } else {
- sender.tell(readyTransactionReply(shard.self()), shard.self());
- }
- }
-
- @Deprecated(since = "9.0.0", forRemoval = true)
- Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
- final int maxModificationsPerBatch) {
- CohortEntry cohortEntry = cohortCache.remove(from.getTransactionId());
- if (cohortEntry == null || cohortEntry.getTransaction() == null) {
- return Collections.singletonList(from);
- }
-
- cohortEntry.applyModifications(from.getModifications());
-
- final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
- cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
- @Override
- protected BatchedModifications getModifications() {
- if (newModifications.isEmpty()
- || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
- newModifications.add(new BatchedModifications(from.getTransactionId(), from.getVersion()));
- }
-
- return newModifications.getLast();
- }
- });
-
- BatchedModifications last = newModifications.getLast();
- last.setDoCommitOnReady(from.isDoCommitOnReady());
- if (from.isReady()) {
- last.setReady(from.getParticipatingShardNames());
- }
- last.setTotalMessagesSent(newModifications.size());
- return newModifications;
- }
-
- private void handleCanCommit(final CohortEntry cohortEntry) {
- cohortEntry.canCommit(new FutureCallback<>() {
- @Override
- public void onSuccess(final Empty result) {
- log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId());
-
- if (cohortEntry.isDoImmediateCommit()) {
- doCommit(cohortEntry);
- } else {
- cohortEntry.getReplySender().tell(
- CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable(),
- cohortEntry.getShard().self());
- }
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
- failure);
-
- cohortCache.remove(cohortEntry.getTransactionId());
- cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
- }
- });
- }
-
- /**
- * This method handles the canCommit phase for a transaction.
- *
- * @param transactionID the ID of the transaction to canCommit
- * @param sender the actor to which to send the response
- * @param shard the transaction's shard actor
- */
- void handleCanCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
- // Lookup the cohort entry that was cached previously (or should have been) by
- // transactionReady (via the ForwardedReadyTransaction message).
- final CohortEntry cohortEntry = cohortCache.get(transactionID);
- if (cohortEntry == null) {
- // Either canCommit was invoked before ready (shouldn't happen) or a long time passed
- // between canCommit and ready and the entry was expired from the cache or it was aborted.
- IllegalStateException ex = new IllegalStateException(
- String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
- log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex);
- sender.tell(new Failure(ex), shard.self());
- return;
- }
-
- cohortEntry.setReplySender(sender);
- cohortEntry.setShard(shard);
-
- handleCanCommit(cohortEntry);
- }
-
- void doCommit(final CohortEntry cohortEntry) {
- log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
-
- // We perform the preCommit phase here atomically with the commit phase. This is an
- // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
- // coordination of preCommit across shards in case of failure but preCommit should not
- // normally fail since we ensure only one concurrent 3-phase commit.
- cohortEntry.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate candidate) {
- finishCommit(cohortEntry.getReplySender(), cohortEntry);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- log.error("{} An exception occurred while preCommitting transaction {}", name,
- cohortEntry.getTransactionId(), failure);
-
- cohortCache.remove(cohortEntry.getTransactionId());
- cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
- }
- });
- }
-
- void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) {
- log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
-
- cohortEntry.commit(new FutureCallback<UnsignedLong>() {
- @Override
- public void onSuccess(final UnsignedLong result) {
- final TransactionIdentifier txId = cohortEntry.getTransactionId();
- log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
- sender);
-
- cohortCache.remove(cohortEntry.getTransactionId());
- sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
- cohortEntry.getShard().self());
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- final TransactionIdentifier txId = cohortEntry.getTransactionId();
- log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
-
- cohortCache.remove(cohortEntry.getTransactionId());
- sender.tell(new Failure(failure), cohortEntry.getShard().self());
- }
- });
- }
-
- /**
- * This method handles the preCommit and commit phases for a transaction.
- *
- * @param transactionID the ID of the transaction to commit
- * @param sender the actor to which to send the response
- * @param shard the transaction's shard actor
- */
- void handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
- final CohortEntry cohortEntry = cohortCache.get(transactionID);
- if (cohortEntry == null) {
- // Either a long time passed between canCommit and commit and the entry was expired from the cache
- // or it was aborted.
- IllegalStateException ex = new IllegalStateException(
- String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
- log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex);
- sender.tell(new Failure(ex), shard.self());
- return;
- }
-
- cohortEntry.setReplySender(sender);
- doCommit(cohortEntry);
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
- CohortEntry cohortEntry = cohortCache.remove(transactionID);
- if (cohortEntry == null) {
- return;
- }
-
- log.debug("{}: Aborting transaction {}", name, transactionID);
-
- final ActorRef self = shard.getSelf();
- cohortEntry.abort(new FutureCallback<>() {
- @Override
- public void onSuccess(final Empty result) {
- if (sender != null) {
- sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
- }
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- log.error("{}: An exception happened during abort", name, failure);
-
- if (sender != null) {
- sender.tell(new Failure(failure), self);
- }
- }
- });
- }
-
- void checkForExpiredTransactions(final long timeout, final Shard shard) {
- cohortCache.values().removeIf(CohortEntry::isFailed);
- }
-
- void abortPendingTransactions(final String reason, final Shard shard) {
- final var failure = new Failure(new RuntimeException(reason));
- final var pending = dataTree.getAndClearPendingTransactions();
-
- log.debug("{}: Aborting {} pending queued transactions", name, pending.size());
-
- for (var cohort : pending) {
- final var cohortEntry = cohortCache.remove(cohort.transactionId());
- if (cohortEntry != null) {
- final var replySender = cohortEntry.getReplySender();
- if (replySender != null) {
- replySender.tell(failure, shard.self());
- }
- }
- }
-
- cohortCache.clear();
- }
-
- Collection<?> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
- final var messages = new ArrayList<VersionedExternalizableMessage>();
- for (var cohort : dataTree.getAndClearPendingTransactions()) {
- final var cohortEntry = cohortCache.remove(cohort.transactionId());
- if (cohortEntry == null) {
- continue;
- }
-
- final var newMessages = new ArrayDeque<BatchedModifications>();
- cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
- @Override
- protected BatchedModifications getModifications() {
- final var lastBatch = newMessages.peekLast();
- if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) {
- return lastBatch;
- }
-
- // Allocate a new message
- final var ret = new BatchedModifications(cohortEntry.getTransactionId(),
- cohortEntry.getClientVersion());
- newMessages.add(ret);
- return ret;
- }
- });
-
- final var last = newMessages.peekLast();
- if (last != null) {
- final boolean immediate = cohortEntry.isDoImmediateCommit();
- last.setDoCommitOnReady(immediate);
- last.setReady(cohortEntry.getParticipatingShardNames());
- last.setTotalMessagesSent(newMessages.size());
-
- messages.addAll(newMessages);
-
- if (!immediate) {
- switch (cohort.getState()) {
- case CAN_COMMIT_COMPLETE:
- case CAN_COMMIT_PENDING:
- messages.add(new CanCommitTransaction(cohortEntry.getTransactionId(),
- cohortEntry.getClientVersion()));
- break;
- case PRE_COMMIT_COMPLETE:
- case PRE_COMMIT_PENDING:
- messages.add(new CommitTransaction(cohortEntry.getTransactionId(),
- cohortEntry.getClientVersion()));
- break;
- default:
- break;
- }
- }
- }
- }
-
- return messages;
- }
-
- @VisibleForTesting
- void setCohortDecorator(final CohortDecorator cohortDecorator) {
- this.cohortDecorator = cohortDecorator;
- }
-}
return cohort;
}
- // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
- // the newReadWriteTransaction()
- final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
- final Optional<SortedSet<String>> participatingShardNames) {
- final var historyId = txId.getHistoryId();
- if (historyId.getHistoryId() == 0) {
- return createReadyCohort(txId, mod, participatingShardNames);
- }
- return ensureTransactionChain(historyId, null).createReadyCohort(txId, mod, participatingShardNames);
- }
-
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
final void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
final Function<SimpleShardDataTreeCohort, OptionalLong> accessTimeUpdater) {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import org.apache.pekko.actor.ActorRef;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-
-/**
- * Actor for a shard read transaction.
- *
- * @author syedbahm
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public final class ShardReadTransaction extends ShardTransaction {
- private final AbstractShardDataTreeTransaction<?> transaction;
-
- public ShardReadTransaction(final AbstractShardDataTreeTransaction<?> transaction, final ActorRef shardActor,
- final ShardStats shardStats) {
- super(shardActor, shardStats, transaction.getIdentifier());
- this.transaction = requireNonNull(transaction);
- }
-
- @Override
- public void handleReceive(final Object message) {
- if (ReadData.isSerializedType(message)) {
- readData(transaction, ReadData.fromSerializable(message));
- } else if (DataExists.isSerializedType(message)) {
- dataExists(transaction, DataExists.fromSerializable(message));
- } else {
- super.handleReceive(message);
- }
- }
-
- @Override
- protected AbstractShardDataTreeTransaction<?> getDOMStoreTransaction() {
- return transaction;
- }
-
- @Override
- protected boolean returnCloseTransactionReply() {
- return false;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.apache.pekko.actor.ActorRef;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-
-/**
- * Actor for a shard read/write transaction.
- *
- * @author syedbahm
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardReadWriteTransaction extends ShardWriteTransaction {
- public ShardReadWriteTransaction(final ReadWriteShardDataTreeTransaction transaction, final ActorRef shardActor,
- final ShardStats shardStats) {
- super(transaction, shardActor, shardStats);
- }
-
- @Override
- public void handleReceive(final Object message) {
- if (ReadData.isSerializedType(message)) {
- readData(ReadData.fromSerializable(message));
- } else if (DataExists.isSerializedType(message)) {
- dataExists((DataExists) message);
- } else {
- super.handleReceive(message);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.PoisonPill;
-import org.apache.pekko.actor.Props;
-import org.apache.pekko.actor.ReceiveTimeout;
-import org.apache.pekko.actor.Status.Failure;
-import org.apache.pekko.japi.Creator;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * The ShardTransaction Actor represents a remote transaction that delegates all actions to DOMDataReadWriteTransaction.
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
- private final ActorRef shardActor;
- private final ShardStats shardStats;
- private final TransactionIdentifier transactionId;
-
- protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
- final TransactionIdentifier transactionId) {
- // actor name override used for metering. This does not change the "real" actor name
- super("shard-tx");
- this.shardActor = shardActor;
- this.shardStats = shardStats;
- this.transactionId = requireNonNull(transactionId);
- }
-
- public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
- final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
- return Props.create(ShardTransaction.class,
- new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
- }
-
- protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
-
- protected ActorRef getShardActor() {
- return shardActor;
- }
-
- protected final TransactionIdentifier getTransactionId() {
- return transactionId;
- }
-
- @Override
- @Deprecated(since = "11.0.0", forRemoval = true)
- public final ActorRef getSender() {
- return super.getSender();
- }
-
- @Override
- public void handleReceive(final Object message) {
- if (CloseTransaction.isSerializedType(message)) {
- closeTransaction(true);
- } else if (message instanceof ReceiveTimeout) {
- LOG.debug("Got ReceiveTimeout for inactivity - closing transaction {}", transactionId);
- closeTransaction(false);
- } else {
- unknownMessage(message);
- }
- }
-
- protected boolean returnCloseTransactionReply() {
- return true;
- }
-
- private void closeTransaction(final boolean sendReply) {
- getDOMStoreTransaction().abortFromTransactionActor();
-
- if (sendReply && returnCloseTransactionReply()) {
- getSender().tell(new CloseTransactionReply(), getSelf());
- }
-
- getSelf().tell(PoisonPill.getInstance(), getSelf());
- }
-
- private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
- final boolean ret = transaction.isClosed();
- if (ret) {
- shardStats.incrementFailedReadTransactionsCount();
- getSender().tell(new Failure(new ReadFailedException("Transaction is closed")), getSelf());
- }
- return ret;
- }
-
- protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
- if (checkClosed(transaction)) {
- return;
- }
-
- final YangInstanceIdentifier path = message.getPath();
- ReadDataReply readDataReply = new ReadDataReply(transaction.getSnapshot().readNode(path).orElse(null),
- message.getVersion());
- getSender().tell(readDataReply.toSerializable(), self());
- }
-
- protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
- if (checkClosed(transaction)) {
- return;
- }
-
- final YangInstanceIdentifier path = message.getPath();
- boolean exists = transaction.getSnapshot().readNode(path).isPresent();
- getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
- }
-
- @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Some fields are not Serializable but we don't "
- + "create remote instances of this actor and thus don't need it to be Serializable.")
- private static class ShardTransactionCreator implements Creator<ShardTransaction> {
- @java.io.Serial
- private static final long serialVersionUID = 1L;
-
- final AbstractShardDataTreeTransaction<?> transaction;
- final ActorRef shardActor;
- final DatastoreContext datastoreContext;
- final ShardStats shardStats;
- final TransactionType type;
-
- ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
- final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
- this.transaction = requireNonNull(transaction);
- this.shardActor = shardActor;
- this.shardStats = shardStats;
- this.datastoreContext = datastoreContext;
- this.type = type;
- }
-
- @Override
- public ShardTransaction create() {
- final var tx = switch (type) {
- case READ_ONLY -> new ShardReadTransaction(transaction, shardActor, shardStats);
- case READ_WRITE -> new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction) transaction,
- shardActor, shardStats);
- case WRITE_ONLY -> new ShardWriteTransaction((ReadWriteShardDataTreeTransaction) transaction,
- shardActor, shardStats);
- default -> throw new IllegalArgumentException("Unhandled transaction type " + type);
- };
- tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
- return tx;
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.pekko.actor.AbstractActor.ActorContext;
-import org.apache.pekko.actor.ActorRef;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-
-/**
- * A factory for creating ShardTransaction actors.
- *
- * @author Thomas Pantelis
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-final class ShardTransactionActorFactory {
- private static final AtomicLong ACTOR_NAME_COUNTER = new AtomicLong();
-
- private final ShardDataTree dataTree;
- private final DatastoreContext datastoreContext;
- private final String txnDispatcherPath;
- private final ShardStats shardMBean;
- private final ActorContext actorContext;
- private final ActorRef shardActor;
- private final String shardName;
-
- ShardTransactionActorFactory(final ShardDataTree dataTree, final DatastoreContext datastoreContext,
- final String txnDispatcherPath, final ActorRef shardActor, final ActorContext actorContext,
- final ShardStats shardMBean, final String shardName) {
- this.dataTree = requireNonNull(dataTree);
- this.datastoreContext = requireNonNull(datastoreContext);
- this.txnDispatcherPath = requireNonNull(txnDispatcherPath);
- this.shardMBean = requireNonNull(shardMBean);
- this.actorContext = requireNonNull(actorContext);
- this.shardActor = requireNonNull(shardActor);
- this.shardName = requireNonNull(shardName);
- }
-
- private String actorNameFor(final TransactionIdentifier txId) {
- final LocalHistoryIdentifier historyId = txId.getHistoryId();
- final ClientIdentifier clientId = historyId.getClientId();
- final FrontendIdentifier frontendId = clientId.getFrontendId();
-
- final StringBuilder sb = new StringBuilder("shard-");
- sb.append(shardName).append('-')
- .append(frontendId.getMemberName().getName()).append(':')
- .append(frontendId.getClientType().getName()).append('@')
- .append(clientId.getGeneration()).append(':');
- if (historyId.getHistoryId() != 0) {
- sb.append(historyId.getHistoryId()).append('-');
- }
-
- return sb.append(txId.getTransactionId()).append('_').append(ACTOR_NAME_COUNTER.incrementAndGet()).toString();
- }
-
- ActorRef newShardTransaction(final TransactionType type, final TransactionIdentifier transactionID) {
- final AbstractShardDataTreeTransaction<?> transaction = switch (type) {
- case READ_ONLY -> dataTree.newReadOnlyTransaction(transactionID);
- case READ_WRITE, WRITE_ONLY -> dataTree.newReadWriteTransaction(transactionID);
- default -> throw new IllegalArgumentException("Unsupported transaction type " + type);
- };
- return actorContext.actorOf(ShardTransaction.props(type, transaction, shardActor, datastoreContext, shardMBean)
- .withDispatcher(txnDispatcherPath), actorNameFor(transactionID));
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.Closeable;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.Cancellable;
-import org.apache.pekko.actor.Status.Failure;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader.
- *
- * @author Thomas Pantelis
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-class ShardTransactionMessageRetrySupport implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class);
-
- static final Class<?> TIMER_MESSAGE_CLASS = MessageInfo.class;
-
- private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
- private final Shard shard;
-
- ShardTransactionMessageRetrySupport(final Shard shard) {
- this.shard = shard;
- }
-
- void addMessageToRetry(final Object message, final ActorRef replyTo, final String failureMessage) {
- LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
-
- MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
-
- FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
- messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(),
- messageInfo, shard.getContext().dispatcher(), ActorRef.noSender());
-
- messagesToRetry.add(messageInfo);
- }
-
- void retryMessages() {
- if (messagesToRetry.isEmpty()) {
- return;
- }
-
- MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
- messagesToRetry.clear();
-
- for (MessageInfo info: copy) {
- LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
- info.retry(shard);
- }
- }
-
- void onTimerMessage(final Object message) {
- MessageInfo messageInfo = (MessageInfo)message;
-
- LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
-
- messagesToRetry.remove(messageInfo);
- messageInfo.timedOut(shard);
- }
-
- @Override
- public void close() {
- for (MessageInfo info: messagesToRetry) {
- info.timedOut(shard);
- }
-
- messagesToRetry.clear();
- }
-
- private static final class MessageInfo {
- final Object message;
- final ActorRef replyTo;
- final String failureMessage;
- Cancellable timer;
-
- MessageInfo(final Object message, final ActorRef replyTo, final String failureMessage) {
- this.message = message;
- this.replyTo = replyTo;
- this.failureMessage = requireNonNull(failureMessage);
- }
-
- void retry(final Shard shard) {
- timer.cancel();
- shard.getSelf().tell(message, replyTo);
- }
-
- void timedOut(final Shard shard) {
- replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),
- shard.getSelf());
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.PoisonPill;
-import org.apache.pekko.actor.Status.Failure;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-
-/**
- * Actor for a shard write-only transaction.
- *
- * @author syedbahm
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardWriteTransaction extends ShardTransaction {
- private int totalBatchedModificationsReceived;
- private Exception lastBatchedModificationsException;
- private final ReadWriteShardDataTreeTransaction transaction;
-
- public ShardWriteTransaction(final ReadWriteShardDataTreeTransaction transaction, final ActorRef shardActor,
- final ShardStats shardStats) {
- super(shardActor, shardStats, transaction.getIdentifier());
- this.transaction = transaction;
- }
-
- @Override
- protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
- return transaction;
- }
-
- @Override
- public void handleReceive(final Object message) {
- if (message instanceof BatchedModifications) {
- batchedModifications((BatchedModifications)message);
- } else {
- super.handleReceive(message);
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void batchedModifications(final BatchedModifications batched) {
- if (checkClosed()) {
- if (batched.isReady()) {
- getSelf().tell(PoisonPill.getInstance(), getSelf());
- }
- return;
- }
-
- try {
- for (Modification modification: batched.getModifications()) {
- modification.apply(transaction.getSnapshot());
- }
-
- totalBatchedModificationsReceived++;
- if (batched.isReady()) {
- if (lastBatchedModificationsException != null) {
- throw lastBatchedModificationsException;
- }
-
- if (totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
- throw new IllegalStateException(String.format(
- "The total number of batched messages received %d does not match the number sent %d",
- totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
- }
-
- readyTransaction(batched);
- } else {
- getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
- }
- } catch (Exception e) {
- lastBatchedModificationsException = e;
- getSender().tell(new Failure(e), getSelf());
-
- if (batched.isReady()) {
- getSelf().tell(PoisonPill.getInstance(), getSelf());
- }
- }
- }
-
- protected final void dataExists(final DataExists message) {
- super.dataExists(transaction, message);
- }
-
- protected final void readData(final ReadData message) {
- super.readData(transaction, message);
- }
-
- private boolean checkClosed() {
- final boolean ret = transaction.isClosed();
- if (ret) {
- getSender().tell(new Failure(new IllegalStateException("Transaction is closed, no modifications allowed")),
- getSelf());
- }
- return ret;
- }
-
- private void readyTransaction(final BatchedModifications batched) {
- TransactionIdentifier transactionID = getTransactionId();
-
- LOG.debug("readyTransaction : {}", transactionID);
-
- getShardActor().forward(new ForwardedReadyTransaction(transactionID, batched.getVersion(),
- transaction, batched.isDoCommitOnReady(), batched.getParticipatingShardNames()), getContext());
-
- // The shard will handle the commit from here so we're no longer needed - self-destruct.
- getSelf().tell(PoisonPill.getInstance(), getSelf());
- }
-}
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCommit;
import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulPreCommit;
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
return cohort;
}
- @Deprecated(since = "11.0.0", forRemoval = true)
- protected static Map<TransactionIdentifier, CapturingShardDataTreeCohort> setupCohortDecorator(final Shard shard,
- final TransactionIdentifier... transactionIDs) {
- final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = new HashMap<>();
- for (TransactionIdentifier id: transactionIDs) {
- cohortMap.put(id, new CapturingShardDataTreeCohort());
- }
-
- shard.getCommitCoordinator().setCohortDecorator((transactionID, actual) -> {
- CapturingShardDataTreeCohort cohort = cohortMap.get(transactionID);
- cohort.setDelegate(actual);
- return cohort;
- });
-
- return cohortMap;
- }
-
public static NormalizedNode readStore(final TestActorRef<? extends Shard> shard,
final YangInstanceIdentifier id) {
return shard.underlyingActor().getDataStore().readNode(id).orElse(null);
return delegate.create();
}
}
-
- @Deprecated(since = "11.0.0", forRemoval = true)
- public static class CapturingShardDataTreeCohort extends ShardDataTreeCohort {
- private volatile ShardDataTreeCohort delegate;
- private FutureCallback<Empty> canCommit;
- private FutureCallback<DataTreeCandidate> preCommit;
- private FutureCallback<UnsignedLong> commit;
-
- public void setDelegate(final ShardDataTreeCohort delegate) {
- this.delegate = delegate;
- }
-
- public FutureCallback<Empty> getCanCommit() {
- assertNotNull("canCommit was not invoked", canCommit);
- return canCommit;
- }
-
- public FutureCallback<DataTreeCandidate> getPreCommit() {
- assertNotNull("preCommit was not invoked", preCommit);
- return preCommit;
- }
-
- public FutureCallback<UnsignedLong> getCommit() {
- assertNotNull("commit was not invoked", commit);
- return commit;
- }
-
- @Override
- TransactionIdentifier transactionId() {
- return delegate.transactionId();
- }
-
- @Override
- DataTreeCandidateTip getCandidate() {
- return delegate.getCandidate();
- }
-
- @Override
- DataTreeModification getDataTreeModification() {
- return delegate.getDataTreeModification();
- }
-
- @Override
- public void canCommit(final FutureCallback<Empty> callback) {
- canCommit = mockFutureCallback(callback);
- delegate.canCommit(canCommit);
- }
-
- @Override
- public void preCommit(final FutureCallback<DataTreeCandidate> callback) {
- preCommit = mockFutureCallback(callback);
- delegate.preCommit(preCommit);
- }
-
- @Override
- public void commit(final FutureCallback<UnsignedLong> callback) {
- commit = mockFutureCallback(callback);
- delegate.commit(commit);
- }
-
- @SuppressWarnings("unchecked")
- private static <T> FutureCallback<T> mockFutureCallback(final FutureCallback<T> actual) {
- FutureCallback<T> mock = mock(FutureCallback.class);
- doAnswer(invocation -> {
- actual.onFailure(invocation.getArgument(0));
- return null;
- }).when(mock).onFailure(any(Throwable.class));
-
- doAnswer(invocation -> {
- actual.onSuccess(invocation.getArgument(0));
- return null;
- }).when(mock).onSuccess((T) nullable(Object.class));
-
- return mock;
- }
-
- @Override
- public void abort(final FutureCallback<Empty> callback) {
- delegate.abort(callback);
- }
-
- @Override
- public boolean isFailed() {
- return delegate.isFailed();
- }
-
- @Override
- public State getState() {
- return delegate.getState();
- }
-
- @Override
- Optional<SortedSet<String>> getParticipatingShardNames() {
- return delegate.getParticipatingShardNames();
- }
- }
}
protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
protected static final MemberName MEMBER_2_NAME = MemberName.forName("member-2");
- private static final FrontendType FRONTEND_TYPE = FrontendType.forName(ShardTransactionTest.class.getSimpleName());
+ // FIXME: use a different name
+ private static final FrontendType FRONTEND_TYPE = FrontendType.forName("ShardTransactionTest");
protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.AddressFromURIString;
-import org.apache.pekko.actor.Status.Failure;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.Member;
-import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.After;
import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.collection.Set;
}
}
- @Test
- public void testReadyLocalTransactionForwardedToLeader() throws Exception {
- initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
- followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
-
- final Optional<ActorRef> carsFollowerShard =
- followerDistributedDataStore.getActorUtils().findLocalShard("cars");
- assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
-
- final DataTree dataTree = new InMemoryDataTreeFactory().create(
- DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
-
- // Send a tx with immediate commit.
-
- DataTreeModification modification = dataTree.takeSnapshot().newModification();
- modification.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
- modification.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-
- final var car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
- modification.write(CarsModel.newCarPath("optima"), car1);
- modification.ready();
-
- ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
-
- carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
- Object resp = followerTestKit.expectMsgClass(Object.class);
- if (resp instanceof Failure failure) {
- throw new AssertionError("Unexpected failure response", failure.cause());
- }
-
- assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
-
- verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
-
- // Send another tx without immediate commit.
-
- final var car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
- modification = dataTree.takeSnapshot().newModification();
- modification.write(CarsModel.newCarPath("sportage"), car2);
- modification.ready();
-
- readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
-
- carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
- resp = followerTestKit.expectMsgClass(Object.class);
- if (resp instanceof Failure failure) {
- throw new AssertionError("Unexpected failure response", failure.cause());
- }
-
- assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
-
- final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
- ((ReadyTransactionReply)resp).getCohortPath());
-
- ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(leaderDistributedDataStore.getActorUtils(),
- List.of(new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor),
- () -> DataStoreVersions.CURRENT_VERSION)), tx2);
- cohort.canCommit().get(5, TimeUnit.SECONDS);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
-
- verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
- }
-
- @Test
- public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
- initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
- followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
-
- final Optional<ActorRef> carsFollowerShard =
- followerDistributedDataStore.getActorUtils().findLocalShard("cars");
- assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
-
- carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
- final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
-
- // Send a tx with immediate commit.
-
- DataTreeModification modification = dataTree.takeSnapshot().newModification();
- modification.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
- modification.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-
- final var car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
- modification.write(CarsModel.newCarPath("optima"), car1);
-
- ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION,
- new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification),
- true, Optional.empty());
-
- carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
- Object resp = followerTestKit.expectMsgClass(Object.class);
- if (resp instanceof Failure failure) {
- throw new AssertionError("Unexpected failure response", failure.cause());
- }
-
- assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
-
- verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
-
- // Send another tx without immediate commit.
-
- final var car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
- modification = dataTree.takeSnapshot().newModification();
- modification.write(CarsModel.newCarPath("sportage"), car2);
-
- forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION,
- new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification),
- false, Optional.empty());
-
- carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
- resp = followerTestKit.expectMsgClass(Object.class);
- if (resp instanceof Failure failure) {
- throw new AssertionError("Unexpected failure response", failure.cause());
- }
-
- assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
-
- ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
- ((ReadyTransactionReply)resp).getCohortPath());
-
- final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
- leaderDistributedDataStore.getActorUtils(), List.of(
- new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor),
- () -> DataStoreVersions.CURRENT_VERSION)), tx2);
- cohort.canCommit().get(5, TimeUnit.SECONDS);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
-
- verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
- }
-
@Test
public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
followerDatastoreContextBuilder.shardBatchedModificationCount(2);
+++ /dev/null
-/*
- * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertNotNull;
-import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.EMPTY_OUTER_LIST;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.EMPTY_TEST;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.NAME_QNAME;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerMapPath;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntry;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
-
-import com.google.common.collect.ImmutableSortedSet;
-import java.time.Duration;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.apache.pekko.dispatch.Dispatchers;
-import org.apache.pekko.testkit.TestActorRef;
-import org.apache.pekko.testkit.javadsl.TestKit;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit tests for various 3PC coordination scenarios.
- *
- * @author Thomas Pantelis
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardCommitCoordinationTest extends AbstractShardTest {
- private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTest.class);
-
- /**
- * Test 2 tx's accessing the same shards.
- * <pre>
- * tx1 -> shard A, shard B
- * tx2 -> shard A, shard B
- * </pre>
- * The tx's are readied such the pendingTransactions queue are as follows:
- * <pre>
- * Queue for shard A -> tx1, tx2
- * Queue for shard B -> tx2, tx1
- * </pre>
- * This is a potential deadlock scenario (ABBA) which should be avoided by allowing tx1 to proceed on shard B
- * even though it isn't at the head of the queues.
- */
- @Test
- public void testTwoTransactionsWithSameTwoParticipatingShards() {
- final String testName = "testTwoTransactionsWithSameTwoParticipatingShards";
- LOG.info("{} starting", testName);
-
- final TestKit kit1 = new TestKit(getSystem());
- final TestKit kit2 = new TestKit(getSystem());
-
- final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
- final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
-
- final TestActorRef<Shard> shardA = actorFactory.createTestActor(
- newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardA);
-
- final TestActorRef<Shard> shardB = actorFactory.createTestActor(
- newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardB);
-
- final TransactionIdentifier txId1 = nextTransactionId();
- final TransactionIdentifier txId2 = nextTransactionId();
-
- SortedSet<String> participatingShardNames = ImmutableSortedSet.of(shardAId.getShardName(),
- shardBId.getShardName());
-
- // Ready [tx1, tx2] on shard A.
-
- shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames), kit1.getRef());
- kit1.expectMsgClass(ReadyTransactionReply.class);
-
- shardA.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
- participatingShardNames), kit2.getRef());
- kit2.expectMsgClass(ReadyTransactionReply.class);
-
- // Ready [tx2, tx1] on shard B.
-
- shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
- participatingShardNames), kit2.getRef());
- kit2.expectMsgClass(ReadyTransactionReply.class);
-
- shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames), kit1.getRef());
- kit1.expectMsgClass(ReadyTransactionReply.class);
-
- // Send tx2 CanCommit to A - tx1 is at the head of the queue so tx2 should not proceed as A is the first shard
- // in the participating shard list.
-
- shardA.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectNoMessage(Duration.ofMillis(100));
-
- // Send tx1 CanCommit to A - it's at the head of the queue so should proceed.
-
- shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CanCommitTransactionReply.class);
-
- // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
- // shard list [A] matches that of tx2 [A] so tx1 should be de-queued and allowed to proceed.
-
- shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CanCommitTransactionReply.class);
-
- // Send tx2 CanCommit to B - tx1 should now be at the head of he queue.
-
- shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectNoMessage(Duration.ofMillis(100));
-
- // Finish commit of tx1.
-
- shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CommitTransactionReply.class);
-
- shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CommitTransactionReply.class);
-
- // Finish commit of tx2.
-
- kit2.expectMsgClass(CanCommitTransactionReply.class);
- kit2.expectMsgClass(CanCommitTransactionReply.class);
-
- shardA.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CommitTransactionReply.class);
-
- shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CommitTransactionReply.class);
-
- // Verify data in the data store.
-
- verifyOuterListEntry(shardA, 1);
- verifyOuterListEntry(shardB, 1);
-
- LOG.info("{} ending", testName);
- }
-
- /**
- * Test multiple tx's accessing a mix of same and differing shards.
- * <pre>
- * tx1 -> shard X, shard B
- * tx2 -> shard X, shard B
- * tx3 -> shard A, shard B
- * tx4 -> shard A, shard B
- * tx5 -> shard A, shard B
- * </pre>
- * The tx's are readied such the pendingTransactions queue are as follows:
- * <pre>
- * Queue for shard A -> tx3, tx4, tx5
- * Queue for shard B -> tx1, tx2, tx5, tx4, tx3
- * </pre>
- * Note: shard X means any other shard which isn't relevant for the test.
- * This is a potential deadlock scenario (ABBA) which should be avoided by moving tx3 ahead of tx5 on shard B when
- * CanCommit is requested.
- */
- @Test
- public void testMultipleTransactionsWithMixedParticipatingShards() {
- final String testName = "testMultipleTransactionsWithMixedParticipatingShards";
- LOG.info("{} starting", testName);
-
- final TestKit kit1 = new TestKit(getSystem());
- final TestKit kit2 = new TestKit(getSystem());
- final TestKit kit3 = new TestKit(getSystem());
- final TestKit kit4 = new TestKit(getSystem());
- final TestKit kit5 = new TestKit(getSystem());
-
- final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
- final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
-
- final TestActorRef<Shard> shardA = actorFactory.createTestActor(
- newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardA);
-
- final TestActorRef<Shard> shardB = actorFactory.createTestActor(
- newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardB);
-
- final TransactionIdentifier txId1 = nextTransactionId();
- final TransactionIdentifier txId2 = nextTransactionId();
- final TransactionIdentifier txId3 = nextTransactionId();
- final TransactionIdentifier txId4 = nextTransactionId();
- final TransactionIdentifier txId5 = nextTransactionId();
-
- final SortedSet<String> participatingShardNames1 = ImmutableSortedSet.of(shardAId.getShardName(),
- shardBId.getShardName());
- final SortedSet<String> participatingShardNames2 = ImmutableSortedSet.of("shardX", shardBId.getShardName());
-
- // Ready [tx3, tx4, tx5] on shard A.
-
- shardA.tell(newReadyBatchedModifications(txId3, TEST_PATH, EMPTY_TEST, participatingShardNames1),
- kit3.getRef());
- kit3.expectMsgClass(ReadyTransactionReply.class);
-
- shardA.tell(newReadyBatchedModifications(txId4, OUTER_LIST_PATH, EMPTY_OUTER_LIST,
- participatingShardNames1), kit4.getRef());
- kit4.expectMsgClass(ReadyTransactionReply.class);
-
- shardA.tell(newReadyBatchedModifications(txId5, outerEntryPath(1), outerEntry(1), participatingShardNames1),
- kit5.getRef());
- kit5.expectMsgClass(ReadyTransactionReply.class);
-
- // Ready [tx1, tx2, tx5, tx4, tx3] on shard B.
-
- shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames2),
- kit1.getRef());
- kit1.expectMsgClass(ReadyTransactionReply.class);
-
- shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, EMPTY_OUTER_LIST,
- participatingShardNames2), kit2.getRef());
- kit2.expectMsgClass(ReadyTransactionReply.class);
-
- shardB.tell(newReadyBatchedModifications(txId5, innerEntryPath(1, "one"),
- ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, "one"), participatingShardNames1), kit5.getRef());
- kit5.expectMsgClass(ReadyTransactionReply.class);
-
- shardB.tell(newReadyBatchedModifications(txId4, innerMapPath(1), innerNode(),
- participatingShardNames1), kit4.getRef());
- kit4.expectMsgClass(ReadyTransactionReply.class);
-
- shardB.tell(newReadyBatchedModifications(txId3, outerEntryPath(1), outerEntry(1), participatingShardNames1),
- kit3.getRef());
- kit3.expectMsgClass(ReadyTransactionReply.class);
-
- // Send tx3 CanCommit to A - it's at the head of the queue so should proceed.
-
- shardA.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
- kit3.expectMsgClass(CanCommitTransactionReply.class);
-
- // Send tx1 CanCommit to B - it's at the head of the queue so should proceed.
-
- shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CanCommitTransactionReply.class);
-
- // Send tx3 CanCommit to B - tx1 is at the head of the queue but the preceding shards in tx3's participating
- // shard list [A] matches that of tx5 so tx3 should be moved ahead of tx5 in the queue.
-
- shardB.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
- kit3.expectNoMessage(Duration.ofMillis(100));
-
- // Send tx4 CanCommit to B - tx4's participating shard list [A] matches that of tx3 and tx5 - so tx4 should
- // be moved ahead of tx5 in the queue but not tx3 since should be in the CAN_COMMIT_PENDING state.
-
- shardB.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
- kit4.expectNoMessage(Duration.ofMillis(100));
-
- // Send tx5 CanCommit to B - it's position in the queue should remain the same.
-
- shardB.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
- kit5.expectNoMessage(Duration.ofMillis(100));
-
- // Finish commit of tx1.
-
- shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CommitTransactionReply.class);
-
- // Finish commit of tx2.
-
- shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CanCommitTransactionReply.class);
-
- shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CommitTransactionReply.class);
-
- // Finish commit of tx3.
-
- // From shard B
- kit3.expectMsgClass(CanCommitTransactionReply.class);
-
- shardA.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
- kit3.expectMsgClass(CommitTransactionReply.class);
-
- shardB.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
- kit3.expectMsgClass(CommitTransactionReply.class);
-
- // Finish commit of tx4.
-
- // From shard B
- kit4.expectMsgClass(CanCommitTransactionReply.class);
-
- shardA.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
- kit4.expectMsgClass(CanCommitTransactionReply.class);
- shardA.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
- kit4.expectMsgClass(CommitTransactionReply.class);
-
- shardB.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
- kit4.expectMsgClass(CommitTransactionReply.class);
-
- // Finish commit of tx5.
-
- // From shard B
- kit5.expectMsgClass(CanCommitTransactionReply.class);
-
- shardA.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
- kit5.expectMsgClass(CanCommitTransactionReply.class);
- shardA.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
- kit5.expectMsgClass(CommitTransactionReply.class);
-
- shardB.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
- kit5.expectMsgClass(CommitTransactionReply.class);
-
- verifyOuterListEntry(shardA, 1);
- verifyInnerListEntry(shardB, 1, "one");
-
- LOG.info("{} ending", testName);
- }
-
- /**
- * Test 2 tx's accessing 2 shards, the second in common.
- * <pre>
- * tx1 -> shard A, shard C
- * tx2 -> shard B, shard C
- * </pre>
- * The tx's are readied such the pendingTransactions queue are as follows:
- * <pre>
- * Queue for shard A -> tx1
- * Queue for shard B -> tx2
- * Queue for shard C -> tx2, tx1
- * </pre>
- * When the tx's re committed verify the ready order is preserved.
- */
- @Test
- public void testTwoTransactionsWithOneCommonParticipatingShard1() {
- final String testName = "testTwoTransactionsWithOneCommonParticipatingShard1";
- LOG.info("{} starting", testName);
-
- final TestKit kit1 = new TestKit(getSystem());
- final TestKit kit2 = new TestKit(getSystem());
-
- final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
- final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
- final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
-
- final TestActorRef<Shard> shardA = actorFactory.createTestActor(
- newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardA);
-
- final TestActorRef<Shard> shardB = actorFactory.createTestActor(
- newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardB);
-
- final TestActorRef<Shard> shardC = actorFactory.createTestActor(
- newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardC);
-
- final TransactionIdentifier txId1 = nextTransactionId();
- final TransactionIdentifier txId2 = nextTransactionId();
-
- SortedSet<String> participatingShardNames1 =
- ImmutableSortedSet.of(shardAId.getShardName(), shardCId.getShardName());
- SortedSet<String> participatingShardNames2 =
- ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
-
- // Ready [tx1] on shard A.
-
- shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames1),
- kit1.getRef());
- kit1.expectMsgClass(ReadyTransactionReply.class);
-
- // Ready [tx2] on shard B.
-
- shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH, EMPTY_TEST, participatingShardNames2),
- kit2.getRef());
- kit2.expectMsgClass(ReadyTransactionReply.class);
-
- // Ready [tx2, tx1] on shard C.
-
- shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH, EMPTY_TEST, participatingShardNames2),
- kit2.getRef());
- kit2.expectMsgClass(ReadyTransactionReply.class);
-
- shardC.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
- participatingShardNames1), kit1.getRef());
- kit1.expectMsgClass(ReadyTransactionReply.class);
-
- // Send tx1 CanCommit to A - should succeed.
-
- shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CanCommitTransactionReply.class);
-
- // Send tx2 CanCommit to B - should succeed.
-
- shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CanCommitTransactionReply.class);
-
- // Send tx1 CanCommit to C - tx2 is at the head of the queue but the preceding shards in tx1's participating
- // shard list [A] do not match that of tx2 [B] so tx1 should not be allowed to proceed.
-
- shardC.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectNoMessage(Duration.ofMillis(100));
-
- // Send tx2 CanCommit to C - it's at the head of the queue so should proceed.
-
- shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CanCommitTransactionReply.class);
-
- // Finish commit of tx2.
-
- shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CommitTransactionReply.class);
-
- shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CommitTransactionReply.class);
-
- // Finish commit of tx1.
-
- kit1.expectMsgClass(CanCommitTransactionReply.class);
- shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CommitTransactionReply.class);
-
- shardC.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CommitTransactionReply.class);
-
- // Verify data in the data store.
-
- verifyOuterListEntry(shardC, 1);
-
- LOG.info("{} ending", testName);
- }
-
- /**
- * Test 2 tx's accessing 2 shards, the first for one and the second for the other in common.
- * <pre>
- * tx1 -> shard A, shard B
- * tx2 -> shard B, shard C
- * </pre>
- * The tx's are readied such the pendingTransactions queue are as follows:
- * <pre>
- * Queue for shard A -> tx1
- * Queue for shard B -> tx2, tx1
- * Queue for shard C -> tx2
- * </pre>
- * When the tx's re committed verify the ready order is preserved.
- */
- @Test
- public void testTwoTransactionsWithOneCommonParticipatingShard2() {
- final String testName = "testTwoTransactionsWithOneCommonParticipatingShard2";
- LOG.info("{} starting", testName);
-
- final TestKit kit1 = new TestKit(getSystem());
- final TestKit kit2 = new TestKit(getSystem());
-
- final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
- final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
- final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
-
- final TestActorRef<Shard> shardA = actorFactory.createTestActor(
- newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardA);
-
- final TestActorRef<Shard> shardB = actorFactory.createTestActor(
- newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardB);
-
- final TestActorRef<Shard> shardC = actorFactory.createTestActor(
- newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shardC);
-
- final TransactionIdentifier txId1 = nextTransactionId();
- final TransactionIdentifier txId2 = nextTransactionId();
-
- SortedSet<String> participatingShardNames1 =
- ImmutableSortedSet.of(shardAId.getShardName(), shardBId.getShardName());
- SortedSet<String> participatingShardNames2 =
- ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
-
- // Ready [tx1] on shard A.
-
- shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames1),
- kit1.getRef());
- kit1.expectMsgClass(ReadyTransactionReply.class);
-
- // Ready [tx2, tx1] on shard B.
-
- shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH, EMPTY_TEST, participatingShardNames2),
- kit2.getRef());
- kit2.expectMsgClass(ReadyTransactionReply.class);
-
- shardB.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
- participatingShardNames1), kit1.getRef());
- kit1.expectMsgClass(ReadyTransactionReply.class);
-
- // Ready [tx2] on shard C.
-
- shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH, EMPTY_TEST, participatingShardNames2),
- kit2.getRef());
- kit2.expectMsgClass(ReadyTransactionReply.class);
-
- // Send tx1 CanCommit to A - should succeed.
-
- shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CanCommitTransactionReply.class);
-
- // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
- // shard list [A] do not match that of tx2 [] so tx1 should not be allowed to proceed.
-
- shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectNoMessage(Duration.ofMillis(100));
-
- // Send tx2 CanCommit to B - it's at the head of the queue so should proceed.
-
- shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CanCommitTransactionReply.class);
-
- // Finish commit of tx2.
-
- shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CanCommitTransactionReply.class);
-
- shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CommitTransactionReply.class);
-
- shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
- kit2.expectMsgClass(CommitTransactionReply.class);
-
- // Finish commit of tx1.
-
- kit1.expectMsgClass(CanCommitTransactionReply.class);
- shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CommitTransactionReply.class);
-
- shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
- kit1.expectMsgClass(CommitTransactionReply.class);
-
- // Verify data in the data store.
-
- verifyOuterListEntry(shardB, 1);
-
- LOG.info("{} ending", testName);
- }
-
- static void verifyInnerListEntry(final TestActorRef<Shard> shard, final int outerID, final String innerID) {
- final YangInstanceIdentifier path = innerEntryPath(outerID, innerID);
- final NormalizedNode innerListEntry = readStore(shard, path);
- assertNotNull(path + " not found", innerListEntry);
- }
-
- private static BatchedModifications newReadyBatchedModifications(final TransactionIdentifier transactionID,
- final YangInstanceIdentifier path, final NormalizedNode data,
- final SortedSet<String> participatingShardNames) {
- final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
- batched.addModification(new WriteModification(path, data));
- batched.setReady(Optional.of(participatingShardNames));
- batched.setTotalMessagesSent(1);
- return batched;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-import java.util.concurrent.TimeUnit;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.Props;
-import org.apache.pekko.testkit.TestActorRef;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Covers negative test cases.
- *
- * @author Basheeruddin Ahmed
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardTransactionFailureTest extends AbstractActorTest {
- private static final EffectiveModelContext TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
- private static final TransactionType RO = TransactionType.READ_ONLY;
- private static final TransactionType RW = TransactionType.READ_WRITE;
-
- private static final Shard MOCK_SHARD = mock(Shard.class);
-
- private static final ShardDataTree STORE = new ShardDataTree(MOCK_SHARD, TEST_SCHEMA_CONTEXT, TreeType.OPERATIONAL);
-
- private static final ShardIdentifier SHARD_IDENTIFIER =
- ShardIdentifier.create("inventory", MemberName.forName("member-1"), "operational");
-
- private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
-
- private final ShardStats shardStats = new DefaultShardStatsMXBean(SHARD_IDENTIFIER.toString(), "DataStore", null)
- .shardStats();
-
- private ActorRef createShard() {
- ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
- .schemaContextProvider(() -> TEST_SCHEMA_CONTEXT).props());
- ShardTestKit.waitUntilLeader(shard);
- return shard;
- }
-
- @Before
- public void setup() {
- doReturn(new ShardStats()).when(MOCK_SHARD).shardStats();
- }
-
- @Test(expected = ReadFailedException.class)
- public void testNegativeReadWithReadOnlyTransactionClosed() throws Exception {
-
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(RO, STORE.newReadOnlyTransaction(nextTransactionId()), shard,
- datastoreContext, shardStats);
-
- final TestActorRef<ShardTransaction> subject = TestActorRef.create(getSystem(), props,
- "testNegativeReadWithReadOnlyTransactionClosed");
-
- Future<Object> future = org.apache.pekko.pattern.Patterns.ask(subject,
- new ReadData(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION), 3000);
- Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
-
- subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
-
- future = org.apache.pekko.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.of(),
- DataStoreVersions.CURRENT_VERSION), 3000);
- Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
- }
-
-
- @Test(expected = ReadFailedException.class)
- public void testNegativeReadWithReadWriteTransactionClosed() throws Exception {
-
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(RW, STORE.newReadWriteTransaction(nextTransactionId()), shard,
- datastoreContext, shardStats);
-
- final TestActorRef<ShardTransaction> subject = TestActorRef.create(getSystem(), props,
- "testNegativeReadWithReadWriteTransactionClosed");
-
- Future<Object> future = org.apache.pekko.pattern.Patterns.ask(subject,
- new ReadData(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION), 3000);
- Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
-
- subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
-
- future = org.apache.pekko.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.of(),
- DataStoreVersions.CURRENT_VERSION), 3000);
- Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
- }
-
- @Test(expected = ReadFailedException.class)
- public void testNegativeExistsWithReadWriteTransactionClosed() throws Exception {
-
- final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(RW, STORE.newReadWriteTransaction(nextTransactionId()), shard,
- datastoreContext, shardStats);
-
- final TestActorRef<ShardTransaction> subject = TestActorRef.create(getSystem(), props,
- "testNegativeExistsWithReadWriteTransactionClosed");
-
- Future<Object> future = org.apache.pekko.pattern.Patterns.ask(subject,
- new DataExists(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION), 3000);
- Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
-
- subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
-
- future = org.apache.pekko.pattern.Patterns.ask(subject,
- new DataExists(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION), 3000);
- Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-
-import com.google.common.base.Throwables;
-import java.time.Duration;
-import java.util.concurrent.TimeUnit;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.Props;
-import org.apache.pekko.actor.Status.Failure;
-import org.apache.pekko.actor.Terminated;
-import org.apache.pekko.dispatch.Dispatchers;
-import org.apache.pekko.testkit.TestActorRef;
-import org.apache.pekko.testkit.javadsl.TestKit;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.InOrder;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.TestActorFactory;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardTransactionTest extends AbstractActorTest {
-
- private static final TransactionType RO = TransactionType.READ_ONLY;
- private static final TransactionType RW = TransactionType.READ_WRITE;
- private static final TransactionType WO = TransactionType.WRITE_ONLY;
-
- private static final ShardIdentifier SHARD_IDENTIFIER =
- ShardIdentifier.create("inventory", MEMBER_NAME, "config");
- private static final EffectiveModelContext TEST_MODEL = TestModel.createTestContext();
-
- private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
-
- private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
-
- private TestActorRef<Shard> shard;
- private ShardDataTree store;
- private TestKit testKit;
-
- @Before
- public void setUp() {
- shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
- .schemaContextProvider(() -> TEST_MODEL).props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()));
- ShardTestKit.waitUntilLeader(shard);
- store = shard.underlyingActor().getDataStore();
- testKit = new TestKit(getSystem());
- }
-
- private ActorRef newTransactionActor(final TransactionType type,
- final AbstractShardDataTreeTransaction<?> transaction, final String name) {
- Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
- shard.underlyingActor().shardStats());
- return actorFactory.createActorNoVerify(props, name);
- }
-
- private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
- return store.newReadOnlyTransaction(nextTransactionId());
- }
-
- private ReadWriteShardDataTreeTransaction readWriteTransaction() {
- return store.newReadWriteTransaction(nextTransactionId());
- }
-
- @Test
- public void testOnReceiveReadData() {
- testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
- testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
- }
-
- private void testOnReceiveReadData(final ActorRef transaction) {
- transaction.tell(new ReadData(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION),
- testKit.getRef());
-
- ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
-
- assertNotNull(reply.getNormalizedNode());
- }
-
- @Test
- public void testOnReceiveReadDataWhenDataNotFound() {
- testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
- "testReadDataWhenDataNotFoundRO"));
- testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
- "testReadDataWhenDataNotFoundRW"));
- }
-
- private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
- transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
-
- ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
-
- assertNull(reply.getNormalizedNode());
- }
-
- @Test
- public void testOnReceiveDataExistsPositive() {
- testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
- testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
- }
-
- private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
- transaction.tell(new DataExists(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION),
- testKit.getRef());
-
- DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
-
- assertTrue(reply.exists());
- }
-
- @Test
- public void testOnReceiveDataExistsNegative() {
- testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
- testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
- }
-
- private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
- transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
-
- DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
-
- assertFalse(reply.exists());
- }
-
- @Test
- public void testOnReceiveBatchedModifications() {
- ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
- DataTreeModification mockModification = mock(DataTreeModification.class);
- ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
- nextTransactionId(), mockModification);
- final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
-
- YangInstanceIdentifier writePath = TestModel.TEST_PATH;
- NormalizedNode writeData = ImmutableNodes.newContainerBuilder()
- .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
- .build();
-
- YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
- NormalizedNode mergeData = TestModel.EMPTY_OUTER_LIST;
-
- YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
-
- BatchedModifications batched = new BatchedModifications(nextTransactionId(),
- DataStoreVersions.CURRENT_VERSION);
- batched.addModification(new WriteModification(writePath, writeData));
- batched.addModification(new MergeModification(mergePath, mergeData));
- batched.addModification(new DeleteModification(deletePath));
-
- transaction.tell(batched, testKit.getRef());
-
- BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
- BatchedModificationsReply.class);
- assertEquals("getNumBatched", 3, reply.getNumBatched());
-
- InOrder inOrder = inOrder(mockModification);
- inOrder.verify(mockModification).write(writePath, writeData);
- inOrder.verify(mockModification).merge(mergePath, mergeData);
- inOrder.verify(mockModification).delete(deletePath);
- }
-
- @Test
- public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
- final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
- "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
-
- TestKit watcher = new TestKit(getSystem());
- watcher.watch(transaction);
-
- YangInstanceIdentifier writePath = TestModel.TEST_PATH;
- NormalizedNode writeData = ImmutableNodes.newContainerBuilder()
- .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
- .build();
-
- final TransactionIdentifier tx1 = nextTransactionId();
- BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
- batched.addModification(new WriteModification(writePath, writeData));
-
- transaction.tell(batched, testKit.getRef());
- BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
- BatchedModificationsReply.class);
- assertEquals("getNumBatched", 1, reply.getNumBatched());
-
- batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
- batched.setReady();
- batched.setTotalMessagesSent(2);
-
- transaction.tell(batched, testKit.getRef());
- testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
- watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
- }
-
- @Test
- public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
- final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
- "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
-
- TestKit watcher = new TestKit(getSystem());
- watcher.watch(transaction);
-
- YangInstanceIdentifier writePath = TestModel.TEST_PATH;
- NormalizedNode writeData = ImmutableNodes.newContainerBuilder()
- .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
- .build();
-
- BatchedModifications batched = new BatchedModifications(nextTransactionId(),
- DataStoreVersions.CURRENT_VERSION);
- batched.addModification(new WriteModification(writePath, writeData));
- batched.setReady();
- batched.setDoCommitOnReady(true);
- batched.setTotalMessagesSent(1);
-
- transaction.tell(batched, testKit.getRef());
- testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
- watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
- }
-
- @Test(expected = TestException.class)
- public void testOnReceiveBatchedModificationsFailure() throws Exception {
- ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
- DataTreeModification mockModification = mock(DataTreeModification.class);
- ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
- nextTransactionId(), mockModification);
- final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
- "testOnReceiveBatchedModificationsFailure");
-
- TestKit watcher = new TestKit(getSystem());
- watcher.watch(transaction);
-
- YangInstanceIdentifier path = TestModel.TEST_PATH;
- ContainerNode node = TestModel.EMPTY_TEST;
-
- doThrow(new TestException()).when(mockModification).write(path, node);
-
- final TransactionIdentifier tx1 = nextTransactionId();
- BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
- batched.addModification(new WriteModification(path, node));
-
- transaction.tell(batched, testKit.getRef());
- testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
-
- batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
- batched.setReady();
- batched.setTotalMessagesSent(2);
-
- transaction.tell(batched, testKit.getRef());
- Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
- watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
-
- if (failure != null) {
- Throwables.propagateIfPossible(failure.cause(), Exception.class);
- throw new RuntimeException(failure.cause());
- }
- }
-
- @Test(expected = IllegalStateException.class)
- public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
- final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
- "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
-
- TestKit watcher = new TestKit(getSystem());
- watcher.watch(transaction);
-
- BatchedModifications batched = new BatchedModifications(nextTransactionId(),
- DataStoreVersions.CURRENT_VERSION);
- batched.setReady();
- batched.setTotalMessagesSent(2);
-
- transaction.tell(batched, testKit.getRef());
-
- Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
- watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
-
- if (failure != null) {
- Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
- Throwables.throwIfUnchecked(failure.cause());
- throw new RuntimeException(failure.cause());
- }
- }
-
- @Test
- public void testReadWriteTxOnReceiveCloseTransaction() {
- final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
- "testReadWriteTxOnReceiveCloseTransaction");
-
- testKit.watch(transaction);
-
- transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
-
- testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
- testKit.expectTerminated(Duration.ofSeconds(3), transaction);
- }
-
- @Test
- public void testWriteOnlyTxOnReceiveCloseTransaction() {
- final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
- "testWriteTxOnReceiveCloseTransaction");
-
- testKit.watch(transaction);
-
- transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
-
- testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
- testKit.expectTerminated(Duration.ofSeconds(3), transaction);
- }
-
- @Test
- public void testReadOnlyTxOnReceiveCloseTransaction() {
- final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
- "testReadOnlyTxOnReceiveCloseTransaction");
-
- testKit.watch(transaction);
-
- transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
-
- testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
- }
-
- @Test
- public void testShardTransactionInactivity() {
- datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
- 500, TimeUnit.MILLISECONDS).build();
-
- final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
- "testShardTransactionInactivity");
-
- testKit.watch(transaction);
-
- testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
- }
-
- public static class TestException extends RuntimeException {
- private static final long serialVersionUID = 1L;
- }
-}