3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.pattern.AskTimeoutException;
16 import akka.testkit.TestActorRef;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import org.junit.BeforeClass;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
24 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
25 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
26 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
27 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
28 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
32 import scala.concurrent.Await;
33 import scala.concurrent.Future;
34 import scala.concurrent.duration.Duration;
35 import java.util.Collections;
36 import java.util.concurrent.TimeUnit;
39 * Covers negative test cases
41 * @author Basheeruddin Ahmed <syedbahm@cisco.com>
43 public class ShardTransactionFailureTest extends AbstractActorTest {
44 private static ListeningExecutorService storeExecutor =
45 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
47 private static final InMemoryDOMDataStore store =
48 new InMemoryDOMDataStore("OPER", storeExecutor,
49 MoreExecutors.sameThreadExecutor());
51 private static final SchemaContext testSchemaContext =
52 TestModel.createTestContext();
54 private static final ShardIdentifier SHARD_IDENTIFIER =
55 ShardIdentifier.builder().memberName("member-1")
56 .shardName("inventory").type("operational").build();
58 private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
60 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
63 public static void staticSetup() {
64 store.onGlobalContextUpdated(testSchemaContext);
67 private ActorRef createShard(){
68 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext,
69 TestModel.createTestContext()));
72 @Test(expected = ReadFailedException.class)
73 public void testNegativeReadWithReadOnlyTransactionClosed()
76 final ActorRef shard = createShard();
77 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
78 testSchemaContext, datastoreContext, shardStats, "txn");
80 final TestActorRef<ShardTransaction> subject = TestActorRef
81 .create(getSystem(), props,
82 "testNegativeReadWithReadOnlyTransactionClosed");
84 ShardTransactionMessages.ReadData readData =
85 ShardTransactionMessages.ReadData.newBuilder()
86 .setInstanceIdentifierPathArguments(
87 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
90 Future<Object> future =
91 akka.pattern.Patterns.ask(subject, readData, 3000);
92 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
94 subject.underlyingActor().getDOMStoreTransaction().close();
96 future = akka.pattern.Patterns.ask(subject, readData, 3000);
97 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
101 @Test(expected = ReadFailedException.class)
102 public void testNegativeReadWithReadWriteTransactionClosed()
105 final ActorRef shard = createShard();
106 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
107 testSchemaContext, datastoreContext, shardStats, "txn");
109 final TestActorRef<ShardTransaction> subject = TestActorRef
110 .create(getSystem(), props,
111 "testNegativeReadWithReadWriteTransactionClosed");
113 ShardTransactionMessages.ReadData readData =
114 ShardTransactionMessages.ReadData.newBuilder()
115 .setInstanceIdentifierPathArguments(
116 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
120 Future<Object> future =
121 akka.pattern.Patterns.ask(subject, readData, 3000);
122 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
124 subject.underlyingActor().getDOMStoreTransaction().close();
126 future = akka.pattern.Patterns.ask(subject, readData, 3000);
127 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
130 @Test(expected = ReadFailedException.class)
131 public void testNegativeExistsWithReadWriteTransactionClosed()
134 final ActorRef shard = createShard();
135 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
136 testSchemaContext, datastoreContext, shardStats, "txn");
138 final TestActorRef<ShardTransaction> subject = TestActorRef
139 .create(getSystem(), props,
140 "testNegativeExistsWithReadWriteTransactionClosed");
142 ShardTransactionMessages.DataExists dataExists =
143 ShardTransactionMessages.DataExists.newBuilder()
144 .setInstanceIdentifierPathArguments(
145 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
149 Future<Object> future =
150 akka.pattern.Patterns.ask(subject, dataExists, 3000);
151 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
153 subject.underlyingActor().getDOMStoreTransaction().close();
155 future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
156 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
159 @Test(expected = AskTimeoutException.class)
160 public void testNegativeWriteWithTransactionReady() throws Exception {
163 final ActorRef shard = createShard();
164 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
165 testSchemaContext, datastoreContext, shardStats, "txn");
167 final TestActorRef<ShardTransaction> subject = TestActorRef
168 .create(getSystem(), props,
169 "testNegativeWriteWithTransactionReady");
171 ShardTransactionMessages.ReadyTransaction readyTransaction =
172 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
174 Future<Object> future =
175 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
176 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
178 ShardTransactionMessages.WriteData writeData =
179 ShardTransactionMessages.WriteData.newBuilder()
180 .setInstanceIdentifierPathArguments(
181 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
182 .build()).setNormalizedNode(
183 buildNormalizedNode()
187 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
188 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
191 @Test(expected = AskTimeoutException.class)
192 public void testNegativeReadWriteWithTransactionReady() throws Exception {
195 final ActorRef shard = createShard();
196 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
197 testSchemaContext, datastoreContext, shardStats, "txn");
199 final TestActorRef<ShardTransaction> subject = TestActorRef
200 .create(getSystem(), props,
201 "testNegativeReadWriteWithTransactionReady");
203 ShardTransactionMessages.ReadyTransaction readyTransaction =
204 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
206 Future<Object> future =
207 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
208 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
210 ShardTransactionMessages.WriteData writeData =
211 ShardTransactionMessages.WriteData.newBuilder()
212 .setInstanceIdentifierPathArguments(
213 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
216 .setNormalizedNode(buildNormalizedNode())
219 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
220 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
223 private NormalizedNodeMessages.Node buildNormalizedNode() {
224 return NormalizedNodeSerializer
225 .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
228 @Test(expected = AskTimeoutException.class)
229 public void testNegativeMergeTransactionReady() throws Exception {
232 final ActorRef shard = createShard();
233 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
234 testSchemaContext, datastoreContext, shardStats, "txn");
236 final TestActorRef<ShardTransaction> subject = TestActorRef
237 .create(getSystem(), props, "testNegativeMergeTransactionReady");
239 ShardTransactionMessages.ReadyTransaction readyTransaction =
240 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
242 Future<Object> future =
243 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
244 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
246 ShardTransactionMessages.MergeData mergeData =
247 ShardTransactionMessages.MergeData.newBuilder()
248 .setInstanceIdentifierPathArguments(
249 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
250 .build()).setNormalizedNode(
251 buildNormalizedNode()
255 future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
256 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
260 @Test(expected = AskTimeoutException.class)
261 public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
264 final ActorRef shard = createShard();
265 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
266 testSchemaContext, datastoreContext, shardStats, "txn");
268 final TestActorRef<ShardTransaction> subject = TestActorRef
269 .create(getSystem(), props,
270 "testNegativeDeleteDataWhenTransactionReady");
272 ShardTransactionMessages.ReadyTransaction readyTransaction =
273 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
275 Future<Object> future =
276 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
277 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
279 ShardTransactionMessages.DeleteData deleteData =
280 ShardTransactionMessages.DeleteData.newBuilder()
281 .setInstanceIdentifierPathArguments(
282 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
285 future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
286 Await.result(future, Duration.create(3, TimeUnit.SECONDS));