Merge "Bug 1029: Remove dead code: p2site"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionFailureTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.pattern.AskTimeoutException;
16 import akka.testkit.TestActorRef;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.Collections;
20 import java.util.concurrent.TimeUnit;
21 import org.junit.BeforeClass;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
26 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
27 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
28 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
29 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
30 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
31 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import scala.concurrent.Await;
36 import scala.concurrent.Future;
37 import scala.concurrent.duration.Duration;
38
39 /**
40  * Covers negative test cases
41  *
42  * @author Basheeruddin Ahmed <syedbahm@cisco.com>
43  */
44 public class ShardTransactionFailureTest extends AbstractActorTest {
45     private static ListeningExecutorService storeExecutor =
46         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
47
48     private static final InMemoryDOMDataStore store =
49         new InMemoryDOMDataStore("OPER", storeExecutor,
50             MoreExecutors.sameThreadExecutor());
51
52     private static final SchemaContext testSchemaContext =
53         TestModel.createTestContext();
54
55     private static final ShardIdentifier SHARD_IDENTIFIER =
56         ShardIdentifier.builder().memberName("member-1")
57             .shardName("inventory").type("operational").build();
58
59     private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
60
61     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
62
63     @BeforeClass
64     public static void staticSetup() {
65         store.onGlobalContextUpdated(testSchemaContext);
66     }
67
68     private ActorRef createShard(){
69         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<ShardIdentifier, String>emptyMap(), datastoreContext,
70                 TestModel.createTestContext()));
71     }
72
73     @Test(expected = ReadFailedException.class)
74     public void testNegativeReadWithReadOnlyTransactionClosed()
75         throws Throwable {
76
77         final ActorRef shard = createShard();
78         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
79                 testSchemaContext, datastoreContext, shardStats, "txn",
80                 CreateTransaction.CURRENT_VERSION);
81
82         final TestActorRef<ShardTransaction> subject = TestActorRef
83             .create(getSystem(), props,
84                 "testNegativeReadWithReadOnlyTransactionClosed");
85
86         ShardTransactionMessages.ReadData readData =
87             ShardTransactionMessages.ReadData.newBuilder()
88                 .setInstanceIdentifierPathArguments(
89                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
90                         .build()
91                 ).build();
92         Future<Object> future =
93             akka.pattern.Patterns.ask(subject, readData, 3000);
94         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
95
96         subject.underlyingActor().getDOMStoreTransaction().close();
97
98         future = akka.pattern.Patterns.ask(subject, readData, 3000);
99         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
100     }
101
102
103     @Test(expected = ReadFailedException.class)
104     public void testNegativeReadWithReadWriteTransactionClosed()
105         throws Throwable {
106
107         final ActorRef shard = createShard();
108         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
109                 testSchemaContext, datastoreContext, shardStats, "txn",
110                 CreateTransaction.CURRENT_VERSION);
111
112         final TestActorRef<ShardTransaction> subject = TestActorRef
113             .create(getSystem(), props,
114                 "testNegativeReadWithReadWriteTransactionClosed");
115
116         ShardTransactionMessages.ReadData readData =
117             ShardTransactionMessages.ReadData.newBuilder()
118                 .setInstanceIdentifierPathArguments(
119                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
120                         .build()
121                 ).build();
122
123         Future<Object> future =
124             akka.pattern.Patterns.ask(subject, readData, 3000);
125         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
126
127         subject.underlyingActor().getDOMStoreTransaction().close();
128
129         future = akka.pattern.Patterns.ask(subject, readData, 3000);
130         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
131     }
132
133     @Test(expected = ReadFailedException.class)
134     public void testNegativeExistsWithReadWriteTransactionClosed()
135         throws Throwable {
136
137         final ActorRef shard = createShard();
138         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
139                 testSchemaContext, datastoreContext, shardStats, "txn",
140                 CreateTransaction.CURRENT_VERSION);
141
142         final TestActorRef<ShardTransaction> subject = TestActorRef
143             .create(getSystem(), props,
144                 "testNegativeExistsWithReadWriteTransactionClosed");
145
146         ShardTransactionMessages.DataExists dataExists =
147             ShardTransactionMessages.DataExists.newBuilder()
148                 .setInstanceIdentifierPathArguments(
149                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
150                         .build()
151                 ).build();
152
153         Future<Object> future =
154             akka.pattern.Patterns.ask(subject, dataExists, 3000);
155         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
156
157         subject.underlyingActor().getDOMStoreTransaction().close();
158
159         future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
160         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
161     }
162
163     @Test(expected = AskTimeoutException.class)
164     public void testNegativeWriteWithTransactionReady() throws Exception {
165
166
167         final ActorRef shard = createShard();
168         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
169                 testSchemaContext, datastoreContext, shardStats, "txn",
170                 CreateTransaction.CURRENT_VERSION);
171
172         final TestActorRef<ShardTransaction> subject = TestActorRef
173             .create(getSystem(), props,
174                 "testNegativeWriteWithTransactionReady");
175
176         ShardTransactionMessages.ReadyTransaction readyTransaction =
177             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
178
179         Future<Object> future =
180             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
181         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
182
183         ShardTransactionMessages.WriteData writeData =
184             ShardTransactionMessages.WriteData.newBuilder()
185                 .setInstanceIdentifierPathArguments(
186                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
187                         .build()).setNormalizedNode(
188                 buildNormalizedNode()
189
190             ).build();
191
192         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
193         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
194     }
195
196     @Test(expected = AskTimeoutException.class)
197     public void testNegativeReadWriteWithTransactionReady() throws Exception {
198
199
200         final ActorRef shard = createShard();
201         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
202                 testSchemaContext, datastoreContext, shardStats, "txn",
203                 CreateTransaction.CURRENT_VERSION);
204
205         final TestActorRef<ShardTransaction> subject = TestActorRef
206             .create(getSystem(), props,
207                 "testNegativeReadWriteWithTransactionReady");
208
209         ShardTransactionMessages.ReadyTransaction readyTransaction =
210             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
211
212         Future<Object> future =
213             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
214         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
215
216         ShardTransactionMessages.WriteData writeData =
217             ShardTransactionMessages.WriteData.newBuilder()
218                 .setInstanceIdentifierPathArguments(
219                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
220                         .build()
221                 )
222                 .setNormalizedNode(buildNormalizedNode())
223                 .build();
224
225         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
226         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
227     }
228
229     private NormalizedNodeMessages.Node buildNormalizedNode() {
230         return NormalizedNodeSerializer
231             .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
232     }
233
234     @Test(expected = AskTimeoutException.class)
235     public void testNegativeMergeTransactionReady() throws Exception {
236
237
238         final ActorRef shard = createShard();
239         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
240                 testSchemaContext, datastoreContext, shardStats, "txn",
241                 CreateTransaction.CURRENT_VERSION);
242
243         final TestActorRef<ShardTransaction> subject = TestActorRef
244             .create(getSystem(), props, "testNegativeMergeTransactionReady");
245
246         ShardTransactionMessages.ReadyTransaction readyTransaction =
247             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
248
249         Future<Object> future =
250             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
251         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
252
253         ShardTransactionMessages.MergeData mergeData =
254             ShardTransactionMessages.MergeData.newBuilder()
255                 .setInstanceIdentifierPathArguments(
256                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
257                         .build()).setNormalizedNode(
258                 buildNormalizedNode()
259
260             ).build();
261
262         future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
263         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
264     }
265
266
267     @Test(expected = AskTimeoutException.class)
268     public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
269
270
271         final ActorRef shard = createShard();
272         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
273                 testSchemaContext, datastoreContext, shardStats, "txn",
274                 CreateTransaction.CURRENT_VERSION);
275
276         final TestActorRef<ShardTransaction> subject = TestActorRef
277             .create(getSystem(), props,
278                 "testNegativeDeleteDataWhenTransactionReady");
279
280         ShardTransactionMessages.ReadyTransaction readyTransaction =
281             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
282
283         Future<Object> future =
284             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
285         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
286
287         ShardTransactionMessages.DeleteData deleteData =
288             ShardTransactionMessages.DeleteData.newBuilder()
289                 .setInstanceIdentifierPathArguments(
290                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
291                         .build()).build();
292
293         future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
294         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
295     }
296 }