/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.inOrder;
17 import static org.mockito.Mockito.mock;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.actor.Status.Failure;
22 import akka.actor.Terminated;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.TestActorRef;
25 import akka.testkit.javadsl.TestKit;
26 import com.google.common.base.Throwables;
27 import java.time.Duration;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.InOrder;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
40 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.raft.TestActorFactory;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
55 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
57 public class ShardTransactionTest extends AbstractActorTest {
59 private static final TransactionType RO = TransactionType.READ_ONLY;
60 private static final TransactionType RW = TransactionType.READ_WRITE;
61 private static final TransactionType WO = TransactionType.WRITE_ONLY;
63 private static final ShardIdentifier SHARD_IDENTIFIER =
64 ShardIdentifier.create("inventory", MEMBER_NAME, "config");
65 private static final EffectiveModelContext TEST_MODEL = TestModel.createTestContext();
67 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
69 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
71 private TestActorRef<Shard> shard;
72 private ShardDataTree store;
73 private TestKit testKit;
77 shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
78 .schemaContextProvider(() -> TEST_MODEL).props()
79 .withDispatcher(Dispatchers.DefaultDispatcherId()));
80 ShardTestKit.waitUntilLeader(shard);
81 store = shard.underlyingActor().getDataStore();
82 testKit = new TestKit(getSystem());
85 private ActorRef newTransactionActor(final TransactionType type,
86 final AbstractShardDataTreeTransaction<?> transaction, final String name) {
87 Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
88 shard.underlyingActor().getShardMBean());
89 return actorFactory.createActorNoVerify(props, name);
92 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
93 return store.newReadOnlyTransaction(nextTransactionId());
96 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
97 return store.newReadWriteTransaction(nextTransactionId());
101 public void testOnReceiveReadData() {
102 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
103 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
106 private void testOnReceiveReadData(final ActorRef transaction) {
107 transaction.tell(new ReadData(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
110 ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
112 assertNotNull(reply.getNormalizedNode());
116 public void testOnReceiveReadDataWhenDataNotFound() {
117 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
118 "testReadDataWhenDataNotFoundRO"));
119 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
120 "testReadDataWhenDataNotFoundRW"));
123 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
124 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
126 ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
128 assertNull(reply.getNormalizedNode());
132 public void testOnReceiveDataExistsPositive() {
133 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
134 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
137 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
138 transaction.tell(new DataExists(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
141 DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
143 assertTrue(reply.exists());
147 public void testOnReceiveDataExistsNegative() {
148 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
149 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
152 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
153 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
155 DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
157 assertFalse(reply.exists());
161 public void testOnReceiveBatchedModifications() {
162 ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
163 DataTreeModification mockModification = mock(DataTreeModification.class);
164 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
165 nextTransactionId(), mockModification);
166 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
168 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
169 NormalizedNode writeData = ImmutableContainerNodeBuilder.create()
170 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
171 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
173 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
174 NormalizedNode mergeData = ImmutableContainerNodeBuilder.create()
175 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
178 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
180 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
181 DataStoreVersions.CURRENT_VERSION);
182 batched.addModification(new WriteModification(writePath, writeData));
183 batched.addModification(new MergeModification(mergePath, mergeData));
184 batched.addModification(new DeleteModification(deletePath));
186 transaction.tell(batched, testKit.getRef());
188 BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
189 BatchedModificationsReply.class);
190 assertEquals("getNumBatched", 3, reply.getNumBatched());
192 InOrder inOrder = inOrder(mockModification);
193 inOrder.verify(mockModification).write(writePath, writeData);
194 inOrder.verify(mockModification).merge(mergePath, mergeData);
195 inOrder.verify(mockModification).delete(deletePath);
199 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
200 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
201 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
203 TestKit watcher = new TestKit(getSystem());
204 watcher.watch(transaction);
206 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
207 NormalizedNode writeData = ImmutableContainerNodeBuilder.create()
208 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
209 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
211 final TransactionIdentifier tx1 = nextTransactionId();
212 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
213 batched.addModification(new WriteModification(writePath, writeData));
215 transaction.tell(batched, testKit.getRef());
216 BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
217 BatchedModificationsReply.class);
218 assertEquals("getNumBatched", 1, reply.getNumBatched());
220 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
222 batched.setTotalMessagesSent(2);
224 transaction.tell(batched, testKit.getRef());
225 testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
226 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
230 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
231 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
232 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
234 TestKit watcher = new TestKit(getSystem());
235 watcher.watch(transaction);
237 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
238 NormalizedNode writeData = ImmutableContainerNodeBuilder.create()
239 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
240 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
242 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
243 DataStoreVersions.CURRENT_VERSION);
244 batched.addModification(new WriteModification(writePath, writeData));
246 batched.setDoCommitOnReady(true);
247 batched.setTotalMessagesSent(1);
249 transaction.tell(batched, testKit.getRef());
250 testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
251 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
254 @Test(expected = TestException.class)
255 public void testOnReceiveBatchedModificationsFailure() throws Exception {
256 ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
257 DataTreeModification mockModification = mock(DataTreeModification.class);
258 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
259 nextTransactionId(), mockModification);
260 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
261 "testOnReceiveBatchedModificationsFailure");
263 TestKit watcher = new TestKit(getSystem());
264 watcher.watch(transaction);
266 YangInstanceIdentifier path = TestModel.TEST_PATH;
267 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
269 doThrow(new TestException()).when(mockModification).write(path, node);
271 final TransactionIdentifier tx1 = nextTransactionId();
272 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
273 batched.addModification(new WriteModification(path, node));
275 transaction.tell(batched, testKit.getRef());
276 testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
278 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
280 batched.setTotalMessagesSent(2);
282 transaction.tell(batched, testKit.getRef());
283 Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
284 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
286 if (failure != null) {
287 Throwables.propagateIfPossible(failure.cause(), Exception.class);
288 throw new RuntimeException(failure.cause());
292 @Test(expected = IllegalStateException.class)
293 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
294 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
295 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
297 TestKit watcher = new TestKit(getSystem());
298 watcher.watch(transaction);
300 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
301 DataStoreVersions.CURRENT_VERSION);
303 batched.setTotalMessagesSent(2);
305 transaction.tell(batched, testKit.getRef());
307 Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
308 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
310 if (failure != null) {
311 Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
312 Throwables.throwIfUnchecked(failure.cause());
313 throw new RuntimeException(failure.cause());
318 public void testReadWriteTxOnReceiveCloseTransaction() {
319 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
320 "testReadWriteTxOnReceiveCloseTransaction");
322 testKit.watch(transaction);
324 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
326 testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
327 testKit.expectTerminated(Duration.ofSeconds(3), transaction);
331 public void testWriteOnlyTxOnReceiveCloseTransaction() {
332 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
333 "testWriteTxOnReceiveCloseTransaction");
335 testKit.watch(transaction);
337 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
339 testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
340 testKit.expectTerminated(Duration.ofSeconds(3), transaction);
344 public void testReadOnlyTxOnReceiveCloseTransaction() {
345 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
346 "testReadOnlyTxOnReceiveCloseTransaction");
348 testKit.watch(transaction);
350 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
352 testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
356 public void testShardTransactionInactivity() {
357 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
358 500, TimeUnit.MILLISECONDS).build();
360 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
361 "testShardTransactionInactivity");
363 testKit.watch(transaction);
365 testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
368 public static class TestException extends RuntimeException {
369 private static final long serialVersionUID = 1L;