1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import java.util.Collections;
8 import java.util.concurrent.TimeUnit;
9 import org.junit.BeforeClass;
10 import org.junit.Test;
11 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
12 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
13 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
14 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
15 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
17 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
19 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
20 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
21 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
23 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
29 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
30 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
31 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
32 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
33 import org.opendaylight.controller.cluster.datastore.modification.Modification;
34 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
35 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
36 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
37 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import scala.concurrent.duration.Duration;
42 import akka.actor.ActorRef;
43 import akka.actor.Props;
44 import akka.actor.Terminated;
45 import akka.testkit.JavaTestKit;
46 import akka.testkit.TestActorRef;
47 import com.google.common.util.concurrent.ListeningExecutorService;
48 import com.google.common.util.concurrent.MoreExecutors;
50 public class ShardTransactionTest extends AbstractActorTest {
51 private static ListeningExecutorService storeExecutor =
52 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
54 private static final InMemoryDOMDataStore store =
55 new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
57 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
59 private static final ShardIdentifier SHARD_IDENTIFIER =
60 ShardIdentifier.builder().memberName("member-1")
61 .shardName("inventory").type("config").build();
63 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
65 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
68 public static void staticSetup() {
69 store.onGlobalContextUpdated(testSchemaContext);
72 private ActorRef createShard(){
73 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
74 Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
78 public void testOnReceiveReadData() throws Exception {
79 new JavaTestKit(getSystem()) {{
80 final ActorRef shard = createShard();
81 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
82 testSchemaContext, datastoreContext, shardStats, "txn",
83 CreateTransaction.CURRENT_CLIENT_VERSION);
85 testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
87 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
88 testSchemaContext, datastoreContext, shardStats, "txn",
89 CreateTransaction.CURRENT_CLIENT_VERSION);
91 testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
94 private void testOnReceiveReadData(final ActorRef transaction) {
96 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
99 ShardTransactionMessages.ReadDataReply replySerialized =
100 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
102 assertNotNull(ReadDataReply.fromSerializable(
103 testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
104 .getNormalizedNode());
107 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
109 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
111 assertNotNull(reply.getNormalizedNode());
116 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
117 new JavaTestKit(getSystem()) {{
118 final ActorRef shard = createShard();
119 Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
120 testSchemaContext, datastoreContext, shardStats, "txn",
121 CreateTransaction.CURRENT_CLIENT_VERSION);
123 testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
124 props, "testReadDataWhenDataNotFoundRO"));
126 props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
127 testSchemaContext, datastoreContext, shardStats, "txn",
128 CreateTransaction.CURRENT_CLIENT_VERSION);
130 testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
131 props, "testReadDataWhenDataNotFoundRW"));
134 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
136 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
138 ShardTransactionMessages.ReadDataReply replySerialized =
139 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
141 assertTrue(ReadDataReply.fromSerializable(
142 testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
145 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
147 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
149 assertTrue(reply.getNormalizedNode() == null);
154 public void testOnReceiveDataExistsPositive() throws Exception {
155 new JavaTestKit(getSystem()) {{
156 final ActorRef shard = createShard();
157 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
158 testSchemaContext, datastoreContext, shardStats, "txn",
159 CreateTransaction.CURRENT_CLIENT_VERSION);
161 testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
163 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
164 testSchemaContext, datastoreContext, shardStats, "txn",
165 CreateTransaction.CURRENT_CLIENT_VERSION);
167 testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
170 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
171 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
174 ShardTransactionMessages.DataExistsReply replySerialized =
175 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
177 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
180 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
182 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
184 assertTrue(reply.exists());
189 public void testOnReceiveDataExistsNegative() throws Exception {
190 new JavaTestKit(getSystem()) {{
191 final ActorRef shard = createShard();
192 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
193 testSchemaContext, datastoreContext, shardStats, "txn",
194 CreateTransaction.CURRENT_CLIENT_VERSION);
196 testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
198 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
199 testSchemaContext, datastoreContext, shardStats, "txn",
200 CreateTransaction.CURRENT_CLIENT_VERSION);
202 testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
205 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
206 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
208 ShardTransactionMessages.DataExistsReply replySerialized =
209 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
211 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
214 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
216 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
218 assertFalse(reply.exists());
222 private void assertModification(final ActorRef subject,
223 final Class<? extends Modification> modificationType) {
224 new JavaTestKit(getSystem()) {{
225 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
227 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
228 GetCompositeModificationReply.class).getModification();
230 assertTrue(compositeModification.getModifications().size() == 1);
231 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
236 public void testOnReceiveWriteData() throws Exception {
237 new JavaTestKit(getSystem()) {{
238 final ActorRef shard = createShard();
239 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
240 testSchemaContext, datastoreContext, shardStats, "txn",
241 CreateTransaction.CURRENT_CLIENT_VERSION);
242 final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
244 transaction.tell(new WriteData(TestModel.TEST_PATH,
245 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
248 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
250 assertModification(transaction, WriteModification.class);
253 transaction.tell(new WriteData(TestModel.TEST_PATH,
254 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
255 TestModel.createTestContext()),
258 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
263 public void testOnReceiveMergeData() throws Exception {
264 new JavaTestKit(getSystem()) {{
265 final ActorRef shard = createShard();
266 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
267 testSchemaContext, datastoreContext, shardStats, "txn",
268 CreateTransaction.CURRENT_CLIENT_VERSION);
269 final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
271 transaction.tell(new MergeData(TestModel.TEST_PATH,
272 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
275 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
277 assertModification(transaction, MergeModification.class);
280 transaction.tell(new MergeData(TestModel.TEST_PATH,
281 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
284 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
289 public void testOnReceiveDeleteData() throws Exception {
290 new JavaTestKit(getSystem()) {{
291 final ActorRef shard = createShard();
292 final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
293 testSchemaContext, datastoreContext, shardStats, "txn",
294 CreateTransaction.CURRENT_CLIENT_VERSION);
295 final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
297 transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
299 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
301 assertModification(transaction, DeleteModification.class);
304 transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
306 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
312 public void testOnReceiveReadyTransaction() throws Exception {
313 new JavaTestKit(getSystem()) {{
314 final ActorRef shard = createShard();
315 final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
316 testSchemaContext, datastoreContext, shardStats, "txn",
317 CreateTransaction.CURRENT_CLIENT_VERSION);
318 final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
322 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
324 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
326 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
331 new JavaTestKit(getSystem()) {{
332 final ActorRef shard = createShard();
333 final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
334 testSchemaContext, datastoreContext, shardStats, "txn",
335 CreateTransaction.CURRENT_CLIENT_VERSION);
336 final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
340 transaction.tell(new ReadyTransaction(), getRef());
342 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
344 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
350 @SuppressWarnings("unchecked")
352 public void testOnReceiveCloseTransaction() throws Exception {
353 new JavaTestKit(getSystem()) {{
354 final ActorRef shard = createShard();
355 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
356 testSchemaContext, datastoreContext, shardStats, "txn",
357 CreateTransaction.CURRENT_CLIENT_VERSION);
358 final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
362 transaction.tell(new CloseTransaction().toSerializable(), getRef());
364 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
365 expectTerminated(duration("3 seconds"), transaction);
369 @Test(expected=UnknownMessageException.class)
370 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
371 final ActorRef shard = createShard();
372 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
373 testSchemaContext, datastoreContext, shardStats, "txn",
374 CreateTransaction.CURRENT_CLIENT_VERSION);
375 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
377 transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
381 public void testShardTransactionInactivity() {
383 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
384 Duration.create(500, TimeUnit.MILLISECONDS)).build();
386 new JavaTestKit(getSystem()) {{
387 final ActorRef shard = createShard();
388 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
389 testSchemaContext, datastoreContext, shardStats, "txn",
390 CreateTransaction.CURRENT_CLIENT_VERSION);
391 final ActorRef transaction =
392 getSystem().actorOf(props, "testShardTransactionInactivity");
396 expectMsgClass(duration("3 seconds"), Terminated.class);