1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
18 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
25 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
27 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
29 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
35 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
36 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
37 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
38 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
39 import org.opendaylight.controller.cluster.datastore.modification.Modification;
40 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
41 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
42 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
43 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
46 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
47 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
48 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import scala.concurrent.duration.Duration;
55 public class ShardTransactionTest extends AbstractActorTest {
57 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
59 private static final ShardIdentifier SHARD_IDENTIFIER =
60 ShardIdentifier.builder().memberName("member-1")
61 .shardName("inventory").type("config").build();
63 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
65 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
67 private final InMemoryDOMDataStore store =
68 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
72 store.onGlobalContextUpdated(testSchemaContext);
75 private ActorRef createShard(){
76 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
77 Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
80 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
81 return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
84 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
85 return newTransactionActor(transaction, null, name, version);
88 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
89 return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
92 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
94 Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
95 testSchemaContext, datastoreContext, shardStats, "txn", version);
96 return getSystem().actorOf(props, name);
100 public void testOnReceiveReadData() throws Exception {
101 new JavaTestKit(getSystem()) {{
102 final ActorRef shard = createShard();
104 testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
106 testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
109 private void testOnReceiveReadData(final ActorRef transaction) {
111 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
114 Object replySerialized =
115 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
117 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
120 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
122 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
124 assertNotNull(reply.getNormalizedNode());
129 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
130 new JavaTestKit(getSystem()) {{
131 final ActorRef shard = createShard();
133 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
134 store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
136 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
137 store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
140 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
142 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
144 Object replySerialized =
145 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
147 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
150 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
152 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
154 assertTrue(reply.getNormalizedNode() == null);
159 public void testOnReceiveReadDataHeliumR1() throws Exception {
160 new JavaTestKit(getSystem()) {{
161 ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
162 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
164 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
167 ShardTransactionMessages.ReadDataReply replySerialized =
168 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
170 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
175 public void testOnReceiveDataExistsPositive() throws Exception {
176 new JavaTestKit(getSystem()) {{
177 final ActorRef shard = createShard();
179 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
180 "testDataExistsPositiveRO"));
182 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
183 "testDataExistsPositiveRW"));
186 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
187 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
190 ShardTransactionMessages.DataExistsReply replySerialized =
191 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
193 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
196 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
198 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
200 assertTrue(reply.exists());
205 public void testOnReceiveDataExistsNegative() throws Exception {
206 new JavaTestKit(getSystem()) {{
207 final ActorRef shard = createShard();
209 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
210 "testDataExistsNegativeRO"));
212 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
213 "testDataExistsNegativeRW"));
216 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
217 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
219 ShardTransactionMessages.DataExistsReply replySerialized =
220 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
222 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
225 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
227 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
229 assertFalse(reply.exists());
233 private void assertModification(final ActorRef subject,
234 final Class<? extends Modification> modificationType) {
235 new JavaTestKit(getSystem()) {{
236 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
238 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
239 GetCompositeModificationReply.class).getModification();
241 assertTrue(compositeModification.getModifications().size() == 1);
242 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
247 public void testOnReceiveWriteData() throws Exception {
248 new JavaTestKit(getSystem()) {{
249 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
250 "testOnReceiveWriteData");
252 transaction.tell(new WriteData(TestModel.TEST_PATH,
253 ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
254 DataStoreVersions.HELIUM_2_VERSION), getRef());
256 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
258 assertModification(transaction, WriteModification.class);
260 // unserialized write
261 transaction.tell(new WriteData(TestModel.TEST_PATH,
262 ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
265 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
270 public void testOnReceiveHeliumR1WriteData() throws Exception {
271 new JavaTestKit(getSystem()) {{
272 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
273 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
275 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
276 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
277 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
278 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
279 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
281 transaction.tell(serialized, getRef());
283 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
285 assertModification(transaction, WriteModification.class);
290 public void testOnReceiveMergeData() throws Exception {
291 new JavaTestKit(getSystem()) {{
292 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
295 transaction.tell(new MergeData(TestModel.TEST_PATH,
296 ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
297 DataStoreVersions.HELIUM_2_VERSION), getRef());
299 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
301 assertModification(transaction, MergeModification.class);
304 transaction.tell(new MergeData(TestModel.TEST_PATH,
305 ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
308 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
313 public void testOnReceiveHeliumR1MergeData() throws Exception {
314 new JavaTestKit(getSystem()) {{
315 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
316 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
318 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
319 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
320 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
321 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
322 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
324 transaction.tell(serialized, getRef());
326 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
328 assertModification(transaction, MergeModification.class);
333 public void testOnReceiveDeleteData() throws Exception {
334 new JavaTestKit(getSystem()) {{
335 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
338 transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
339 DataStoreVersions.HELIUM_2_VERSION), getRef());
341 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
343 assertModification(transaction, DeleteModification.class);
346 transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
348 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
354 public void testOnReceiveReadyTransaction() throws Exception {
355 new JavaTestKit(getSystem()) {{
356 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
357 "testReadyTransaction");
361 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
363 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
365 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
370 new JavaTestKit(getSystem()) {{
371 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
372 "testReadyTransaction2");
376 transaction.tell(new ReadyTransaction(), getRef());
378 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
380 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
386 public void testOnReceiveCreateSnapshot() throws Exception {
387 new JavaTestKit(getSystem()) {{
388 ShardTest.writeToStore(store, TestModel.TEST_PATH,
389 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
391 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
392 YangInstanceIdentifier.builder().build());
394 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
395 "testOnReceiveCreateSnapshot");
399 transaction.tell(CreateSnapshot.INSTANCE, getRef());
401 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
403 assertNotNull("getSnapshot is null", reply.getSnapshot());
405 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
406 reply.getSnapshot());
408 assertEquals("Root node", expectedRoot, actualRoot);
410 expectTerminated(duration("3 seconds"), transaction);
415 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
416 new JavaTestKit(getSystem()) {{
417 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
418 "testReadWriteTxOnReceiveCloseTransaction");
422 transaction.tell(new CloseTransaction().toSerializable(), getRef());
424 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
425 expectTerminated(duration("3 seconds"), transaction);
430 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
431 new JavaTestKit(getSystem()) {{
432 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
433 "testWriteTxOnReceiveCloseTransaction");
437 transaction.tell(new CloseTransaction().toSerializable(), getRef());
439 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
440 expectTerminated(duration("3 seconds"), transaction);
445 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
446 new JavaTestKit(getSystem()) {{
447 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
448 "testReadOnlyTxOnReceiveCloseTransaction");
452 transaction.tell(new CloseTransaction().toSerializable(), getRef());
454 expectMsgClass(duration("3 seconds"), Terminated.class);
458 @Test(expected=UnknownMessageException.class)
459 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
460 final ActorRef shard = createShard();
461 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
462 testSchemaContext, datastoreContext, shardStats, "txn",
463 DataStoreVersions.CURRENT_VERSION);
464 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
466 transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(
467 DataStoreVersions.CURRENT_VERSION), ActorRef.noSender());
471 public void testShardTransactionInactivity() {
473 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
474 Duration.create(500, TimeUnit.MILLISECONDS)).build();
476 new JavaTestKit(getSystem()) {{
477 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
478 "testShardTransactionInactivity");
482 expectMsgClass(duration("3 seconds"), Terminated.class);