2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.compat;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.dispatch.Dispatchers;
15 import akka.testkit.TestActorRef;
16 import org.junit.Assert;
17 import org.junit.Test;
18 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
19 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
20 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
21 import org.opendaylight.controller.cluster.datastore.Shard;
22 import org.opendaylight.controller.cluster.datastore.ShardTest;
23 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
24 import org.opendaylight.controller.cluster.datastore.TransactionType;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
33 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
34 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
35 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
36 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
37 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import scala.concurrent.duration.FiniteDuration;
44 * Tests backwards compatibility support from Helium-1 to Helium.
46 * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the
47 * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction,
48 * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages
49 * would be sans transactionId so this test verifies the Shard handles that properly.
51 * @author Thomas Pantelis
53 public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
56 public void testTransactionCommit() throws Exception {
57 new ShardTestKit(getSystem()) {{
58 SchemaContext schemaContext = TestModel.createTestContext();
59 Props shardProps = Shard.builder().id(ShardIdentifier.builder().memberName("member-1").
60 shardName("inventory").type("config").build()).datastoreContext(DatastoreContext.newBuilder().
61 shardHeartbeatIntervalInMillis(100).build()).schemaContext(schemaContext).props().
62 withDispatcher(Dispatchers.DefaultDispatcherId());
64 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
65 "testTransactionCommit");
67 waitUntilLeader(shard);
69 // Send CreateTransaction message with no messages version
71 String transactionID = "txn-1";
72 shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
73 .setTransactionId(transactionID)
74 .setTransactionType(TransactionType.WRITE_ONLY.ordinal())
75 .setTransactionChainId("").build(), getRef());
77 final FiniteDuration duration = duration("5 seconds");
79 CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
81 ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
83 // Write data to the Tx
85 txActor.tell(new WriteData(TestModel.TEST_PATH,
86 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION).
87 toSerializable(), getRef());
89 expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
93 txActor.tell(new ReadyTransaction().toSerializable(), getRef());
95 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
96 duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
98 ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
100 // Send the CanCommitTransaction message with no transactionId.
102 cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
105 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
107 // Send the PreCommitTransaction message with no transactionId.
109 cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(),
112 expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS);
114 // Send the CommitTransaction message with no transactionId.
116 cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(),
119 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
121 NormalizedNode<?, ?> node = ShardTest.readStore(shard, TestModel.TEST_PATH);
122 Assert.assertNotNull("Data not found in store", node);
124 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
129 public void testTransactionAbort() throws Exception {
130 new ShardTestKit(getSystem()) {{
131 SchemaContext schemaContext = TestModel.createTestContext();
132 Props shardProps = Shard.builder().id(ShardIdentifier.builder().memberName("member-1").
133 shardName("inventory").type("config").build()).datastoreContext(DatastoreContext.newBuilder().
134 shardHeartbeatIntervalInMillis(100).build()).schemaContext(schemaContext).props().
135 withDispatcher(Dispatchers.DefaultDispatcherId());
137 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
138 "testTransactionAbort");
140 waitUntilLeader(shard);
142 // Send CreateTransaction message with no messages version
144 String transactionID = "txn-1";
145 shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
146 .setTransactionId(transactionID)
147 .setTransactionType(TransactionType.WRITE_ONLY.ordinal())
148 .setTransactionChainId("").build(), getRef());
150 final FiniteDuration duration = duration("5 seconds");
152 CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
154 ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
156 // Write data to the Tx
158 txActor.tell(new WriteData(TestModel.TEST_PATH,
159 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
160 DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef());
162 expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable(
163 DataStoreVersions.BASE_HELIUM_VERSION).getClass());
167 txActor.tell(new ReadyTransaction().toSerializable(), getRef());
169 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
170 duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
172 ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
174 // Send the CanCommitTransaction message with no transactionId.
176 cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
179 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
181 // Send the AbortTransaction message with no transactionId.
183 cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(),
186 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
188 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());