3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.pattern.AskTimeoutException;
16 import akka.testkit.TestActorRef;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collections;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.BeforeClass;
21 import org.junit.Test;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
25 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
26 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
29 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
30 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import scala.concurrent.Await;
35 import scala.concurrent.Future;
36 import scala.concurrent.duration.Duration;
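39 * Covers negative test cases for ShardTransaction: reads and exists checks on a closed transaction, and writes, merges and deletes sent after the transaction has been readied.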
41 * @author Basheeruddin Ahmed &lt;syedbahm@cisco.com&gt;
43 public class ShardTransactionFailureTest extends AbstractActorTest {
// In-memory DOM data store named "OPER", shared by all tests; every test obtains its
// transactions from it. It is constructed with Guava's same-thread executor.
44 private static final InMemoryDOMDataStore store =
45 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
// Schema context built once from the test model; passed to the store and to every
// ShardTransaction created by the tests below.
47 private static final SchemaContext testSchemaContext =
48 TestModel.createTestContext();
// Identifier used for the shard created by createShard() and for naming the stats bean:
// member "member-1", shard "inventory", type "operational".
50 private static final ShardIdentifier SHARD_IDENTIFIER =
51 ShardIdentifier.builder().memberName("member-1")
52 .shardName("inventory").type("operational").build();
// Datastore context built from the builder without overriding any setting.
54 private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
// Stats object handed to every ShardTransaction under test; named after SHARD_IDENTIFIER.
// NOTE(review): the second argument ("DataStore") is presumably the MBean type - confirm against ShardStats.
56 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
// Hands the test schema context to the shared in-memory store.
// NOTE(review): the annotation line is not visible in this listing; presumably this is a
// JUnit @BeforeClass hook (org.junit.BeforeClass is imported) - confirm.
59 public static void staticSetup() {
60 store.onGlobalContextUpdated(testSchemaContext);
// Creates a new Shard actor in the test actor system for SHARD_IDENTIFIER, passing an empty
// ShardIdentifier-to-String map, the default datastore context and a schema context.
// NOTE(review): the empty map is presumably the peer-address map - confirm against Shard.props.
// NOTE(review): a new schema context is built on every call via TestModel.createTestContext()
// instead of reusing the testSchemaContext constant; reuse looks possible - confirm.
63 private ActorRef createShard(){
64 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<ShardIdentifier, String>emptyMap(), datastoreContext,
65 TestModel.createTestContext()));
// Reads through a ShardTransaction actor backed by a read-only store transaction, closes the
// underlying DOM store transaction, then reads again; the test expects ReadFailedException.
// NOTE(review): "expected" applies to the whole method, so the test would also pass if the
// first (positive) read threw ReadFailedException.
68 @Test(expected = ReadFailedException.class)
69 public void testNegativeReadWithReadOnlyTransactionClosed()
72 final ActorRef shard = createShard();
73 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
74 testSchemaContext, datastoreContext, shardStats, "txn",
75 CreateTransaction.CURRENT_VERSION);
// The actor is registered under the test method's name.
77 final TestActorRef<ShardTransaction> subject = TestActorRef
78 .create(getSystem(), props,
79 "testNegativeReadWithReadOnlyTransactionClosed");
// ReadData request; no path arguments are added to the identifier builder in the visible lines.
81 ShardTransactionMessages.ReadData readData =
82 ShardTransactionMessages.ReadData.newBuilder()
83 .setInstanceIdentifierPathArguments(
84 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
// First read while the transaction is still open: ask with a 3000 ms timeout, wait up to 3 s.
87 Future<Object> future =
88 akka.pattern.Patterns.ask(subject, readData, 3000);
89 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// Close the DOM store transaction behind the actor's back.
91 subject.underlyingActor().getDOMStoreTransaction().close();
// Same read again, now against the closed transaction; this await is the one meant to throw.
93 future = akka.pattern.Patterns.ask(subject, readData, 3000);
94 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// Same scenario as the read-only variant, but the ShardTransaction actor is backed by a
// read-write store transaction: read, close the underlying transaction, read again; the test
// expects ReadFailedException.
// NOTE(review): "expected" applies to the whole method, so the test would also pass if the
// first (positive) read threw ReadFailedException.
98 @Test(expected = ReadFailedException.class)
99 public void testNegativeReadWithReadWriteTransactionClosed()
102 final ActorRef shard = createShard();
103 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
104 testSchemaContext, datastoreContext, shardStats, "txn",
105 CreateTransaction.CURRENT_VERSION);
// The actor is registered under the test method's name.
107 final TestActorRef<ShardTransaction> subject = TestActorRef
108 .create(getSystem(), props,
109 "testNegativeReadWithReadWriteTransactionClosed");
// ReadData request; no path arguments are added to the identifier builder in the visible lines.
111 ShardTransactionMessages.ReadData readData =
112 ShardTransactionMessages.ReadData.newBuilder()
113 .setInstanceIdentifierPathArguments(
114 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
// First read while the transaction is still open: ask with a 3000 ms timeout, wait up to 3 s.
118 Future<Object> future =
119 akka.pattern.Patterns.ask(subject, readData, 3000);
120 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// Close the DOM store transaction behind the actor's back.
122 subject.underlyingActor().getDOMStoreTransaction().close();
// Same read again, now against the closed transaction; this await is the one meant to throw.
124 future = akka.pattern.Patterns.ask(subject, readData, 3000);
125 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// Sends a DataExists query to a ShardTransaction actor backed by a read-write store
// transaction, closes the underlying DOM store transaction, then sends the same query again;
// the test expects ReadFailedException.
// NOTE(review): "expected" applies to the whole method, so the test would also pass if the
// first (positive) query threw ReadFailedException.
128 @Test(expected = ReadFailedException.class)
129 public void testNegativeExistsWithReadWriteTransactionClosed()
132 final ActorRef shard = createShard();
133 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
134 testSchemaContext, datastoreContext, shardStats, "txn",
135 CreateTransaction.CURRENT_VERSION);
// The actor is registered under the test method's name.
137 final TestActorRef<ShardTransaction> subject = TestActorRef
138 .create(getSystem(), props,
139 "testNegativeExistsWithReadWriteTransactionClosed");
// DataExists request; no path arguments are added to the identifier builder in the visible lines.
141 ShardTransactionMessages.DataExists dataExists =
142 ShardTransactionMessages.DataExists.newBuilder()
143 .setInstanceIdentifierPathArguments(
144 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
// First query while the transaction is still open: ask with a 3000 ms timeout, wait up to 3 s.
148 Future<Object> future =
149 akka.pattern.Patterns.ask(subject, dataExists, 3000);
150 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// Close the DOM store transaction behind the actor's back.
152 subject.underlyingActor().getDOMStoreTransaction().close();
// Same query again, now against the closed transaction; this await is the one meant to throw.
154 future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
155 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// Readies a ShardTransaction actor backed by a write-only store transaction, then sends it a
// WriteData message; the test expects the second ask to fail with AskTimeoutException.
// NOTE(review): presumably the actor no longer answers modification requests once readied -
// confirm in ShardTransaction.
158 @Test(expected = AskTimeoutException.class)
159 public void testNegativeWriteWithTransactionReady() throws Exception {
162 final ActorRef shard = createShard();
163 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
164 testSchemaContext, datastoreContext, shardStats, "txn",
165 CreateTransaction.CURRENT_VERSION);
// The actor is registered under the test method's name.
167 final TestActorRef<ShardTransaction> subject = TestActorRef
168 .create(getSystem(), props,
169 "testNegativeWriteWithTransactionReady");
171 ShardTransactionMessages.ReadyTransaction readyTransaction =
172 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
// Ready the transaction and wait (up to 3 s) for the reply before attempting the write.
174 Future<Object> future =
175 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
176 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// WriteData carrying the node from buildNormalizedNode(); no path arguments are added to the
// identifier builder.
178 ShardTransactionMessages.WriteData writeData =
179 ShardTransactionMessages.WriteData.newBuilder()
180 .setInstanceIdentifierPathArguments(
181 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
182 .build()).setNormalizedNode(
183 buildNormalizedNode()
// Write after ready: ask with a 3000 ms timeout; this await is the one meant to throw.
187 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
188 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// Readies a ShardTransaction actor backed by a read-write store transaction, then sends it a
// WriteData message; the test expects the second ask to fail with AskTimeoutException.
// NOTE(review): presumably the actor no longer answers modification requests once readied -
// confirm in ShardTransaction.
191 @Test(expected = AskTimeoutException.class)
192 public void testNegativeReadWriteWithTransactionReady() throws Exception {
195 final ActorRef shard = createShard();
196 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
197 testSchemaContext, datastoreContext, shardStats, "txn",
198 CreateTransaction.CURRENT_VERSION);
// The actor is registered under the test method's name.
200 final TestActorRef<ShardTransaction> subject = TestActorRef
201 .create(getSystem(), props,
202 "testNegativeReadWriteWithTransactionReady");
204 ShardTransactionMessages.ReadyTransaction readyTransaction =
205 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
// Ready the transaction and wait (up to 3 s) for the reply before attempting the write.
207 Future<Object> future =
208 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
209 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// WriteData carrying the node from buildNormalizedNode().
211 ShardTransactionMessages.WriteData writeData =
212 ShardTransactionMessages.WriteData.newBuilder()
213 .setInstanceIdentifierPathArguments(
214 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
217 .setNormalizedNode(buildNormalizedNode())
// Write after ready: ask with a 3000 ms timeout; this await is the one meant to throw.
220 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
221 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// Builds a container node identified by TestModel.TEST_QNAME with no children added, and
// returns its serialized NormalizedNodeMessages.Node form; used as payload by the write and
// merge tests.
224 private NormalizedNodeMessages.Node buildNormalizedNode() {
225 return NormalizedNodeSerializer
226 .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
// Readies a ShardTransaction actor backed by a read-write store transaction, then sends it a
// MergeData message; the test expects the second ask to fail with AskTimeoutException.
// NOTE(review): presumably the actor no longer answers modification requests once readied -
// confirm in ShardTransaction.
229 @Test(expected = AskTimeoutException.class)
230 public void testNegativeMergeTransactionReady() throws Exception {
233 final ActorRef shard = createShard();
234 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
235 testSchemaContext, datastoreContext, shardStats, "txn",
236 CreateTransaction.CURRENT_VERSION);
// The actor is registered under the test method's name.
238 final TestActorRef<ShardTransaction> subject = TestActorRef
239 .create(getSystem(), props, "testNegativeMergeTransactionReady");
241 ShardTransactionMessages.ReadyTransaction readyTransaction =
242 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
// Ready the transaction and wait (up to 3 s) for the reply before attempting the merge.
244 Future<Object> future =
245 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
246 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// MergeData carrying the node from buildNormalizedNode(); no path arguments are added to the
// identifier builder.
248 ShardTransactionMessages.MergeData mergeData =
249 ShardTransactionMessages.MergeData.newBuilder()
250 .setInstanceIdentifierPathArguments(
251 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
252 .build()).setNormalizedNode(
253 buildNormalizedNode()
// Merge after ready: ask with a 3000 ms timeout; this await is the one meant to throw.
257 future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
258 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// Readies a ShardTransaction actor backed by a read-write store transaction, then sends it a
// DeleteData message; the test expects the second ask to fail with AskTimeoutException.
// NOTE(review): presumably the actor no longer answers modification requests once readied -
// confirm in ShardTransaction.
262 @Test(expected = AskTimeoutException.class)
263 public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
266 final ActorRef shard = createShard();
267 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
268 testSchemaContext, datastoreContext, shardStats, "txn",
269 CreateTransaction.CURRENT_VERSION);
// The actor is registered under the test method's name.
271 final TestActorRef<ShardTransaction> subject = TestActorRef
272 .create(getSystem(), props,
273 "testNegativeDeleteDataWhenTransactionReady");
275 ShardTransactionMessages.ReadyTransaction readyTransaction =
276 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
// Ready the transaction and wait (up to 3 s) for the reply before attempting the delete.
278 Future<Object> future =
279 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
280 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
// DeleteData request; no path arguments are added to the identifier builder in the visible lines.
282 ShardTransactionMessages.DeleteData deleteData =
283 ShardTransactionMessages.DeleteData.newBuilder()
284 .setInstanceIdentifierPathArguments(
285 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
// Delete after ready: ask with a 3000 ms timeout; this await is the one meant to throw.
288 future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
289 Await.result(future, Duration.create(3, TimeUnit.SECONDS));