2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Ignore;
24 import org.junit.Test;
25 import org.mockito.InOrder;
26 import org.mockito.Mockito;
27 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
49 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
50 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
51 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
54 public class ShardTransactionTest extends AbstractActorTest {
56 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
57 private static final TransactionType RO = TransactionType.READ_ONLY;
58 private static final TransactionType RW = TransactionType.READ_WRITE;
59 private static final TransactionType WO = TransactionType.WRITE_ONLY;
61 private static final ShardIdentifier SHARD_IDENTIFIER =
62 ShardIdentifier.create("inventory", MEMBER_NAME, "config");
65 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
67 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
69 private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
71 private ActorRef createShard() {
72 ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
73 schemaContext(TestModel.createTestContext()).props());
74 ShardTestKit.waitUntilLeader(shard);
78 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
79 return newTransactionActor(type, transaction, null, name);
82 private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
83 Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
84 datastoreContext, shardStats);
85 return getSystem().actorOf(props, name);
88 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
89 return store.newReadOnlyTransaction(nextTransactionId());
92 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
93 return store.newReadWriteTransaction(nextTransactionId());
97 public void testOnReceiveReadData() throws Exception {
98 new JavaTestKit(getSystem()) {{
99 final ActorRef shard = createShard();
101 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
103 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
106 private void testOnReceiveReadData(final ActorRef transaction) {
107 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY,
108 DataStoreVersions.CURRENT_VERSION), getRef());
110 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
112 assertNotNull(reply.getNormalizedNode());
117 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
118 new JavaTestKit(getSystem()) {{
119 final ActorRef shard = createShard();
121 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
122 RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
124 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
125 RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
128 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
129 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
131 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
133 assertTrue(reply.getNormalizedNode() == null);
138 public void testOnReceiveDataExistsPositive() throws Exception {
139 new JavaTestKit(getSystem()) {{
140 final ActorRef shard = createShard();
142 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
143 "testDataExistsPositiveRO"));
145 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
146 "testDataExistsPositiveRW"));
149 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
150 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY,
151 DataStoreVersions.CURRENT_VERSION), getRef());
153 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
155 assertTrue(reply.exists());
160 public void testOnReceiveDataExistsNegative() throws Exception {
161 new JavaTestKit(getSystem()) {{
162 final ActorRef shard = createShard();
164 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
165 "testDataExistsNegativeRO"));
167 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
168 "testDataExistsNegativeRW"));
171 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
172 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
174 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
176 assertFalse(reply.exists());
181 public void testOnReceiveBatchedModifications() throws Exception {
182 new JavaTestKit(getSystem()) {{
184 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
185 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
186 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
187 nextTransactionId(), mockModification);
188 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
190 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
191 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
192 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
193 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
195 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
196 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
197 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
199 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
201 BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
202 batched.addModification(new WriteModification(writePath, writeData));
203 batched.addModification(new MergeModification(mergePath, mergeData));
204 batched.addModification(new DeleteModification(deletePath));
206 transaction.tell(batched, getRef());
208 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
209 assertEquals("getNumBatched", 3, reply.getNumBatched());
211 InOrder inOrder = Mockito.inOrder(mockModification);
212 inOrder.verify(mockModification).write(writePath, writeData);
213 inOrder.verify(mockModification).merge(mergePath, mergeData);
214 inOrder.verify(mockModification).delete(deletePath);
219 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
220 new JavaTestKit(getSystem()) {{
222 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
223 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
225 JavaTestKit watcher = new JavaTestKit(getSystem());
226 watcher.watch(transaction);
228 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
229 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
230 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
231 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
233 final TransactionIdentifier tx1 = nextTransactionId();
234 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
235 batched.addModification(new WriteModification(writePath, writeData));
237 transaction.tell(batched, getRef());
238 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
239 assertEquals("getNumBatched", 1, reply.getNumBatched());
241 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
242 batched.setReady(true);
243 batched.setTotalMessagesSent(2);
245 transaction.tell(batched, getRef());
246 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
247 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
252 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
253 new JavaTestKit(getSystem()) {{
255 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
256 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
258 JavaTestKit watcher = new JavaTestKit(getSystem());
259 watcher.watch(transaction);
261 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
262 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
263 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
264 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
266 BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
267 batched.addModification(new WriteModification(writePath, writeData));
268 batched.setReady(true);
269 batched.setDoCommitOnReady(true);
270 batched.setTotalMessagesSent(1);
272 transaction.tell(batched, getRef());
273 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
274 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
278 @Test(expected=TestException.class)
279 public void testOnReceiveBatchedModificationsFailure() throws Throwable {
280 new JavaTestKit(getSystem()) {{
282 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
283 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
284 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
285 nextTransactionId(), mockModification);
286 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
287 "testOnReceiveBatchedModificationsFailure");
289 JavaTestKit watcher = new JavaTestKit(getSystem());
290 watcher.watch(transaction);
292 YangInstanceIdentifier path = TestModel.TEST_PATH;
293 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
295 doThrow(new TestException()).when(mockModification).write(path, node);
297 final TransactionIdentifier tx1 = nextTransactionId();
298 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
299 batched.addModification(new WriteModification(path, node));
301 transaction.tell(batched, getRef());
302 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
304 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
305 batched.setReady(true);
306 batched.setTotalMessagesSent(2);
308 transaction.tell(batched, getRef());
309 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
310 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
312 if(failure != null) {
313 throw failure.cause();
318 @Test(expected=IllegalStateException.class)
319 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
320 new JavaTestKit(getSystem()) {{
322 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
323 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
325 JavaTestKit watcher = new JavaTestKit(getSystem());
326 watcher.watch(transaction);
328 BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
329 batched.setReady(true);
330 batched.setTotalMessagesSent(2);
332 transaction.tell(batched, getRef());
334 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
335 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
337 if(failure != null) {
338 throw failure.cause();
344 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
345 new JavaTestKit(getSystem()) {{
346 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
347 "testReadWriteTxOnReceiveCloseTransaction");
351 transaction.tell(new CloseTransaction().toSerializable(), getRef());
353 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
354 expectTerminated(duration("3 seconds"), transaction);
359 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
360 new JavaTestKit(getSystem()) {{
361 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
362 "testWriteTxOnReceiveCloseTransaction");
366 transaction.tell(new CloseTransaction().toSerializable(), getRef());
368 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
369 expectTerminated(duration("3 seconds"), transaction);
374 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
375 new JavaTestKit(getSystem()) {{
376 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
377 "testReadOnlyTxOnReceiveCloseTransaction");
381 transaction.tell(new CloseTransaction().toSerializable(), getRef());
383 expectMsgClass(duration("3 seconds"), Terminated.class);
387 // Unknown operations are being logged
389 @Test(expected=UnknownMessageException.class)
390 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
391 final ActorRef shard = createShard();
392 final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
393 datastoreContext, shardStats);
394 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
396 transaction.receive(new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION),
397 ActorRef.noSender());
401 public void testShardTransactionInactivity() {
403 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
404 500, TimeUnit.MILLISECONDS).build();
406 new JavaTestKit(getSystem()) {{
407 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
408 "testShardTransactionInactivity");
412 expectMsgClass(duration("3 seconds"), Terminated.class);
415 public static class TestException extends RuntimeException {
416 private static final long serialVersionUID = 1L;