1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.BeforeClass;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
18 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
25 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
27 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
29 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
35 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
36 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
37 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
38 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
39 import org.opendaylight.controller.cluster.datastore.modification.Modification;
40 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
43 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import scala.concurrent.duration.Duration;
49 public class ShardTransactionTest extends AbstractActorTest {
50 private static final InMemoryDOMDataStore store =
51 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
53 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
55 private static final ShardIdentifier SHARD_IDENTIFIER =
56 ShardIdentifier.builder().memberName("member-1")
57 .shardName("inventory").type("config").build();
59 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
61 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
64 public static void staticSetup() {
65 store.onGlobalContextUpdated(testSchemaContext);
68 private ActorRef createShard(){
69 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
70 Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
74 public void testOnReceiveReadData() throws Exception {
75 new JavaTestKit(getSystem()) {{
76 final ActorRef shard = createShard();
77 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
78 testSchemaContext, datastoreContext, shardStats, "txn",
79 CreateTransaction.CURRENT_VERSION);
81 testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
83 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
84 testSchemaContext, datastoreContext, shardStats, "txn",
85 CreateTransaction.CURRENT_VERSION);
87 testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
90 private void testOnReceiveReadData(final ActorRef transaction) {
92 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
95 ShardTransactionMessages.ReadDataReply replySerialized =
96 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
98 assertNotNull(ReadDataReply.fromSerializable(
99 testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
100 .getNormalizedNode());
103 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
105 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
107 assertNotNull(reply.getNormalizedNode());
112 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
113 new JavaTestKit(getSystem()) {{
114 final ActorRef shard = createShard();
115 Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
116 testSchemaContext, datastoreContext, shardStats, "txn",
117 CreateTransaction.CURRENT_VERSION);
119 testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
120 props, "testReadDataWhenDataNotFoundRO"));
122 props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
123 testSchemaContext, datastoreContext, shardStats, "txn",
124 CreateTransaction.CURRENT_VERSION);
126 testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
127 props, "testReadDataWhenDataNotFoundRW"));
130 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
132 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
134 ShardTransactionMessages.ReadDataReply replySerialized =
135 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
137 assertTrue(ReadDataReply.fromSerializable(
138 testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
141 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
143 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
145 assertTrue(reply.getNormalizedNode() == null);
150 public void testOnReceiveDataExistsPositive() throws Exception {
151 new JavaTestKit(getSystem()) {{
152 final ActorRef shard = createShard();
153 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
154 testSchemaContext, datastoreContext, shardStats, "txn",
155 CreateTransaction.CURRENT_VERSION);
157 testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
159 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
160 testSchemaContext, datastoreContext, shardStats, "txn",
161 CreateTransaction.CURRENT_VERSION);
163 testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
166 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
167 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
170 ShardTransactionMessages.DataExistsReply replySerialized =
171 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
173 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
176 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
178 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
180 assertTrue(reply.exists());
185 public void testOnReceiveDataExistsNegative() throws Exception {
186 new JavaTestKit(getSystem()) {{
187 final ActorRef shard = createShard();
188 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
189 testSchemaContext, datastoreContext, shardStats, "txn",
190 CreateTransaction.CURRENT_VERSION);
192 testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
194 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
195 testSchemaContext, datastoreContext, shardStats, "txn",
196 CreateTransaction.CURRENT_VERSION);
198 testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
201 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
202 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
204 ShardTransactionMessages.DataExistsReply replySerialized =
205 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
207 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
210 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
212 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
214 assertFalse(reply.exists());
218 private void assertModification(final ActorRef subject,
219 final Class<? extends Modification> modificationType) {
220 new JavaTestKit(getSystem()) {{
221 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
223 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
224 GetCompositeModificationReply.class).getModification();
226 assertTrue(compositeModification.getModifications().size() == 1);
227 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
232 public void testOnReceiveWriteData() throws Exception {
233 new JavaTestKit(getSystem()) {{
234 final ActorRef shard = createShard();
235 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
236 testSchemaContext, datastoreContext, shardStats, "txn",
237 CreateTransaction.CURRENT_VERSION);
238 final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
240 transaction.tell(new WriteData(TestModel.TEST_PATH,
241 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
244 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
246 assertModification(transaction, WriteModification.class);
249 transaction.tell(new WriteData(TestModel.TEST_PATH,
250 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
251 TestModel.createTestContext()),
254 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
259 public void testOnReceiveMergeData() throws Exception {
260 new JavaTestKit(getSystem()) {{
261 final ActorRef shard = createShard();
262 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
263 testSchemaContext, datastoreContext, shardStats, "txn",
264 CreateTransaction.CURRENT_VERSION);
265 final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
267 transaction.tell(new MergeData(TestModel.TEST_PATH,
268 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
271 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
273 assertModification(transaction, MergeModification.class);
276 transaction.tell(new MergeData(TestModel.TEST_PATH,
277 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
280 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
285 public void testOnReceiveDeleteData() throws Exception {
286 new JavaTestKit(getSystem()) {{
287 final ActorRef shard = createShard();
288 final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
289 testSchemaContext, datastoreContext, shardStats, "txn",
290 CreateTransaction.CURRENT_VERSION);
291 final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
293 transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
295 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
297 assertModification(transaction, DeleteModification.class);
300 transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
302 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
308 public void testOnReceiveReadyTransaction() throws Exception {
309 new JavaTestKit(getSystem()) {{
310 final ActorRef shard = createShard();
311 final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
312 testSchemaContext, datastoreContext, shardStats, "txn",
313 CreateTransaction.CURRENT_VERSION);
314 final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
318 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
320 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
322 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
327 new JavaTestKit(getSystem()) {{
328 final ActorRef shard = createShard();
329 final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
330 testSchemaContext, datastoreContext, shardStats, "txn",
331 CreateTransaction.CURRENT_VERSION);
332 final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
336 transaction.tell(new ReadyTransaction(), getRef());
338 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
340 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
347 public void testOnReceiveCloseTransaction() throws Exception {
348 new JavaTestKit(getSystem()) {{
349 final ActorRef shard = createShard();
350 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
351 testSchemaContext, datastoreContext, shardStats, "txn",
352 CreateTransaction.CURRENT_VERSION);
353 final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
357 transaction.tell(new CloseTransaction().toSerializable(), getRef());
359 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
360 expectTerminated(duration("3 seconds"), transaction);
364 @Test(expected=UnknownMessageException.class)
365 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
366 final ActorRef shard = createShard();
367 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
368 testSchemaContext, datastoreContext, shardStats, "txn",
369 CreateTransaction.CURRENT_VERSION);
370 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
372 transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
376 public void testShardTransactionInactivity() {
378 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
379 Duration.create(500, TimeUnit.MILLISECONDS)).build();
381 new JavaTestKit(getSystem()) {{
382 final ActorRef shard = createShard();
383 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
384 testSchemaContext, datastoreContext, shardStats, "txn",
385 CreateTransaction.CURRENT_VERSION);
386 final ActorRef transaction =
387 getSystem().actorOf(props, "testShardTransactionInactivity");
391 expectMsgClass(duration("3 seconds"), Terminated.class);