2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.dispatch.Dispatchers;
15 import akka.testkit.TestActorRef;
16 import java.util.Collections;
17 import org.junit.Assert;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
27 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
28 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
29 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
30 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
31 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import scala.concurrent.duration.FiniteDuration;
38 * Tests backwards compatibility support from Helium-1 to Helium.
40 * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the
41 * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction,
42 * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages
43 * would be sans transactionId so this test verifies the Shard handles that properly.
45 * @author Thomas Pantelis
47 public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
49 @SuppressWarnings("unchecked")
51 public void testTransactionCommit() throws Exception {
52 new ShardTestKit(getSystem()) {{
53 SchemaContext schemaContext = TestModel.createTestContext();
54 Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
55 shardName("inventory").type("config").build(),
56 Collections.<ShardIdentifier,String>emptyMap(),
57 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
58 schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
60 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
61 "testTransactionCommit");
63 waitUntilLeader(shard);
65 // Send CreateTransaction message with no messages version
67 String transactionID = "txn-1";
68 shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
69 .setTransactionId(transactionID)
70 .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
71 .setTransactionChainId("").build(), getRef());
73 final FiniteDuration duration = duration("5 seconds");
75 CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
77 ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
79 // Write data to the Tx
81 txActor.tell(new WriteData(TestModel.TEST_PATH,
82 ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
83 DataStoreVersions.BASE_HELIUM_VERSION), getRef());
85 expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
89 txActor.tell(new ReadyTransaction().toSerializable(), getRef());
91 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
92 duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
94 ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
96 // Send the CanCommitTransaction message with no transactionId.
98 cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
101 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
103 // Send the PreCommitTransaction message with no transactionId.
105 cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(),
108 expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS);
110 // Send the CommitTransaction message with no transactionId.
112 cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(),
115 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
117 NormalizedNode<?, ?> node = ShardTest.readStore(shard, TestModel.TEST_PATH);
118 Assert.assertNotNull("Data not found in store", node);
120 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
125 public void testTransactionAbort() throws Exception {
126 new ShardTestKit(getSystem()) {{
127 SchemaContext schemaContext = TestModel.createTestContext();
128 Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
129 shardName("inventory").type("config").build(),
130 Collections.<ShardIdentifier,String>emptyMap(),
131 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
132 schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
134 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
135 "testTransactionAbort");
137 waitUntilLeader(shard);
139 // Send CreateTransaction message with no messages version
141 String transactionID = "txn-1";
142 shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
143 .setTransactionId(transactionID)
144 .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
145 .setTransactionChainId("").build(), getRef());
147 final FiniteDuration duration = duration("5 seconds");
149 CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
151 ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
153 // Write data to the Tx
155 txActor.tell(new WriteData(TestModel.TEST_PATH,
156 ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
158 expectMsgClass(duration, WriteDataReply.class);
162 txActor.tell(new ReadyTransaction().toSerializable(), getRef());
164 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
165 duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
167 ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
169 // Send the CanCommitTransaction message with no transactionId.
171 cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
174 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
176 // Send the AbortTransaction message with no transactionId.
178 cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(),
181 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
183 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());