Merge "Bug 2252: Terminate ShardWriteTransaction actor on ready"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionFailureTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.pattern.AskTimeoutException;
16 import akka.testkit.TestActorRef;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import org.junit.BeforeClass;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
24 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
25 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
26 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
27 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
28 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
32 import scala.concurrent.Await;
33 import scala.concurrent.Future;
34 import scala.concurrent.duration.Duration;
35 import java.util.Collections;
36 import java.util.concurrent.TimeUnit;
37
38 /**
39  * Covers negative test cases
40  *
41  * @author Basheeruddin Ahmed <syedbahm@cisco.com>
42  */
43 public class ShardTransactionFailureTest extends AbstractActorTest {
44     private static ListeningExecutorService storeExecutor =
45         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
46
47     private static final InMemoryDOMDataStore store =
48         new InMemoryDOMDataStore("OPER", storeExecutor,
49             MoreExecutors.sameThreadExecutor());
50
51     private static final SchemaContext testSchemaContext =
52         TestModel.createTestContext();
53
54     private static final ShardIdentifier SHARD_IDENTIFIER =
55         ShardIdentifier.builder().memberName("member-1")
56             .shardName("inventory").type("operational").build();
57
58     private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
59
60     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
61
62     @BeforeClass
63     public static void staticSetup() {
64         store.onGlobalContextUpdated(testSchemaContext);
65     }
66
67     private ActorRef createShard(){
68         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext,
69                 TestModel.createTestContext()));
70     }
71
72     @Test(expected = ReadFailedException.class)
73     public void testNegativeReadWithReadOnlyTransactionClosed()
74         throws Throwable {
75
76         final ActorRef shard = createShard();
77         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
78                 testSchemaContext, datastoreContext, shardStats, "txn");
79
80         final TestActorRef<ShardTransaction> subject = TestActorRef
81             .create(getSystem(), props,
82                 "testNegativeReadWithReadOnlyTransactionClosed");
83
84         ShardTransactionMessages.ReadData readData =
85             ShardTransactionMessages.ReadData.newBuilder()
86                 .setInstanceIdentifierPathArguments(
87                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
88                         .build()
89                 ).build();
90         Future<Object> future =
91             akka.pattern.Patterns.ask(subject, readData, 3000);
92         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
93
94         subject.underlyingActor().getDOMStoreTransaction().close();
95
96         future = akka.pattern.Patterns.ask(subject, readData, 3000);
97         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
98     }
99
100
101     @Test(expected = ReadFailedException.class)
102     public void testNegativeReadWithReadWriteTransactionClosed()
103         throws Throwable {
104
105         final ActorRef shard = createShard();
106         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
107                 testSchemaContext, datastoreContext, shardStats, "txn");
108
109         final TestActorRef<ShardTransaction> subject = TestActorRef
110             .create(getSystem(), props,
111                 "testNegativeReadWithReadWriteTransactionClosed");
112
113         ShardTransactionMessages.ReadData readData =
114             ShardTransactionMessages.ReadData.newBuilder()
115                 .setInstanceIdentifierPathArguments(
116                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
117                         .build()
118                 ).build();
119
120         Future<Object> future =
121             akka.pattern.Patterns.ask(subject, readData, 3000);
122         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
123
124         subject.underlyingActor().getDOMStoreTransaction().close();
125
126         future = akka.pattern.Patterns.ask(subject, readData, 3000);
127         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
128     }
129
130     @Test(expected = ReadFailedException.class)
131     public void testNegativeExistsWithReadWriteTransactionClosed()
132         throws Throwable {
133
134         final ActorRef shard = createShard();
135         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
136                 testSchemaContext, datastoreContext, shardStats, "txn");
137
138         final TestActorRef<ShardTransaction> subject = TestActorRef
139             .create(getSystem(), props,
140                 "testNegativeExistsWithReadWriteTransactionClosed");
141
142         ShardTransactionMessages.DataExists dataExists =
143             ShardTransactionMessages.DataExists.newBuilder()
144                 .setInstanceIdentifierPathArguments(
145                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
146                         .build()
147                 ).build();
148
149         Future<Object> future =
150             akka.pattern.Patterns.ask(subject, dataExists, 3000);
151         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
152
153         subject.underlyingActor().getDOMStoreTransaction().close();
154
155         future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
156         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
157     }
158
159     @Test(expected = AskTimeoutException.class)
160     public void testNegativeWriteWithTransactionReady() throws Exception {
161
162
163         final ActorRef shard = createShard();
164         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
165                 testSchemaContext, datastoreContext, shardStats, "txn");
166
167         final TestActorRef<ShardTransaction> subject = TestActorRef
168             .create(getSystem(), props,
169                 "testNegativeWriteWithTransactionReady");
170
171         ShardTransactionMessages.ReadyTransaction readyTransaction =
172             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
173
174         Future<Object> future =
175             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
176         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
177
178         ShardTransactionMessages.WriteData writeData =
179             ShardTransactionMessages.WriteData.newBuilder()
180                 .setInstanceIdentifierPathArguments(
181                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
182                         .build()).setNormalizedNode(
183                 buildNormalizedNode()
184
185             ).build();
186
187         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
188         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
189     }
190
191     @Test(expected = AskTimeoutException.class)
192     public void testNegativeReadWriteWithTransactionReady() throws Exception {
193
194
195         final ActorRef shard = createShard();
196         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
197                 testSchemaContext, datastoreContext, shardStats, "txn");
198
199         final TestActorRef<ShardTransaction> subject = TestActorRef
200             .create(getSystem(), props,
201                 "testNegativeReadWriteWithTransactionReady");
202
203         ShardTransactionMessages.ReadyTransaction readyTransaction =
204             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
205
206         Future<Object> future =
207             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
208         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
209
210         ShardTransactionMessages.WriteData writeData =
211             ShardTransactionMessages.WriteData.newBuilder()
212                 .setInstanceIdentifierPathArguments(
213                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
214                         .build()
215                 )
216                 .setNormalizedNode(buildNormalizedNode())
217                 .build();
218
219         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
220         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
221     }
222
223     private NormalizedNodeMessages.Node buildNormalizedNode() {
224         return NormalizedNodeSerializer
225             .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
226     }
227
228     @Test(expected = AskTimeoutException.class)
229     public void testNegativeMergeTransactionReady() throws Exception {
230
231
232         final ActorRef shard = createShard();
233         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
234                 testSchemaContext, datastoreContext, shardStats, "txn");
235
236         final TestActorRef<ShardTransaction> subject = TestActorRef
237             .create(getSystem(), props, "testNegativeMergeTransactionReady");
238
239         ShardTransactionMessages.ReadyTransaction readyTransaction =
240             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
241
242         Future<Object> future =
243             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
244         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
245
246         ShardTransactionMessages.MergeData mergeData =
247             ShardTransactionMessages.MergeData.newBuilder()
248                 .setInstanceIdentifierPathArguments(
249                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
250                         .build()).setNormalizedNode(
251                 buildNormalizedNode()
252
253             ).build();
254
255         future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
256         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
257     }
258
259
260     @Test(expected = AskTimeoutException.class)
261     public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
262
263
264         final ActorRef shard = createShard();
265         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
266                 testSchemaContext, datastoreContext, shardStats, "txn");
267
268         final TestActorRef<ShardTransaction> subject = TestActorRef
269             .create(getSystem(), props,
270                 "testNegativeDeleteDataWhenTransactionReady");
271
272         ShardTransactionMessages.ReadyTransaction readyTransaction =
273             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
274
275         Future<Object> future =
276             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
277         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
278
279         ShardTransactionMessages.DeleteData deleteData =
280             ShardTransactionMessages.DeleteData.newBuilder()
281                 .setInstanceIdentifierPathArguments(
282                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
283                         .build()).build();
284
285         future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
286         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
287     }
288 }