Handle leader change on ForwardedReadyTransaction in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionFailureTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.pattern.AskTimeoutException;
14 import akka.testkit.TestActorRef;
15 import java.util.concurrent.TimeUnit;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
18 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
19 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
20 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
21 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
22 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
23 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
26 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
27 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
28 import scala.concurrent.Await;
29 import scala.concurrent.Future;
30 import scala.concurrent.duration.Duration;
31
32 /**
33  * Covers negative test cases
34  *
35  * @author Basheeruddin Ahmed <syedbahm@cisco.com>
36  */
37 public class ShardTransactionFailureTest extends AbstractActorTest {
38     private static final SchemaContext testSchemaContext =
39             TestModel.createTestContext();
40     private static final TransactionType RO = TransactionType.READ_ONLY;
41     private static final TransactionType RW = TransactionType.READ_WRITE;
42     private static final TransactionType WO = TransactionType.WRITE_ONLY;
43
44     private static final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
45
46     private static final ShardIdentifier SHARD_IDENTIFIER =
47         ShardIdentifier.builder().memberName("member-1")
48             .shardName("inventory").type("operational").build();
49
50     private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
51
52     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
53
54     private ActorRef createShard(){
55         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
56                 schemaContext(TestModel.createTestContext()).props());
57         ShardTestKit.waitUntilLeader(shard);
58         return shard;
59     }
60
61     @Test(expected = ReadFailedException.class)
62     public void testNegativeReadWithReadOnlyTransactionClosed()
63         throws Throwable {
64
65         final ActorRef shard = createShard();
66         final Props props = ShardTransaction.props(RO, store.newReadOnlyTransaction("test-txn", null), shard,
67                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
68
69         final TestActorRef<ShardTransaction> subject = TestActorRef
70             .create(getSystem(), props,
71                 "testNegativeReadWithReadOnlyTransactionClosed");
72
73         ShardTransactionMessages.ReadData readData =
74             ShardTransactionMessages.ReadData.newBuilder()
75                 .setInstanceIdentifierPathArguments(
76                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
77                         .build()
78                 ).build();
79         Future<Object> future =
80             akka.pattern.Patterns.ask(subject, readData, 3000);
81         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
82
83         subject.underlyingActor().getDOMStoreTransaction().abort();
84
85         future = akka.pattern.Patterns.ask(subject, readData, 3000);
86         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
87     }
88
89
90     @Test(expected = ReadFailedException.class)
91     public void testNegativeReadWithReadWriteTransactionClosed()
92         throws Throwable {
93
94         final ActorRef shard = createShard();
95         final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
96                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
97
98         final TestActorRef<ShardTransaction> subject = TestActorRef
99             .create(getSystem(), props,
100                 "testNegativeReadWithReadWriteTransactionClosed");
101
102         ShardTransactionMessages.ReadData readData =
103             ShardTransactionMessages.ReadData.newBuilder()
104                 .setInstanceIdentifierPathArguments(
105                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
106                         .build()
107                 ).build();
108
109         Future<Object> future =
110             akka.pattern.Patterns.ask(subject, readData, 3000);
111         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
112
113         subject.underlyingActor().getDOMStoreTransaction().abort();
114
115         future = akka.pattern.Patterns.ask(subject, readData, 3000);
116         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
117     }
118
119     @Test(expected = ReadFailedException.class)
120     public void testNegativeExistsWithReadWriteTransactionClosed()
121         throws Throwable {
122
123         final ActorRef shard = createShard();
124         final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
125                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
126
127         final TestActorRef<ShardTransaction> subject = TestActorRef
128             .create(getSystem(), props,
129                 "testNegativeExistsWithReadWriteTransactionClosed");
130
131         ShardTransactionMessages.DataExists dataExists =
132             ShardTransactionMessages.DataExists.newBuilder()
133                 .setInstanceIdentifierPathArguments(
134                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
135                         .build()
136                 ).build();
137
138         Future<Object> future =
139             akka.pattern.Patterns.ask(subject, dataExists, 3000);
140         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
141
142         subject.underlyingActor().getDOMStoreTransaction().abort();
143
144         future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
145         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
146     }
147
148     @Test(expected = AskTimeoutException.class)
149     public void testNegativeWriteWithTransactionReady() throws Exception {
150
151
152         final ActorRef shard = createShard();
153         final Props props = ShardTransaction.props(WO, store.newReadWriteTransaction("test-txn", null), shard,
154                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
155
156         final TestActorRef<ShardTransaction> subject = TestActorRef
157             .create(getSystem(), props,
158                 "testNegativeWriteWithTransactionReady");
159
160         ShardTransactionMessages.ReadyTransaction readyTransaction =
161             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
162
163         Future<Object> future =
164             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
165         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
166
167         ShardTransactionMessages.WriteData writeData =
168             ShardTransactionMessages.WriteData.newBuilder()
169                 .setInstanceIdentifierPathArguments(
170                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
171                         .build()).setNormalizedNode(
172                 buildNormalizedNode()
173
174             ).build();
175
176         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
177         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
178     }
179
180     @Test(expected = AskTimeoutException.class)
181     public void testNegativeReadWriteWithTransactionReady() throws Exception {
182
183
184         final ActorRef shard = createShard();
185         final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
186                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
187
188         final TestActorRef<ShardTransaction> subject = TestActorRef
189             .create(getSystem(), props,
190                 "testNegativeReadWriteWithTransactionReady");
191
192         ShardTransactionMessages.ReadyTransaction readyTransaction =
193             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
194
195         Future<Object> future =
196             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
197         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
198
199         ShardTransactionMessages.WriteData writeData =
200             ShardTransactionMessages.WriteData.newBuilder()
201                 .setInstanceIdentifierPathArguments(
202                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
203                         .build()
204                 )
205                 .setNormalizedNode(buildNormalizedNode())
206                 .build();
207
208         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
209         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
210     }
211
212     private static NormalizedNodeMessages.Node buildNormalizedNode() {
213         return NormalizedNodeSerializer
214             .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
215     }
216
217     @Test(expected = AskTimeoutException.class)
218     public void testNegativeMergeTransactionReady() throws Exception {
219
220
221         final ActorRef shard = createShard();
222         final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
223                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
224
225         final TestActorRef<ShardTransaction> subject = TestActorRef
226             .create(getSystem(), props, "testNegativeMergeTransactionReady");
227
228         ShardTransactionMessages.ReadyTransaction readyTransaction =
229             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
230
231         Future<Object> future =
232             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
233         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
234
235         ShardTransactionMessages.MergeData mergeData =
236             ShardTransactionMessages.MergeData.newBuilder()
237                 .setInstanceIdentifierPathArguments(
238                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
239                         .build()).setNormalizedNode(
240                 buildNormalizedNode()
241
242             ).build();
243
244         future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
245         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
246     }
247
248
249     @Test(expected = AskTimeoutException.class)
250     public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
251
252
253         final ActorRef shard = createShard();
254         final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
255                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
256
257         final TestActorRef<ShardTransaction> subject = TestActorRef
258             .create(getSystem(), props,
259                 "testNegativeDeleteDataWhenTransactionReady");
260
261         ShardTransactionMessages.ReadyTransaction readyTransaction =
262             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
263
264         Future<Object> future =
265             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
266         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
267
268         ShardTransactionMessages.DeleteData deleteData =
269             ShardTransactionMessages.DeleteData.newBuilder()
270                 .setInstanceIdentifierPathArguments(
271                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
272                         .build()).build();
273
274         future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
275         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
276     }
277 }