updateConfigParams(datastoreContext.getShardRaftConfig());
}
- boolean canSkipPayload() {
- // If we do not have any followers and we are not using persistence we can apply modification to the state
- // immediately
- return !hasFollowers() && !persistence().isRecoveryApplicable();
- }
-
// applyState() will be invoked once consensus is reached on the payload
void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) {
- // We are faking the sender
- persistData(self(), transactionId, payload, batchHint);
+ boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+ if (canSkipPayload) {
+ applyState(self(), transactionId, payload);
+ } else {
+ // We are faking the sender
+ persistData(self(), transactionId, payload, batchHint);
+ }
}
private void handleCommitTransaction(final CommitTransaction commit) {
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTreeTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
}
private void processNextPending() {
- processNextPendingFinishCommit();
processNextPendingCommit();
processNextPendingTransaction();
}
entry -> startCommit(entry.cohort, entry.cohort.getCandidate()));
}
- private void processNextPendingFinishCommit() {
- processNextPending(pendingFinishCommits, State.FINISH_COMMIT_PENDING,
- entry -> payloadReplicationComplete(entry.cohort.getIdentifier()));
- }
-
private boolean peekNextPendingCommit() {
final CommitEntry first = pendingCommits.peek();
return first != null && first.cohort.getState() == State.COMMIT_PENDING;
LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
final TransactionIdentifier txId = cohort.getIdentifier();
- if (shard.canSkipPayload() || candidate.getRootNode().getModificationType() == ModificationType.UNMODIFIED) {
- LOG.debug("{}: No replication required, proceeding to finish commit", logContext);
- pendingCommits.remove();
- pendingFinishCommits.add(entry);
- cohort.finishCommitPending();
- payloadReplicationComplete(txId);
- return;
- }
-
final Payload payload;
try {
payload = CommitTransactionPayload.create(txId, candidate);
PRE_COMMIT_PENDING,
PRE_COMMIT_COMPLETE,
COMMIT_PENDING,
- FINISH_COMMIT_PENDING,
ABORTED,
COMMITTED,
switchState(State.FAILED).onFailure(cause);
}
- void finishCommitPending() {
- checkState(State.COMMIT_PENDING);
- // We want to switch the state but keep the callback.
- callback = switchState(State.FINISH_COMMIT_PENDING);
- }
-
@Override
public State getState() {
return state;
package org.opendaylight.controller.cluster.datastore;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import com.google.common.util.concurrent.FutureCallback;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
public final class ShardDataTreeMocking {
inOrder.verify(mock).preCommit(any(FutureCallback.class));
inOrder.verify(mock).commit(any(FutureCallback.class));
}
+
+ public static void immediatePayloadReplication(final ShardDataTree shardDataTree, final Shard mockShard) {
+ doAnswer(invocation -> {
+ shardDataTree.applyReplicatedPayload(invocation.getArgumentAt(0, TransactionIdentifier.class),
+ invocation.getArgumentAt(1, Payload.class));
+ return null;
+ }).when(mockShard).persistPayload(any(TransactionIdentifier.class), any(CommitTransactionPayload.class),
+ anyBoolean());
+ }
}
import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
+import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication;
import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
import com.google.common.base.Optional;
@Before
public void setUp() {
- doReturn(true).when(mockShard).canSkipPayload();
doReturn(Ticker.systemTicker()).when(mockShard).ticker();
doReturn(Mockito.mock(ShardStats.class)).when(mockShard).getShardMBean();
private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent)
throws ExecutionException, InterruptedException {
+ immediatePayloadReplication(shardDataTree, mockShard);
assertEquals(fullSchema, shardDataTree.getSchemaContext());
@Test
public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException {
+ immediatePayloadReplication(shardDataTree, mockShard);
+
final List<DataTreeCandidate> candidates = new ArrayList<>();
candidates.add(addCar(shardDataTree));
candidates.add(removeCar(shardDataTree));
@Test
public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException {
+ immediatePayloadReplication(shardDataTree, mockShard);
+
final List<DataTreeCandidate> candidates = new ArrayList<>();
candidates.add(addCar(shardDataTree));
candidates.add(removeCar(shardDataTree));
@Test
public void testListenerNotifiedOnApplySnapshot() throws Exception {
+ immediatePayloadReplication(shardDataTree, mockShard);
+
DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener);
});
ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
+ immediatePayloadReplication(newDataTree, mockShard);
addCar(newDataTree, "optima");
addCar(newDataTree, "murano");
@Test
public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
- doReturn(false).when(mockShard).canSkipPayload();
-
final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
@Test
public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
- doReturn(false).when(mockShard).canSkipPayload();
-
final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
}
@Test
- public void testPipelinedTransactionsWithUnmodifiedCandidate() throws Exception {
- doReturn(false).when(mockShard).canSkipPayload();
+ public void testPipelinedTransactionsWithImmediateReplication() throws Exception {
+ immediatePayloadReplication(shardDataTree, mockShard);
final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
- snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
-
- final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
+ snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
- verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class), eq(false));
+ YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+ MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
+ final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
+ final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
+ final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
- verify(mockShard, never()).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
- anyBoolean());
-
- // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
- shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(),
- CommitTransactionPayload.create(nextTransactionId(), cohort1.getCandidate()));
-
- InOrder inOrder = inOrder(commitCallback1, commitCallback2);
+ InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
+ inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
final DataTreeSnapshot snapshot =
shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
@SuppressWarnings("unchecked")
@Test
public void testAbortWithPendingCommits() throws Exception {
- doReturn(false).when(mockShard).canSkipPayload();
-
final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
@SuppressWarnings("unchecked")
@Test
public void testAbortWithFailedRebase() throws Exception {
+ immediatePayloadReplication(shardDataTree, mockShard);
+
final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
};
}
- @Test
- public void testReadWriteCommitWhenTransactionHasNoModifications() {
- testCommitWhenTransactionHasNoModifications(true);
- }
-
- @Test
- public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
- testCommitWhenTransactionHasNoModifications(false);
- }
-
- private void testCommitWhenTransactionHasNoModifications(final boolean readWrite) {
- // Note that persistence is enabled which would normally result in the
- // entry getting written to the journal
- // but here that need not happen
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitWhenTransactionHasNoModifications-" + readWrite);
-
- waitUntilLeader(shard);
-
- final TransactionIdentifier transactionID = nextTransactionId();
-
- final FiniteDuration duration = duration("5 seconds");
-
- if (readWrite) {
- final ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore()
- .newReadWriteTransaction(transactionID);
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
- } else {
- shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()),
- getRef());
- }
-
- expectMsgClass(duration, ReadyTransactionReply.class);
-
- // Send the CanCommitTransaction message.
-
- shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
- .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
- assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
- shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, CommitTransactionReply.class);
-
- shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
- final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
-
- // Use MBean for verification
- // Committed transaction count should increase as usual
- assertEquals(1, shardStats.getCommittedTransactionsCount());
-
- // Commit index should not advance because this does not go into
- // the journal
- assertEquals(-1, shardStats.getCommitIndex());
- }
- };
- }
-
@Test
public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
testCommitWhenTransactionHasModifications(true);