1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.actor.Terminated;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
10 import org.junit.BeforeClass;
11 import org.junit.Test;
12 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
13 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
14 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
15 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
19 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
20 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
21 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
23 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
29 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
30 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
31 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
32 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
33 import org.opendaylight.controller.cluster.datastore.modification.Modification;
34 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
35 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
36 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
37 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import scala.concurrent.duration.Duration;
42 import java.util.Collections;
43 import java.util.concurrent.TimeUnit;
44 import static org.junit.Assert.assertEquals;
45 import static org.junit.Assert.assertFalse;
46 import static org.junit.Assert.assertNotNull;
47 import static org.junit.Assert.assertTrue;
49 public class ShardTransactionTest extends AbstractActorTest {
50 private static ListeningExecutorService storeExecutor =
51 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
53 private static final InMemoryDOMDataStore store =
54 new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
56 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
58 private static final ShardIdentifier SHARD_IDENTIFIER =
59 ShardIdentifier.builder().memberName("member-1")
60 .shardName("inventory").type("config").build();
62 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
64 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
67 public static void staticSetup() {
68 store.onGlobalContextUpdated(testSchemaContext);
71 private ActorRef createShard(){
72 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
73 Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
77 public void testOnReceiveReadData() throws Exception {
78 new JavaTestKit(getSystem()) {{
79 final ActorRef shard = createShard();
80 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
81 testSchemaContext, datastoreContext, shardStats, "txn");
83 testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
85 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
86 testSchemaContext, datastoreContext, shardStats, "txn");
88 testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
91 private void testOnReceiveReadData(final ActorRef subject) {
93 subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
96 ShardTransactionMessages.ReadDataReply replySerialized =
97 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
99 assertNotNull(ReadDataReply.fromSerializable(
100 testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
101 .getNormalizedNode());
104 subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
106 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
108 assertNotNull(reply.getNormalizedNode());
113 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
114 new JavaTestKit(getSystem()) {{
115 final ActorRef shard = createShard();
116 Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
117 testSchemaContext, datastoreContext, shardStats, "txn");
119 testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
120 props, "testReadDataWhenDataNotFoundRO"));
122 props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
123 testSchemaContext, datastoreContext, shardStats, "txn");
125 testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
126 props, "testReadDataWhenDataNotFoundRW"));
129 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) {
131 subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
133 ShardTransactionMessages.ReadDataReply replySerialized =
134 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
136 assertTrue(ReadDataReply.fromSerializable(
137 testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
140 subject.tell(new ReadData(TestModel.TEST_PATH),getRef());
142 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
144 assertTrue(reply.getNormalizedNode() == null);
149 public void testOnReceiveDataExistsPositive() throws Exception {
150 new JavaTestKit(getSystem()) {{
151 final ActorRef shard = createShard();
152 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
153 testSchemaContext, datastoreContext, shardStats, "txn");
155 testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
157 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
158 testSchemaContext, datastoreContext, shardStats, "txn");
160 testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
163 private void testOnReceiveDataExistsPositive(final ActorRef subject) {
164 subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
167 ShardTransactionMessages.DataExistsReply replySerialized =
168 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
170 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
173 subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
175 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
177 assertTrue(reply.exists());
182 public void testOnReceiveDataExistsNegative() throws Exception {
183 new JavaTestKit(getSystem()) {{
184 final ActorRef shard = createShard();
185 Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
186 testSchemaContext, datastoreContext, shardStats, "txn");
188 testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
190 props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
191 testSchemaContext, datastoreContext, shardStats, "txn");
193 testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
196 private void testOnReceiveDataExistsNegative(final ActorRef subject) {
197 subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
199 ShardTransactionMessages.DataExistsReply replySerialized =
200 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
202 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
205 subject.tell(new DataExists(TestModel.TEST_PATH),getRef());
207 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
209 assertFalse(reply.exists());
213 private void assertModification(final ActorRef subject,
214 final Class<? extends Modification> modificationType) {
215 new JavaTestKit(getSystem()) {{
216 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
218 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
219 GetCompositeModificationReply.class).getModification();
221 assertTrue(compositeModification.getModifications().size() == 1);
222 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
227 public void testOnReceiveWriteData() throws Exception {
228 new JavaTestKit(getSystem()) {{
229 final ActorRef shard = createShard();
230 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
231 testSchemaContext, datastoreContext, shardStats, "txn");
232 final ActorRef subject =
233 getSystem().actorOf(props, "testWriteData");
235 subject.tell(new WriteData(TestModel.TEST_PATH,
236 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
239 ShardTransactionMessages.WriteDataReply replySerialized =
240 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
242 assertModification(subject, WriteModification.class);
245 subject.tell(new WriteData(TestModel.TEST_PATH,
246 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
247 TestModel.createTestContext()),
250 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
255 public void testOnReceiveMergeData() throws Exception {
256 new JavaTestKit(getSystem()) {{
257 final ActorRef shard = createShard();
258 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
259 testSchemaContext, datastoreContext, shardStats, "txn");
260 final ActorRef subject =
261 getSystem().actorOf(props, "testMergeData");
263 subject.tell(new MergeData(TestModel.TEST_PATH,
264 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
267 ShardTransactionMessages.MergeDataReply replySerialized =
268 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
270 assertModification(subject, MergeModification.class);
273 subject.tell(new MergeData(TestModel.TEST_PATH,
274 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
277 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
282 public void testOnReceiveDeleteData() throws Exception {
283 new JavaTestKit(getSystem()) {{
284 final ActorRef shard = createShard();
285 final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
286 testSchemaContext, datastoreContext, shardStats, "txn");
287 final ActorRef subject =
288 getSystem().actorOf(props, "testDeleteData");
290 subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
292 ShardTransactionMessages.DeleteDataReply replySerialized =
293 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
295 assertModification(subject, DeleteModification.class);
298 subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
300 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
306 public void testOnReceiveReadyTransaction() throws Exception {
307 new JavaTestKit(getSystem()) {{
308 final ActorRef shard = createShard();
309 final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
310 testSchemaContext, datastoreContext, shardStats, "txn");
311 final ActorRef subject =
312 getSystem().actorOf(props, "testReadyTransaction");
314 subject.tell(new ReadyTransaction().toSerializable(), getRef());
316 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
320 new JavaTestKit(getSystem()) {{
321 final ActorRef shard = createShard();
322 final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
323 testSchemaContext, datastoreContext, shardStats, "txn");
324 final ActorRef subject =
325 getSystem().actorOf(props, "testReadyTransaction2");
327 subject.tell(new ReadyTransaction(), getRef());
329 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
334 @SuppressWarnings("unchecked")
336 public void testOnReceiveCloseTransaction() throws Exception {
337 new JavaTestKit(getSystem()) {{
338 final ActorRef shard = createShard();
339 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
340 testSchemaContext, datastoreContext, shardStats, "txn");
341 final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
345 subject.tell(new CloseTransaction().toSerializable(), getRef());
347 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
348 expectMsgClass(duration("3 seconds"), Terminated.class);
352 @Test(expected=UnknownMessageException.class)
353 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
354 final ActorRef shard = createShard();
355 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
356 testSchemaContext, datastoreContext, shardStats, "txn");
357 final TestActorRef subject = TestActorRef.apply(props,getSystem());
359 subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
363 public void testShardTransactionInactivity() {
365 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
366 Duration.create(500, TimeUnit.MILLISECONDS)).build();
368 new JavaTestKit(getSystem()) {{
369 final ActorRef shard = createShard();
370 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
371 testSchemaContext, datastoreContext, shardStats, "txn");
372 final ActorRef subject =
373 getSystem().actorOf(props, "testShardTransactionInactivity");
377 // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
379 final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
380 // do not put code outside this method, will run afterwards
382 protected String match(Object in) {
383 if (in instanceof Terminated) {
389 }.get(); // this extracts the received message
391 assertEquals("match", termination);