1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
30 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
39 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
40 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.Modification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
46 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
47 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
49 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
50 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
51 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
60 public class ShardTransactionTest extends AbstractActorTest {
62 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
64 private static final ShardIdentifier SHARD_IDENTIFIER =
65 ShardIdentifier.builder().memberName("member-1")
66 .shardName("inventory").type("config").build();
68 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
70 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
72 private final InMemoryDOMDataStore store =
73 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
77 store.onGlobalContextUpdated(testSchemaContext);
80 private ActorRef createShard(){
81 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
82 Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
85 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
86 return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
89 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
90 return newTransactionActor(transaction, null, name, version);
93 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
94 return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
97 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
99 Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
100 testSchemaContext, datastoreContext, shardStats, "txn", version);
101 return getSystem().actorOf(props, name);
105 public void testOnReceiveReadData() throws Exception {
106 new JavaTestKit(getSystem()) {{
107 final ActorRef shard = createShard();
109 testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
111 testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
114 private void testOnReceiveReadData(final ActorRef transaction) {
116 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
119 Object replySerialized =
120 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
122 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
125 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
127 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
129 assertNotNull(reply.getNormalizedNode());
134 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
135 new JavaTestKit(getSystem()) {{
136 final ActorRef shard = createShard();
138 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
139 store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
141 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
142 store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
145 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
147 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
149 Object replySerialized =
150 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
152 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
155 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
157 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
159 assertTrue(reply.getNormalizedNode() == null);
164 public void testOnReceiveReadDataHeliumR1() throws Exception {
165 new JavaTestKit(getSystem()) {{
166 ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
167 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
169 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
172 ShardTransactionMessages.ReadDataReply replySerialized =
173 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
175 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
180 public void testOnReceiveDataExistsPositive() throws Exception {
181 new JavaTestKit(getSystem()) {{
182 final ActorRef shard = createShard();
184 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
185 "testDataExistsPositiveRO"));
187 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
188 "testDataExistsPositiveRW"));
191 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
192 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
195 ShardTransactionMessages.DataExistsReply replySerialized =
196 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
198 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
201 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
203 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
205 assertTrue(reply.exists());
210 public void testOnReceiveDataExistsNegative() throws Exception {
211 new JavaTestKit(getSystem()) {{
212 final ActorRef shard = createShard();
214 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
215 "testDataExistsNegativeRO"));
217 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
218 "testDataExistsNegativeRW"));
221 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
222 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
224 ShardTransactionMessages.DataExistsReply replySerialized =
225 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
227 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
230 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
232 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
234 assertFalse(reply.exists());
238 private void assertModification(final ActorRef subject,
239 final Class<? extends Modification> modificationType) {
240 new JavaTestKit(getSystem()) {{
241 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
243 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
244 GetCompositeModificationReply.class).getModification();
246 assertTrue(compositeModification.getModifications().size() == 1);
247 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
252 public void testOnReceiveWriteData() throws Exception {
253 new JavaTestKit(getSystem()) {{
254 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
255 "testOnReceiveWriteData");
257 transaction.tell(new WriteData(TestModel.TEST_PATH,
258 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
259 toSerializable(), getRef());
261 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
263 assertModification(transaction, WriteModification.class);
265 // unserialized write
266 transaction.tell(new WriteData(TestModel.TEST_PATH,
267 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
270 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
275 public void testOnReceiveHeliumR1WriteData() throws Exception {
276 new JavaTestKit(getSystem()) {{
277 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
278 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
280 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
281 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
282 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
283 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
284 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
286 transaction.tell(serialized, getRef());
288 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
290 assertModification(transaction, WriteModification.class);
295 public void testOnReceiveMergeData() throws Exception {
296 new JavaTestKit(getSystem()) {{
297 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
300 transaction.tell(new MergeData(TestModel.TEST_PATH,
301 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
302 toSerializable(), getRef());
304 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
306 assertModification(transaction, MergeModification.class);
309 transaction.tell(new MergeData(TestModel.TEST_PATH,
310 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
313 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
318 public void testOnReceiveHeliumR1MergeData() throws Exception {
319 new JavaTestKit(getSystem()) {{
320 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
321 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
323 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
324 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
325 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
326 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
327 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
329 transaction.tell(serialized, getRef());
331 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
333 assertModification(transaction, MergeModification.class);
338 public void testOnReceiveDeleteData() throws Exception {
339 new JavaTestKit(getSystem()) {{
340 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
343 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
344 toSerializable(), getRef());
346 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
348 assertModification(transaction, DeleteModification.class);
351 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
353 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
358 public void testOnReceiveBatchedModifications() throws Exception {
359 new JavaTestKit(getSystem()) {{
361 DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
362 final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
364 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
365 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
366 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
367 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
369 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
370 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
371 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
373 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
375 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
376 batched.addModification(new WriteModification(writePath, writeData));
377 batched.addModification(new MergeModification(mergePath, mergeData));
378 batched.addModification(new DeleteModification(deletePath));
380 transaction.tell(batched, getRef());
382 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
383 assertEquals("getNumBatched", 3, reply.getNumBatched());
385 JavaTestKit verification = new JavaTestKit(getSystem());
386 transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
388 CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
389 GetCompositeModificationReply.class).getModification();
391 assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
393 WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
394 assertEquals("getPath", writePath, write.getPath());
395 assertEquals("getData", writeData, write.getData());
397 MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
398 assertEquals("getPath", mergePath, merge.getPath());
399 assertEquals("getData", mergeData, merge.getData());
401 DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
402 assertEquals("getPath", deletePath, delete.getPath());
404 InOrder inOrder = Mockito.inOrder(mockWriteTx);
405 inOrder.verify(mockWriteTx).write(writePath, writeData);
406 inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
407 inOrder.verify(mockWriteTx).delete(deletePath);
412 public void testOnReceiveBatchedModificationsReady() throws Exception {
413 new JavaTestKit(getSystem()) {{
415 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
416 "testOnReceiveBatchedModificationsReady");
418 JavaTestKit watcher = new JavaTestKit(getSystem());
419 watcher.watch(transaction);
421 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
422 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
423 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
424 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
426 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
427 batched.setReady(true);
428 batched.addModification(new WriteModification(writePath, writeData));
430 transaction.tell(batched, getRef());
432 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
433 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
438 public void testOnReceivePreLithiumReadyTransaction() throws Exception {
439 new JavaTestKit(getSystem()) {{
440 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
441 "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
443 JavaTestKit watcher = new JavaTestKit(getSystem());
444 watcher.watch(transaction);
446 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
448 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
449 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
453 new JavaTestKit(getSystem()) {{
454 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
455 "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
457 JavaTestKit watcher = new JavaTestKit(getSystem());
458 watcher.watch(transaction);
460 transaction.tell(new ReadyTransaction(), getRef());
462 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
463 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
468 public void testOnReceiveCreateSnapshot() throws Exception {
469 new JavaTestKit(getSystem()) {{
470 ShardTest.writeToStore(store, TestModel.TEST_PATH,
471 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
473 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
474 YangInstanceIdentifier.builder().build());
476 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
477 "testOnReceiveCreateSnapshot");
481 transaction.tell(CreateSnapshot.INSTANCE, getRef());
483 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
485 assertNotNull("getSnapshot is null", reply.getSnapshot());
487 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
488 reply.getSnapshot());
490 assertEquals("Root node", expectedRoot, actualRoot);
492 expectTerminated(duration("3 seconds"), transaction);
497 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
498 new JavaTestKit(getSystem()) {{
499 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
500 "testReadWriteTxOnReceiveCloseTransaction");
504 transaction.tell(new CloseTransaction().toSerializable(), getRef());
506 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
507 expectTerminated(duration("3 seconds"), transaction);
512 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
513 new JavaTestKit(getSystem()) {{
514 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
515 "testWriteTxOnReceiveCloseTransaction");
519 transaction.tell(new CloseTransaction().toSerializable(), getRef());
521 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
522 expectTerminated(duration("3 seconds"), transaction);
527 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
528 new JavaTestKit(getSystem()) {{
529 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
530 "testReadOnlyTxOnReceiveCloseTransaction");
534 transaction.tell(new CloseTransaction().toSerializable(), getRef());
536 expectMsgClass(duration("3 seconds"), Terminated.class);
540 @Test(expected=UnknownMessageException.class)
541 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
542 final ActorRef shard = createShard();
543 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
544 testSchemaContext, datastoreContext, shardStats, "txn",
545 DataStoreVersions.CURRENT_VERSION);
546 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
548 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
549 toSerializable(), ActorRef.noSender());
553 public void testShardTransactionInactivity() {
555 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
556 500, TimeUnit.MILLISECONDS).build();
558 new JavaTestKit(getSystem()) {{
559 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
560 "testShardTransactionInactivity");
564 expectMsgClass(duration("3 seconds"), Terminated.class);