2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Ignore;
24 import org.junit.Test;
25 import org.mockito.InOrder;
26 import org.mockito.Mockito;
27 import org.opendaylight.controller.cluster.access.concepts.MemberName;
28 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
46 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
47 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 public class ShardTransactionTest extends AbstractActorTest {
59 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
60 private static final TransactionType RO = TransactionType.READ_ONLY;
61 private static final TransactionType RW = TransactionType.READ_WRITE;
62 private static final TransactionType WO = TransactionType.WRITE_ONLY;
64 private static final ShardIdentifier SHARD_IDENTIFIER =
65 ShardIdentifier.create("inventory", MemberName.forName("member-1"), "config");
67 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
69 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
71 private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
73 private int txCounter = 0;
75 private ActorRef createShard() {
76 ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
77 schemaContext(TestModel.createTestContext()).props());
78 ShardTestKit.waitUntilLeader(shard);
82 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
83 return newTransactionActor(type, transaction, null, name);
86 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
87 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
88 datastoreContext, shardStats);
89 return getSystem().actorOf(props, name);
92 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
93 return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
96 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
97 return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
101 public void testOnReceiveReadData() throws Exception {
102 new JavaTestKit(getSystem()) {{
103 final ActorRef shard = createShard();
105 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
107 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
110 private void testOnReceiveReadData(final ActorRef transaction) {
111 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY,
112 DataStoreVersions.CURRENT_VERSION), getRef());
114 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
116 assertNotNull(reply.getNormalizedNode());
121 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
122 new JavaTestKit(getSystem()) {{
123 final ActorRef shard = createShard();
125 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
126 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
128 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
129 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
132 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
133 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
135 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
137 assertTrue(reply.getNormalizedNode() == null);
142 public void testOnReceiveDataExistsPositive() throws Exception {
143 new JavaTestKit(getSystem()) {{
144 final ActorRef shard = createShard();
146 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
147 "testDataExistsPositiveRO"));
149 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
150 "testDataExistsPositiveRW"));
153 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
154 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY,
155 DataStoreVersions.CURRENT_VERSION), getRef());
157 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
159 assertTrue(reply.exists());
164 public void testOnReceiveDataExistsNegative() throws Exception {
165 new JavaTestKit(getSystem()) {{
166 final ActorRef shard = createShard();
168 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
169 "testDataExistsNegativeRO"));
171 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
172 "testDataExistsNegativeRW"));
175 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
176 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
178 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
180 assertFalse(reply.exists());
185 public void testOnReceiveBatchedModifications() throws Exception {
186 new JavaTestKit(getSystem()) {{
188 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
189 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
190 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
191 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
193 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
194 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
195 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
196 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
198 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
199 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
200 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
202 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
204 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
205 batched.addModification(new WriteModification(writePath, writeData));
206 batched.addModification(new MergeModification(mergePath, mergeData));
207 batched.addModification(new DeleteModification(deletePath));
209 transaction.tell(batched, getRef());
211 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
212 assertEquals("getNumBatched", 3, reply.getNumBatched());
214 InOrder inOrder = Mockito.inOrder(mockModification);
215 inOrder.verify(mockModification).write(writePath, writeData);
216 inOrder.verify(mockModification).merge(mergePath, mergeData);
217 inOrder.verify(mockModification).delete(deletePath);
222 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
223 new JavaTestKit(getSystem()) {{
225 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
226 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
228 JavaTestKit watcher = new JavaTestKit(getSystem());
229 watcher.watch(transaction);
231 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
232 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
233 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
234 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
236 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
237 batched.addModification(new WriteModification(writePath, writeData));
239 transaction.tell(batched, getRef());
240 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
241 assertEquals("getNumBatched", 1, reply.getNumBatched());
243 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
244 batched.setReady(true);
245 batched.setTotalMessagesSent(2);
247 transaction.tell(batched, getRef());
248 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
249 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
254 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
255 new JavaTestKit(getSystem()) {{
257 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
258 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
260 JavaTestKit watcher = new JavaTestKit(getSystem());
261 watcher.watch(transaction);
263 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
264 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
265 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
266 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
268 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
269 batched.addModification(new WriteModification(writePath, writeData));
270 batched.setReady(true);
271 batched.setDoCommitOnReady(true);
272 batched.setTotalMessagesSent(1);
274 transaction.tell(batched, getRef());
275 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
276 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
280 @Test(expected=TestException.class)
281 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
282 new JavaTestKit(getSystem()) {{
284 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
285 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
286 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
287 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
288 "testOnReceiveBatchedModificationsFailure");
290 JavaTestKit watcher = new JavaTestKit(getSystem());
291 watcher.watch(transaction);
293 YangInstanceIdentifier path = TestModel.TEST_PATH;
294 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
296 doThrow(new TestException()).when(mockModification).write(path, node);
298 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
299 batched.addModification(new WriteModification(path, node));
301 transaction.tell(batched, getRef());
302 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
304 batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
305 batched.setReady(true);
306 batched.setTotalMessagesSent(2);
308 transaction.tell(batched, getRef());
309 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
310 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
312 if(failure != null) {
313 throw failure.cause();
318 @Test(expected=IllegalStateException.class)
319 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
320 new JavaTestKit(getSystem()) {{
322 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
323 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
325 JavaTestKit watcher = new JavaTestKit(getSystem());
326 watcher.watch(transaction);
328 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
329 batched.setReady(true);
330 batched.setTotalMessagesSent(2);
332 transaction.tell(batched, getRef());
334 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
335 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
337 if(failure != null) {
338 throw failure.cause();
344 public void testOnReceiveCreateSnapshot() throws Exception {
345 new JavaTestKit(getSystem()) {{
346 ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
347 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
349 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
350 YangInstanceIdentifier.EMPTY);
352 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
353 "testOnReceiveCreateSnapshot");
357 transaction.tell(CreateSnapshot.INSTANCE, getRef());
359 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
361 assertNotNull("getSnapshot is null", reply.getSnapshot());
363 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
364 reply.getSnapshot());
366 assertEquals("Root node", expectedRoot, actualRoot);
368 expectTerminated(duration("3 seconds"), transaction);
373 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
374 new JavaTestKit(getSystem()) {{
375 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
376 "testReadWriteTxOnReceiveCloseTransaction");
380 transaction.tell(new CloseTransaction().toSerializable(), getRef());
382 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
383 expectTerminated(duration("3 seconds"), transaction);
388 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
389 new JavaTestKit(getSystem()) {{
390 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
391 "testWriteTxOnReceiveCloseTransaction");
395 transaction.tell(new CloseTransaction().toSerializable(), getRef());
397 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
398 expectTerminated(duration("3 seconds"), transaction);
403 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
404 new JavaTestKit(getSystem()) {{
405 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
406 "testReadOnlyTxOnReceiveCloseTransaction");
410 transaction.tell(new CloseTransaction().toSerializable(), getRef());
412 expectMsgClass(duration("3 seconds"), Terminated.class);
416 // Unknown operations are being logged
418 @Test(expected=UnknownMessageException.class)
419 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
420 final ActorRef shard = createShard();
421 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
422 datastoreContext, shardStats);
423 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
425 transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
426 ActorRef.noSender());
430 public void testShardTransactionInactivity() {
432 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
433 500, TimeUnit.MILLISECONDS).build();
435 new JavaTestKit(getSystem()) {{
436 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
437 "testShardTransactionInactivity");
441 expectMsgClass(duration("3 seconds"), Terminated.class);
444 public static class TestException extends RuntimeException {
445 private static final long serialVersionUID = 1L;