1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.BeforeClass;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
18 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
26 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
27 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
28 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
34 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
35 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
36 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
37 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
38 import org.opendaylight.controller.cluster.datastore.modification.Modification;
39 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
40 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
41 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
42 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
43 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
44 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
47 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
48 import scala.concurrent.duration.Duration;
/**
 * Tests for the ShardTransaction actor. The fields below are the fixtures that the
 * test methods pass to ShardTransaction.props(...) and Shard.props(...).
 */
50 public class ShardTransactionTest extends AbstractActorTest {
// In-memory data store shared by all tests; it is named "OPER" and is given the
// executor returned by MoreExecutors.sameThreadExecutor().
51 private static final InMemoryDOMDataStore store =
52 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
// Schema context built once from TestModel and handed to every transaction actor.
54 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
// Identifier used for the shard actor created by createShard() and for the stats bean name.
56 private static final ShardIdentifier SHARD_IDENTIFIER =
57 ShardIdentifier.builder().memberName("member-1")
58 .shardName("inventory").type("config").build();
// Not final: testShardTransactionInactivity replaces it with a context that has a short idle timeout.
60 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
// Stats bean passed to every ShardTransaction.props(...) call.
62 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
/**
 * Hands the test schema context to the shared in-memory store.
 * NOTE(review): the annotation line preceding this method is not visible in this listing;
 * presumably it is JUnit's BeforeClass (which is imported) - confirm.
 */
65 public static void staticSetup() {
66 store.onGlobalContextUpdated(testSchemaContext);
/**
 * Creates a Shard actor in the test actor system.
 *
 * The actor is built from SHARD_IDENTIFIER, an empty ShardIdentifier-to-String map,
 * the current datastoreContext field and a freshly created test schema context.
 *
 * @return the reference of the newly created shard actor
 */
69 private ActorRef createShard(){
70 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
71 Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
/**
 * Checks that a ReadData request for an identifier built with no path arguments yields
 * a reply carrying a non-null node, for both a read-only and a read-write transaction actor.
 *
 * The one-argument overload below performs the actual exchange: once with the serialized
 * form of ReadData (reply expected as ReadDataReply.SERIALIZABLE_CLASS) and once with the
 * ReadData object itself (reply expected as ReadDataReply).
 * NOTE(review): this listing omits some lines (the embedded numbering has gaps), e.g. the
 * trailing argument of the first tell(...) call; presumably it is getRef() as in the
 * second call - confirm against the full file.
 */
75 public void testOnReceiveReadData() throws Exception {
76 new JavaTestKit(getSystem()) {{
77 final ActorRef shard = createShard();
// Read-only transaction actor.
78 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
79 testSchemaContext, datastoreContext, shardStats, "txn",
80 DataStoreVersions.CURRENT_VERSION);
82 testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
// Same check against a read-write transaction actor.
84 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
85 testSchemaContext, datastoreContext, shardStats, "txn",
86 DataStoreVersions.CURRENT_VERSION);
88 testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
91 private void testOnReceiveReadData(final ActorRef transaction) {
// Serialized request: the reply is expected in its serializable class and is converted back.
93 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
96 Object replySerialized =
97 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
99 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
// Unserialized request: the reply is expected as a plain ReadDataReply object.
102 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
104 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
106 assertNotNull(reply.getNormalizedNode());
/**
 * Checks that reading TestModel.TEST_PATH yields a reply whose normalized node is null,
 * for both a read-only and a read-write transaction actor, using the serialized and the
 * unserialized form of ReadData.
 * NOTE(review): assertTrue(x == null) reports no detail on failure; assertNull would be the
 * more idiomatic JUnit call, but it is not among the static imports visible in this file.
 */
111 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
112 new JavaTestKit(getSystem()) {{
113 final ActorRef shard = createShard();
// Read-only transaction actor.
114 Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
115 testSchemaContext, datastoreContext, shardStats, "txn",
116 DataStoreVersions.CURRENT_VERSION);
118 testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
119 props, "testReadDataWhenDataNotFoundRO"));
// Same check against a read-write transaction actor.
121 props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
122 testSchemaContext, datastoreContext, shardStats, "txn",
123 DataStoreVersions.CURRENT_VERSION);
125 testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
126 props, "testReadDataWhenDataNotFoundRW"));
129 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
// Serialized request.
131 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
133 Object replySerialized =
134 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
136 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
// Unserialized request.
139 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
141 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
143 assertTrue(reply.getNormalizedNode() == null);
/**
 * Checks a serialized ReadData exchange against a read-only transaction actor created
 * with DataStoreVersions.HELIUM_1_VERSION: the reply must arrive as the protobuf
 * ShardTransactionMessages.ReadDataReply and convert to a reply with a non-null node.
 * NOTE(review): the trailing argument of the tell(...) call is on a line omitted from this
 * listing; presumably getRef() - confirm.
 */
148 public void testOnReceiveReadDataHeliumR1() throws Exception {
149 new JavaTestKit(getSystem()) {{
150 final ActorRef shard = createShard();
151 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
152 testSchemaContext, datastoreContext, shardStats, "txn",
153 DataStoreVersions.HELIUM_1_VERSION);
155 ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1");
157 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
160 ShardTransactionMessages.ReadDataReply replySerialized =
161 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
163 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
/**
 * Checks that a DataExists query for an identifier built with no path arguments is answered
 * with exists() == true, for both a read-only and a read-write transaction actor.
 *
 * The one-argument overload sends the query in serialized form (reply expected as
 * ShardTransactionMessages.DataExistsReply) and then as a plain DataExists object
 * (reply expected as DataExistsReply).
 * NOTE(review): the trailing argument of the first tell(...) call is on a line omitted from
 * this listing; presumably getRef() - confirm.
 */
168 public void testOnReceiveDataExistsPositive() throws Exception {
169 new JavaTestKit(getSystem()) {{
170 final ActorRef shard = createShard();
// Read-only transaction actor.
171 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
172 testSchemaContext, datastoreContext, shardStats, "txn",
173 DataStoreVersions.CURRENT_VERSION);
175 testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
// Same check against a read-write transaction actor.
177 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
178 testSchemaContext, datastoreContext, shardStats, "txn",
179 DataStoreVersions.CURRENT_VERSION);
181 testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
184 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
// Serialized query.
185 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
188 ShardTransactionMessages.DataExistsReply replySerialized =
189 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
191 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
// Unserialized query.
194 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
196 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
198 assertTrue(reply.exists());
/**
 * Checks that a DataExists query for TestModel.TEST_PATH is answered with
 * exists() == false, for both a read-only and a read-write transaction actor.
 *
 * The one-argument overload sends the query in serialized form (reply expected as
 * ShardTransactionMessages.DataExistsReply) and then as a plain DataExists object
 * (reply expected as DataExistsReply).
 */
203 public void testOnReceiveDataExistsNegative() throws Exception {
204 new JavaTestKit(getSystem()) {{
205 final ActorRef shard = createShard();
// Read-only transaction actor.
206 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
207 testSchemaContext, datastoreContext, shardStats, "txn",
208 DataStoreVersions.CURRENT_VERSION);
210 testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
// Same check against a read-write transaction actor.
212 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
213 testSchemaContext, datastoreContext, shardStats, "txn",
214 DataStoreVersions.CURRENT_VERSION);
216 testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
219 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
// Serialized query.
220 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
222 ShardTransactionMessages.DataExistsReply replySerialized =
223 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
225 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
// Unserialized query.
228 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
230 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
232 assertFalse(reply.exists());
/**
 * Asks a write transaction actor for its composite modification and asserts that it
 * holds exactly one modification of the expected type.
 *
 * The actor is sent a ShardWriteTransaction.GetCompositedModification message and must
 * answer with a GetCompositeModificationReply within 3 seconds.
 *
 * @param subject the transaction actor to query
 * @param modificationType the exact class expected for the single recorded modification
 */
236 private void assertModification(final ActorRef subject,
237 final Class<? extends Modification> modificationType) {
238 new JavaTestKit(getSystem()) {{
239 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
241 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
242 GetCompositeModificationReply.class).getModification();
// assertEquals (instead of assertTrue on a comparison) reports the actual size on failure.
244 assertEquals(1, compositeModification.getModifications().size());
245 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
/**
 * Checks WriteData handling on a write-only transaction actor created with
 * DataStoreVersions.CURRENT_VERSION.
 *
 * First a WriteData for TestModel.TEST_PATH is sent serialized for HELIUM_2_VERSION; the
 * reply must be a ShardTransactionMessages.WriteDataReply and the actor must then report a
 * single WriteModification. Afterwards the same write is sent unserialized and a
 * WriteDataReply is expected.
 * NOTE(review): the trailing argument of the second tell(...) call is on lines omitted from
 * this listing; presumably getRef() - confirm.
 */
250 public void testOnReceiveWriteData() throws Exception {
251 new JavaTestKit(getSystem()) {{
252 final ActorRef shard = createShard();
253 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
254 testSchemaContext, datastoreContext, shardStats, "txn",
255 DataStoreVersions.CURRENT_VERSION);
256 final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData");
// Serialized write.
258 transaction.tell(new WriteData(TestModel.TEST_PATH,
259 ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
260 DataStoreVersions.HELIUM_2_VERSION), getRef());
262 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
264 assertModification(transaction, WriteModification.class);
266 // unserialized write
267 transaction.tell(new WriteData(TestModel.TEST_PATH,
268 ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
271 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
/**
 * Checks that a write-only transaction actor created with DataStoreVersions.HELIUM_1_VERSION
 * accepts a protobuf WriteData message assembled by hand.
 *
 * The path and node are encoded with a NormalizedNodeToNodeCodec constructed with a null
 * argument, copied into a ShardTransactionMessages.WriteData builder and sent to the actor.
 * The reply must be a ShardTransactionMessages.WriteDataReply and the actor must then
 * report a single WriteModification.
 */
276 public void testOnReceiveHeliumR1WriteData() throws Exception {
277 new JavaTestKit(getSystem()) {{
278 final ActorRef shard = createShard();
279 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
280 testSchemaContext, datastoreContext, shardStats, "txn",
281 DataStoreVersions.HELIUM_1_VERSION);
282 final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData");
// Build the protobuf message directly rather than via WriteData.toSerializable(...).
284 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
285 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
286 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
287 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
288 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
290 transaction.tell(serialized, getRef());
292 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
294 assertModification(transaction, WriteModification.class);
/**
 * Checks MergeData handling on a read-write transaction actor created with
 * DataStoreVersions.CURRENT_VERSION.
 *
 * First a MergeData for TestModel.TEST_PATH is sent serialized for HELIUM_2_VERSION; the
 * reply must be a ShardTransactionMessages.MergeDataReply and the actor must then report a
 * single MergeModification. Afterwards the same merge is sent unserialized and a
 * MergeDataReply is expected.
 * NOTE(review): the trailing argument of the second tell(...) call is on lines omitted from
 * this listing; presumably getRef() - confirm.
 */
299 public void testOnReceiveMergeData() throws Exception {
300 new JavaTestKit(getSystem()) {{
301 final ActorRef shard = createShard();
302 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
303 testSchemaContext, datastoreContext, shardStats, "txn",
304 DataStoreVersions.CURRENT_VERSION);
305 final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
// Serialized merge.
307 transaction.tell(new MergeData(TestModel.TEST_PATH,
308 ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
309 DataStoreVersions.HELIUM_2_VERSION), getRef());
311 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
313 assertModification(transaction, MergeModification.class);
// Unserialized merge.
316 transaction.tell(new MergeData(TestModel.TEST_PATH,
317 ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
320 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
/**
 * Checks that a write-only transaction actor created with DataStoreVersions.HELIUM_1_VERSION
 * accepts a protobuf MergeData message assembled by hand.
 *
 * The path and node are encoded with a NormalizedNodeToNodeCodec constructed with a null
 * argument, copied into a ShardTransactionMessages.MergeData builder and sent to the actor.
 * The reply must be a ShardTransactionMessages.MergeDataReply and the actor must then
 * report a single MergeModification.
 */
325 public void testOnReceiveHeliumR1MergeData() throws Exception {
326 new JavaTestKit(getSystem()) {{
327 final ActorRef shard = createShard();
328 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
329 testSchemaContext, datastoreContext, shardStats, "txn",
330 DataStoreVersions.HELIUM_1_VERSION);
331 final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
// Build the protobuf message directly rather than via MergeData.toSerializable(...).
333 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
334 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
335 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
336 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
337 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
339 transaction.tell(serialized, getRef());
341 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
343 assertModification(transaction, MergeModification.class);
/**
 * Checks DeleteData handling on a write-only transaction actor created with
 * DataStoreVersions.CURRENT_VERSION.
 *
 * A serialized DeleteData for TestModel.TEST_PATH must be answered with a
 * ShardTransactionMessages.DeleteDataReply, after which the actor must report a single
 * DeleteModification. The same delete sent unserialized must be answered with a
 * DeleteDataReply.
 */
348 public void testOnReceiveDeleteData() throws Exception {
349 new JavaTestKit(getSystem()) {{
350 final ActorRef shard = createShard();
351 final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
352 testSchemaContext, datastoreContext, shardStats, "txn",
353 DataStoreVersions.CURRENT_VERSION);
354 final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
// Serialized delete.
356 transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
358 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
360 assertModification(transaction, DeleteModification.class);
// Unserialized delete.
363 transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
365 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
/**
 * Checks ReadyTransaction handling on read-write transaction actors, once with the
 * serialized message (actor "testReadyTransaction") and once with the plain message
 * object (actor "testReadyTransaction2").
 *
 * After each send, two messages are awaited with expectMsgAnyClassOf, each within 5 seconds.
 * NOTE(review): this listing omits the lines between actor creation and tell(...) and the
 * remaining class arguments of every expectMsgAnyClassOf call. Presumably the actor is
 * watched and Terminated is the other accepted class (Terminated is imported) - confirm
 * against the full file.
 */
371 public void testOnReceiveReadyTransaction() throws Exception {
// Serialized ReadyTransaction.
372 new JavaTestKit(getSystem()) {{
373 final ActorRef shard = createShard();
374 final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
375 testSchemaContext, datastoreContext, shardStats, "txn",
376 DataStoreVersions.CURRENT_VERSION);
377 final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
381 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
383 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
385 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
// Unserialized ReadyTransaction.
390 new JavaTestKit(getSystem()) {{
391 final ActorRef shard = createShard();
392 final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
393 testSchemaContext, datastoreContext, shardStats, "txn",
394 DataStoreVersions.CURRENT_VERSION);
395 final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
399 transaction.tell(new ReadyTransaction(), getRef());
401 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
403 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
/**
 * Checks that a serialized CloseTransaction sent to a read-write transaction actor is
 * answered with CloseTransactionReply.SERIALIZABLE_CLASS within 3 seconds and that the
 * actor's termination is observed within a further 3 seconds.
 * NOTE(review): the lines between actor creation and tell(...) are omitted from this
 * listing; presumably they register a watch on the actor, which expectTerminated relies
 * on - confirm. No unchecked operation is visible in the lines shown, so the
 * SuppressWarnings annotation may be unnecessary - confirm against the full method.
 */
409 @SuppressWarnings("unchecked")
411 public void testOnReceiveCloseTransaction() throws Exception {
412 new JavaTestKit(getSystem()) {{
413 final ActorRef shard = createShard();
414 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
415 testSchemaContext, datastoreContext, shardStats, "txn",
416 DataStoreVersions.CURRENT_VERSION);
417 final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
421 transaction.tell(new CloseTransaction().toSerializable(), getRef());
423 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
424 expectTerminated(duration("3 seconds"), transaction);
/**
 * Negative test: delivering a serialized DeleteData to a transaction actor built on a
 * read-only store transaction is expected to raise UnknownMessageException.
 *
 * A TestActorRef is used and receive(...) is invoked on it directly with
 * ActorRef.noSender(), so the exception is thrown from the call in the test method,
 * where the expected-exception attribute of the Test annotation can observe it.
 */
428 @Test(expected=UnknownMessageException.class)
429 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
430 final ActorRef shard = createShard();
431 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
432 testSchemaContext, datastoreContext, shardStats, "txn",
433 DataStoreVersions.CURRENT_VERSION);
434 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
436 transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
/**
 * Checks that a read-write transaction actor that receives no messages produces a
 * Terminated notification within 3 seconds when the datastore context is configured
 * with a 500 ms shard transaction idle timeout.
 * NOTE(review): the lines between actor creation and expectMsgClass(...) are omitted from
 * this listing; presumably they register a watch on the actor so that Terminated is
 * delivered to the test kit - confirm against the full file.
 */
440 public void testShardTransactionInactivity() {
// Replace the default context with one that has a short idle timeout.
442 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
443 Duration.create(500, TimeUnit.MILLISECONDS)).build();
445 new JavaTestKit(getSystem()) {{
446 final ActorRef shard = createShard();
447 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
448 testSchemaContext, datastoreContext, shardStats, "txn",
449 DataStoreVersions.CURRENT_VERSION);
450 final ActorRef transaction =
451 getSystem().actorOf(props, "testShardTransactionInactivity");
455 expectMsgClass(duration("3 seconds"), Terminated.class);