*/
@NotThreadSafe
final class FrontendReadWriteTransaction extends FrontendTransaction {
+ private enum CommitStage {
+ READY,
+ CAN_COMMIT_PENDING,
+ CAN_COMMIT_COMPLETE,
+ PRE_COMMIT_PENDING,
+ PRE_COMMIT_COMPLETE,
+ COMMIT_PENDING,
+ }
+
+ private abstract static class State {
+ @Override
+ public abstract String toString();
+ }
+
+ private static final class Failed extends State {
+ final RequestException cause;
+
+ Failed(final RequestException cause) {
+ this.cause = Preconditions.checkNotNull(cause);
+ }
+
+ @Override
+ public String toString() {
+ return "FAILED (" + cause.getMessage() + ")";
+ }
+ }
+
+ private static final class Open extends State {
+ final ReadWriteShardDataTreeTransaction openTransaction;
+
+ Open(final ReadWriteShardDataTreeTransaction openTransaction) {
+ this.openTransaction = Preconditions.checkNotNull(openTransaction);
+ }
+
+ @Override
+ public String toString() {
+ return "OPEN";
+ }
+ }
+
+ private static final class Ready extends State {
+ final ShardDataTreeCohort readyCohort;
+ CommitStage stage;
+
+ Ready(final ShardDataTreeCohort readyCohort) {
+ this.readyCohort = Preconditions.checkNotNull(readyCohort);
+ this.stage = CommitStage.READY;
+ }
+
+ @Override
+ public String toString() {
+ return "READY (" + stage + ")";
+ }
+ }
+
+ private static final class Sealed extends State {
+ final DataTreeModification sealedModification;
+
+ Sealed(final DataTreeModification sealedModification) {
+ this.sealedModification = Preconditions.checkNotNull(sealedModification);
+ }
+
+ @Override
+ public String toString() {
+ return "SEALED";
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
+ private static final State ABORTED = new State() {
+ @Override
+ public String toString() {
+ return "ABORTED";
+ }
+ };
+ private static final State ABORTING = new State() {
+ @Override
+ public String toString() {
+ return "ABORTING";
+ }
+ };
+ private static final State COMMITTED = new State() {
+ @Override
+ public String toString() {
+ return "COMMITTED";
+ }
+ };
- private ReadWriteShardDataTreeTransaction openTransaction;
- private DataTreeModification sealedModification;
- private ShardDataTreeCohort readyCohort;
+ private State state;
private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
final ReadWriteShardDataTreeTransaction transaction) {
super(history, id);
- this.openTransaction = Preconditions.checkNotNull(transaction);
+ this.state = new Open(transaction);
}
private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
final DataTreeModification mod) {
super(history, id);
- this.sealedModification = Preconditions.checkNotNull(mod);
+ this.state = new Sealed(mod);
}
static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionAbortRequest) {
- handleTransactionAbort(request.getSequence(), envelope, now);
- return null;
+ return handleTransactionAbort(request.getSequence(), envelope, now);
} else if (request instanceof AbortLocalTransactionRequest) {
handleLocalTransactionAbort(request.getSequence(), envelope, now);
return null;
private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate result) {
- recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
- request.getSequence()));
- }
+ throwIfFailed();
+
+ final Ready ready = checkReady();
+ switch (ready.stage) {
+ case PRE_COMMIT_PENDING:
+ LOG.debug("{}: Transaction {} is already preCommitting", persistenceId(), getIdentifier());
+ break;
+ case CAN_COMMIT_COMPLETE:
+ ready.stage = CommitStage.PRE_COMMIT_PENDING;
+ LOG.debug("{}: Transaction {} initiating preCommit", persistenceId(), getIdentifier());
+ ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+ @Override
+ public void onSuccess(final DataTreeCandidate result) {
+ LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
+ recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(),
+ request.getSequence()));
+ ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ failTransaction(envelope, now, new RuntimeRequestException("Precommit failed", failure));
+ }
+ });
+ break;
+ case CAN_COMMIT_PENDING:
+ case COMMIT_PENDING:
+ case PRE_COMMIT_COMPLETE:
+ case READY:
+ throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage);
+ default:
+ throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+ }
+ }
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
- readyCohort = null;
- }
- });
+ private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+ recordAndSendFailure(envelope, now, cause);
+ state = new Failed(cause);
+ LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
}
private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
final long now) throws RequestException {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
- @Override
- public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope, now);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
- readyCohort = null;
- }
- });
+ throwIfFailed();
+
+ final Ready ready = checkReady();
+ switch (ready.stage) {
+ case COMMIT_PENDING:
+ LOG.debug("{}: Transaction {} is already committing", persistenceId(), getIdentifier());
+ break;
+ case PRE_COMMIT_COMPLETE:
+ ready.stage = CommitStage.COMMIT_PENDING;
+ LOG.debug("{}: Transaction {} initiating commit", persistenceId(), getIdentifier());
+ ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
+ @Override
+ public void onSuccess(final UnsignedLong result) {
+ successfulCommit(envelope, now);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ failTransaction(envelope, now, new RuntimeRequestException("Commit failed", failure));
+ }
+ });
+ break;
+ case CAN_COMMIT_COMPLETE:
+ case CAN_COMMIT_PENDING:
+ case PRE_COMMIT_PENDING:
+ case READY:
+ throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage);
+ default:
+ throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+ }
}
private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
- Preconditions.checkState(readyCohort == null, "Transaction {} encountered local abort with commit underway",
- getIdentifier());
- openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+ checkOpen().abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
sequence)));
}
- private void handleTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
- if (readyCohort == null) {
- openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
- sequence)));
- return;
+ private void startAbort() {
+ state = ABORTING;
+ LOG.debug("{}: Transaction {} aborting", persistenceId(), getIdentifier());
+ }
+
+ private void finishAbort() {
+ state = ABORTED;
+ LOG.debug("{}: Transaction {} aborted", persistenceId(), getIdentifier());
+ }
+
+ private TransactionAbortSuccess handleTransactionAbort(final long sequence, final RequestEnvelope envelope,
+ final long now) {
+ if (state instanceof Open) {
+ final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
+ startAbort();
+ openTransaction.abort(() -> {
+ recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+ sequence));
+ finishAbort();
+ });
+ return null;
+ }
+ if (ABORTING.equals(state)) {
+ LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
+ return null;
+ }
+ if (ABORTED.equals(state)) {
+ // We should have recorded the reply
+ LOG.warn("{}: Transaction {} already aborted", persistenceId(), getIdentifier());
+ return new TransactionAbortSuccess(getIdentifier(), sequence);
}
- readyCohort.abort(new FutureCallback<Void>() {
+ final Ready ready = checkReady();
+ startAbort();
+ ready.readyCohort.abort(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- readyCohort = null;
recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
- LOG.debug("Transaction {} aborted", getIdentifier());
+ finishAbort();
}
@Override
public void onFailure(final Throwable failure) {
- readyCohort = null;
- LOG.warn("Transaction {} abort failed", getIdentifier(), failure);
recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
+ LOG.warn("{}: Transaction {} abort failed", persistenceId(), getIdentifier(), failure);
+ finishAbort();
}
});
+ return null;
}
- private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
- envelope.getMessage().getSequence()));
- }
+ private void coordinatedCommit(final RequestEnvelope envelope, final long now) throws RequestException {
+ throwIfFailed();
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
+ final Ready ready = checkReady();
+ switch (ready.stage) {
+ case CAN_COMMIT_PENDING:
+ LOG.debug("{}: Transaction {} is already canCommitting", persistenceId(), getIdentifier());
+ break;
+ case READY:
+ ready.stage = CommitStage.CAN_COMMIT_PENDING;
+ LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier());
+ checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(),
+ envelope.getMessage().getSequence()));
+ ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
+ LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+ }
+ });
+ break;
+ case CAN_COMMIT_COMPLETE:
+ case COMMIT_PENDING:
+ case PRE_COMMIT_COMPLETE:
+ case PRE_COMMIT_PENDING:
+ throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage);
+ default:
+ throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+ }
}
- private void directCommit(final RequestEnvelope envelope, final long now) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- successfulDirectCanCommit(envelope, now);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
+ private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
+ throwIfFailed();
+
+ final Ready ready = checkReady();
+ switch (ready.stage) {
+ case CAN_COMMIT_COMPLETE:
+ case CAN_COMMIT_PENDING:
+ case COMMIT_PENDING:
+ case PRE_COMMIT_COMPLETE:
+ case PRE_COMMIT_PENDING:
+ LOG.debug("{}: Transaction {} in state {}, not initiating direct commit for {}", persistenceId(),
+ getIdentifier(), state, envelope);
+ break;
+ case READY:
+ ready.stage = CommitStage.CAN_COMMIT_PENDING;
+ LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier());
+ ready.readyCohort.canCommit(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ successfulDirectCanCommit(envelope, now);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+ }
+ });
+ break;
+ default:
+ throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+ }
}
void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+ final Ready ready = checkReady();
+ ready.stage = CommitStage.PRE_COMMIT_PENDING;
+ LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
+ ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
successfulDirectPreCommit(envelope, startTime);
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
- readyCohort = null;
+ failTransaction(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
}
});
}
void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
+ final Ready ready = checkReady();
+ ready.stage = CommitStage.COMMIT_PENDING;
+ LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
+ ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
successfulCommit(envelope, startTime);
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
- readyCohort = null;
+ failTransaction(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
}
});
}
void successfulCommit(final RequestEnvelope envelope, final long startTime) {
- recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+ recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
envelope.getMessage().getSequence()));
- readyCohort = null;
+ state = COMMITTED;
}
private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
+ final DataTreeModification sealedModification = checkSealed();
if (!sealedModification.equals(request.getModification())) {
LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
throw new UnsupportedRequestException(request);
final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
if (optFailure.isPresent()) {
- readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get());
+ state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
} else {
- readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+ state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification));
}
if (request.isCoordinated()) {
private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+ final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
data.isPresent()));
}
private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+ final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
data));
}
return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
}
- private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
-
- final Collection<TransactionModification> mods = request.getModifications();
- if (!mods.isEmpty()) {
- final DataTreeModification modification = openTransaction.getSnapshot();
- for (TransactionModification m : mods) {
+ private void applyModifications(final Collection<TransactionModification> modifications) {
+ if (!modifications.isEmpty()) {
+ final DataTreeModification modification = checkOpen().getSnapshot();
+ for (TransactionModification m : modifications) {
if (m instanceof TransactionDelete) {
modification.delete(m.getPath());
} else if (m instanceof TransactionWrite) {
}
}
}
+ }
+ private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no
+ // protocol, there is nothing for us to do.
final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
if (!maybeProto.isPresent()) {
+ applyModifications(request.getModifications());
return replyModifySuccess(request.getSequence());
}
switch (maybeProto.get()) {
case ABORT:
- openTransaction.abort(() -> replyModifySuccess(request.getSequence()));
- openTransaction = null;
+ if (ABORTING.equals(state)) {
+ LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
+ return null;
+ }
+ final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
+ startAbort();
+ openTransaction.abort(() -> {
+ recordAndSendSuccess(envelope, now, new ModifyTransactionSuccess(getIdentifier(),
+ request.getSequence()));
+ finishAbort();
+ });
return null;
case READY:
- ensureReady();
+ ensureReady(request.getModifications());
return replyModifySuccess(request.getSequence());
case SIMPLE:
- ensureReady();
+ ensureReady(request.getModifications());
directCommit(envelope, now);
return null;
case THREE_PHASE:
- ensureReady();
+ ensureReady(request.getModifications());
coordinatedCommit(envelope, now);
return null;
default:
}
}
- private void ensureReady() {
+ private void ensureReady(final Collection<TransactionModification> modifications) {
// We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
// only once.
- if (readyCohort == null) {
- readyCohort = openTransaction.ready();
- LOG.debug("{}: transitioned {} to ready", persistenceId(), openTransaction.getIdentifier());
- openTransaction = null;
+ if (state instanceof Ready) {
+ LOG.debug("{}: {} is already in state {}", persistenceId(), getIdentifier(), state);
+ return;
+ }
+
+ applyModifications(modifications);
+ state = new Ready(checkOpen().ready());
+ LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
+ }
+
+ private void throwIfFailed() throws RequestException {
+ if (state instanceof Failed) {
+ LOG.debug("{}: {} has failed, rejecting request", persistenceId(), getIdentifier());
+ throw ((Failed) state).cause;
}
}
+
+ private ReadWriteShardDataTreeTransaction checkOpen() {
+ Preconditions.checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(),
+ state);
+ return ((Open) state).openTransaction;
+ }
+
+ private Ready checkReady() {
+ Preconditions.checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(),
+ state);
+ return (Ready) state;
+ }
+
+ private DataTreeModification checkSealed() {
+ Preconditions.checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(),
+ state);
+ return ((Sealed) state).sealedModification;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
+public class FrontendReadWriteTransactionTest {
+
+ private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FrontendIdentifier.create(
+ MemberName.forName("mock"), FrontendType.forName("mock")), 0);
+ private static final LocalHistoryIdentifier HISTORY_ID = new LocalHistoryIdentifier(CLIENT_ID, 0);
+ private static final TransactionIdentifier TX_ID = new TransactionIdentifier(HISTORY_ID, 0);
+
+ private AbstractFrontendHistory mockHistory;
+ private ReadWriteShardDataTreeTransaction shardTransaction;
+ private DataTreeModification mockModification;
+ private ShardDataTreeTransactionParent mockParent;
+ private FrontendReadWriteTransaction openTx;
+ private ShardDataTreeCohort mockCohort;
+
+ @Before
+ public void setup() {
+ mockHistory = mock(AbstractFrontendHistory.class);
+ mockParent = mock(ShardDataTreeTransactionParent.class);
+ mockModification = mock(DataTreeModification.class);
+ mockCohort = mock(ShardDataTreeCohort.class);
+
+ shardTransaction = new ReadWriteShardDataTreeTransaction(mockParent, TX_ID, mockModification);
+ openTx = FrontendReadWriteTransaction.createOpen(mockHistory, shardTransaction);
+
+ when(mockParent.finishTransaction(same(shardTransaction))).thenReturn(mockCohort);
+ }
+
+ private TransactionSuccess<?> handleRequest(final TransactionRequest<?> request) throws RequestException {
+ return openTx.doHandleRequest(request, new RequestEnvelope(request, 0, 0), 0);
+ }
+
+ @Test
+ public void testDuplicateModifyAbort() throws RequestException {
+ final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+ b.setSequence(0);
+ b.setAbort();
+ final TransactionRequest<?> abortReq = b.build();
+ assertNull(handleRequest(abortReq));
+ verify(mockParent).abortTransaction(same(shardTransaction), any(Runnable.class));
+
+ assertNull(handleRequest(abortReq));
+ verifyNoMoreInteractions(mockParent);
+ }
+
+ @Test
+ public void testDuplicateReady() throws RequestException {
+ final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+ b.setSequence(0);
+ b.setReady();
+ final TransactionRequest<?> readyReq = b.build();
+
+ assertNotNull(handleRequest(readyReq));
+ verify(mockParent).finishTransaction(same(shardTransaction));
+
+ assertNotNull(handleRequest(readyReq));
+ verifyNoMoreInteractions(mockParent);
+ }
+
+ @Test
+ public void testDuplicateDirect() throws RequestException {
+ final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+ b.setSequence(0);
+ b.setCommit(false);
+ final TransactionRequest<?> readyReq = b.build();
+
+ assertNull(handleRequest(readyReq));
+ verify(mockParent).finishTransaction(same(shardTransaction));
+
+ assertNull(handleRequest(readyReq));
+ verifyNoMoreInteractions(mockParent);
+ }
+
+ @Test
+ public void testDuplicateCoordinated() throws RequestException {
+ final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+ b.setSequence(0);
+ b.setCommit(true);
+ final TransactionRequest<?> readyReq = b.build();
+
+ assertNull(handleRequest(readyReq));
+ verify(mockParent).finishTransaction(same(shardTransaction));
+
+ assertNull(handleRequest(readyReq));
+ verifyNoMoreInteractions(mockParent);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testReadAfterReady() throws RequestException {
+ final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+ b.setSequence(0);
+ b.setReady();
+ final TransactionRequest<?> readyReq = b.build();
+
+ assertNotNull(handleRequest(readyReq));
+ verify(mockParent).finishTransaction(same(shardTransaction));
+
+ handleRequest(new ReadTransactionRequest(TX_ID, 0, mock(ActorRef.class), YangInstanceIdentifier.EMPTY, true));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testModifyAfterReady() throws RequestException {
+ final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+ b.setSequence(0);
+ b.setReady();
+ final TransactionRequest<?> readyReq = b.build();
+
+ assertNotNull(handleRequest(readyReq));
+ verify(mockParent).finishTransaction(same(shardTransaction));
+
+ b.setSequence(1);
+ b.addModification(mock(TransactionModification.class));
+ handleRequest(b.build());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testReadAfterAbort() throws RequestException {
+ final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+ b.setSequence(0);
+ b.setAbort();
+ final TransactionRequest<?> abortReq = b.build();
+ assertNull(handleRequest(abortReq));
+ verify(mockParent).abortTransaction(same(shardTransaction), any(Runnable.class));
+
+ handleRequest(new ReadTransactionRequest(TX_ID, 0, mock(ActorRef.class), YangInstanceIdentifier.EMPTY, true));
+ }
+}