2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
46 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
52 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
53 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 public class ShardTransactionTest extends AbstractActorTest {
58 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
59 private static final TransactionType RO = TransactionType.READ_ONLY;
60 private static final TransactionType RW = TransactionType.READ_WRITE;
61 private static final TransactionType WO = TransactionType.WRITE_ONLY;
63 private static final ShardIdentifier SHARD_IDENTIFIER =
64 ShardIdentifier.builder().memberName("member-1")
65 .shardName("inventory").type("config").build();
67 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
69 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
71 private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
73 private int txCounter = 0;
75 private ActorRef createShard() {
76 ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
77 schemaContext(TestModel.createTestContext()).props());
78 ShardTestKit.waitUntilLeader(shard);
82 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
83 return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
86 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
87 return newTransactionActor(type, transaction, null, name, version);
90 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
91 return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
94 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
96 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
97 datastoreContext, shardStats, "txn", version);
98 return getSystem().actorOf(props, name);
101 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
102 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
105 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
106 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
110 public void testOnReceiveReadData() throws Exception {
111 new JavaTestKit(getSystem()) {{
112 final ActorRef shard = createShard();
114 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
116 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
119 private void testOnReceiveReadData(final ActorRef transaction) {
120 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build(),
121 DataStoreVersions.CURRENT_VERSION),getRef());
123 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
125 assertNotNull(reply.getNormalizedNode());
130 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
131 new JavaTestKit(getSystem()) {{
132 final ActorRef shard = createShard();
134 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
135 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
137 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
138 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
141 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
142 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
144 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
146 assertTrue(reply.getNormalizedNode() == null);
151 public void testOnReceiveDataExistsPositive() throws Exception {
152 new JavaTestKit(getSystem()) {{
153 final ActorRef shard = createShard();
155 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
156 "testDataExistsPositiveRO"));
158 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
159 "testDataExistsPositiveRW"));
162 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
163 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build(),
164 DataStoreVersions.CURRENT_VERSION),getRef());
166 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
168 assertTrue(reply.exists());
173 public void testOnReceiveDataExistsNegative() throws Exception {
174 new JavaTestKit(getSystem()) {{
175 final ActorRef shard = createShard();
177 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
178 "testDataExistsNegativeRO"));
180 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
181 "testDataExistsNegativeRW"));
184 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
185 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
187 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
189 assertFalse(reply.exists());
194 public void testOnReceiveBatchedModifications() throws Exception {
195 new JavaTestKit(getSystem()) {{
197 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
198 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
199 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
200 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
202 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
203 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
204 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
205 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
207 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
208 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
209 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
211 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
213 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
214 batched.addModification(new WriteModification(writePath, writeData));
215 batched.addModification(new MergeModification(mergePath, mergeData));
216 batched.addModification(new DeleteModification(deletePath));
218 transaction.tell(batched, getRef());
220 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
221 assertEquals("getNumBatched", 3, reply.getNumBatched());
223 InOrder inOrder = Mockito.inOrder(mockModification);
224 inOrder.verify(mockModification).write(writePath, writeData);
225 inOrder.verify(mockModification).merge(mergePath, mergeData);
226 inOrder.verify(mockModification).delete(deletePath);
231 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
232 new JavaTestKit(getSystem()) {{
234 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
235 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
237 JavaTestKit watcher = new JavaTestKit(getSystem());
238 watcher.watch(transaction);
240 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
241 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
242 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
243 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
245 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
246 batched.addModification(new WriteModification(writePath, writeData));
248 transaction.tell(batched, getRef());
249 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
250 assertEquals("getNumBatched", 1, reply.getNumBatched());
252 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
253 batched.setReady(true);
254 batched.setTotalMessagesSent(2);
256 transaction.tell(batched, getRef());
257 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
258 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
263 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
264 new JavaTestKit(getSystem()) {{
266 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
267 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
269 JavaTestKit watcher = new JavaTestKit(getSystem());
270 watcher.watch(transaction);
272 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
273 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
274 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
275 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
277 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
278 batched.addModification(new WriteModification(writePath, writeData));
279 batched.setReady(true);
280 batched.setDoCommitOnReady(true);
281 batched.setTotalMessagesSent(1);
283 transaction.tell(batched, getRef());
284 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
285 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
289 @Test(expected=TestException.class)
290 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
291 new JavaTestKit(getSystem()) {{
293 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
294 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
295 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
296 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
297 "testOnReceiveBatchedModificationsFailure");
299 JavaTestKit watcher = new JavaTestKit(getSystem());
300 watcher.watch(transaction);
302 YangInstanceIdentifier path = TestModel.TEST_PATH;
303 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
305 doThrow(new TestException()).when(mockModification).write(path, node);
307 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
308 batched.addModification(new WriteModification(path, node));
310 transaction.tell(batched, getRef());
311 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
313 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
314 batched.setReady(true);
315 batched.setTotalMessagesSent(2);
317 transaction.tell(batched, getRef());
318 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
319 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
321 if(failure != null) {
322 throw failure.cause();
327 @Test(expected=IllegalStateException.class)
328 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
329 new JavaTestKit(getSystem()) {{
331 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
332 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
334 JavaTestKit watcher = new JavaTestKit(getSystem());
335 watcher.watch(transaction);
337 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
338 batched.setReady(true);
339 batched.setTotalMessagesSent(2);
341 transaction.tell(batched, getRef());
343 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
344 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
346 if(failure != null) {
347 throw failure.cause();
353 public void testOnReceiveCreateSnapshot() throws Exception {
354 new JavaTestKit(getSystem()) {{
355 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
356 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
358 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
359 YangInstanceIdentifier.builder().build());
361 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
362 "testOnReceiveCreateSnapshot");
366 transaction.tell(CreateSnapshot.INSTANCE, getRef());
368 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
370 assertNotNull("getSnapshot is null", reply.getSnapshot());
372 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
373 reply.getSnapshot());
375 assertEquals("Root node", expectedRoot, actualRoot);
377 expectTerminated(duration("3 seconds"), transaction);
382 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
383 new JavaTestKit(getSystem()) {{
384 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
385 "testReadWriteTxOnReceiveCloseTransaction");
389 transaction.tell(new CloseTransaction().toSerializable(), getRef());
391 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
392 expectTerminated(duration("3 seconds"), transaction);
397 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
398 new JavaTestKit(getSystem()) {{
399 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
400 "testWriteTxOnReceiveCloseTransaction");
404 transaction.tell(new CloseTransaction().toSerializable(), getRef());
406 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
407 expectTerminated(duration("3 seconds"), transaction);
412 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
413 new JavaTestKit(getSystem()) {{
414 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
415 "testReadOnlyTxOnReceiveCloseTransaction");
419 transaction.tell(new CloseTransaction().toSerializable(), getRef());
421 expectMsgClass(duration("3 seconds"), Terminated.class);
425 @Test(expected=UnknownMessageException.class)
426 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
427 final ActorRef shard = createShard();
428 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
429 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
430 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
432 transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
433 ActorRef.noSender());
437 public void testShardTransactionInactivity() {
439 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
440 500, TimeUnit.MILLISECONDS).build();
442 new JavaTestKit(getSystem()) {{
443 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
444 "testShardTransactionInactivity");
448 expectMsgClass(duration("3 seconds"), Terminated.class);
453 public void testOnReceivePreBoronReadData() throws Exception {
454 new JavaTestKit(getSystem()) {{
455 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
456 "testOnReceivePreBoronReadData");
458 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
459 toSerializable(), getRef());
461 Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
462 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
467 public void testOnReceivePreBoronDataExists() throws Exception {
468 new JavaTestKit(getSystem()) {{
469 ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
470 "testOnReceivePreBoronDataExists");
472 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
473 toSerializable(), getRef());
475 Object replySerialized = expectMsgClass(duration("5 seconds"),
476 ShardTransactionMessages.DataExistsReply.class);
477 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
481 public static class TestException extends RuntimeException {
482 private static final long serialVersionUID = 1L;