import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
} else if (message instanceof ForwardedReadyTransaction) {
commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
getSender(), this);
+ } else if (message instanceof ReadyLocalTransaction) {
+ handleReadyLocalTransaction((ReadyLocalTransaction)message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
} else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
+ private void noLeaderError(Object message) {
+ // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+ // it more resilient in case we're in the process of electing a new leader.
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find the leader for shard %s. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+
private void handleBatchedModifications(BatchedModifications batched) {
// This message is sent to prepare the modificationsa transaction directly on the Shard as an
// optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
} else {
- // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
- // it more resilient in case we're in the process of electing a new leader.
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Could not find the leader for shard %s. This typically happens" +
- " when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.", persistenceId()))), getSelf());
+ noLeaderError(batched);
+ }
+ }
+ }
+
+ private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
+ if (isLeader()) {
+ try {
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ } catch (Exception e) {
+ LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+ message.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+ leader.forward(message, getContext());
+ } else {
+ noLeaderError(message);
}
}
}
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
return batched.isReady();
}
+ /**
+ * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
+ * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+ *
+ * @param message
+ * @param sender
+ * @param shard
+ */
+ void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+ final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification());
+ final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+ cohortCache.put(message.getTransactionID(), cohortEntry);
+ cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
+ log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
+
+ if (message.isDoCommitOnReady()) {
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ }
+
private void handleCanCommit(CohortEntry cohortEntry) {
String transactionID = cohortEntry.getTransactionID();
this.transaction = null;
}
+ CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
+ this.transactionID = transactionID;
+ this.cohort = cohort;
+ this.transaction = null;
+ }
+
void updateLastAccessTime() {
lastAccessTime = System.currentTimeMillis();
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
+/**
+ * Message notifying the shard leader to apply modifications which have been
+ * prepared locally against its DataTree. This message is not directly serializable,
+ * simply because the leader and sender need to be on the same system.
+ */
+public final class ReadyLocalTransaction {
+ private final DataTreeModification modification;
+ private final String transactionID;
+ private final boolean doCommitOnReady;
+
+ public ReadyLocalTransaction(final String transactionID, DataTreeModification modification, boolean doCommitOnReady) {
+ this.transactionID = Preconditions.checkNotNull(transactionID);
+ this.modification = Preconditions.checkNotNull(modification);
+ this.doCommitOnReady = doCommitOnReady;
+ }
+
+ public String getTransactionID() {
+ return transactionID;
+ }
+
+ public DataTreeModification getModification() {
+ return modification;
+ }
+
+ public boolean isDoCommitOnReady() {
+ return doCommitOnReady;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.serialization.JSerializer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+
+/**
+ * Specialized message transformer, which transforms a {@link ReadyLocalTransaction}
+ * into a {@link BatchedModifications} message. This serializer needs to be plugged
+ * into akka serialization to allow forwarding of ReadyLocalTransaction to remote
+ * shards.
+ */
+public final class ReadyLocalTransactionSerializer extends JSerializer {
+ @Override
+ public int identifier() {
+ return 97439437;
+ }
+
+ @Override
+ public boolean includeManifest() {
+ return false;
+ }
+
+ @Override
+ public byte[] toBinary(final Object obj) {
+ Preconditions.checkArgument(obj instanceof ReadyLocalTransaction, "Unsupported object type %s", obj.getClass());
+ final ReadyLocalTransaction msg = (ReadyLocalTransaction) obj;
+ final BatchedModifications batched = new BatchedModifications(msg.getTransactionID(),
+ DataStoreVersions.CURRENT_VERSION, "");
+ batched.setDoCommitOnReady(msg.isDoCommitOnReady());
+ batched.setReady(true);
+
+ msg.getModification().applyToCursor(new BatchedCursor(batched));
+
+ return SerializationUtils.serialize(batched);
+ }
+
+ @Override
+ public Object fromBinaryJava(final byte[] bytes, final Class<?> clazz) {
+ return SerializationUtils.deserialize(bytes);
+ }
+
+ private static final class BatchedCursor implements DataTreeModificationCursor {
+ private final Deque<YangInstanceIdentifier> stack = new ArrayDeque<>();
+ private final BatchedModifications message;
+
+ BatchedCursor(final BatchedModifications message) {
+ this.message = Preconditions.checkNotNull(message);
+ stack.push(YangInstanceIdentifier.EMPTY);
+ }
+
+ @Override
+ public void delete(final PathArgument child) {
+ message.addModification(new DeleteModification(stack.peek().node(child)));
+ }
+
+ @Override
+ public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+ message.addModification(new MergeModification(stack.peek().node(child), data));
+ }
+
+ @Override
+ public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+ message.addModification(new WriteModification(stack.peek().node(child), data));
+ }
+
+ @Override
+ public void enter(@Nonnull final PathArgument child) {
+ stack.push(stack.peek().node(child));
+ }
+
+ @Override
+ public void enter(@Nonnull final PathArgument... path) {
+ for (PathArgument arg : path) {
+ enter(arg);
+ }
+ }
+
+ @Override
+ public void enter(@Nonnull final Iterable<PathArgument> path) {
+ for (PathArgument arg : path) {
+ enter(arg);
+ }
+ }
+
+ @Override
+ public void exit() {
+ stack.pop();
+ }
+
+ @Override
+ public void exit(final int depth) {
+ Preconditions.checkArgument(depth < stack.size(), "Stack holds only %s elements, cannot exit %s levels", stack.size(), depth);
+ for (int i = 0; i < depth; ++i) {
+ stack.pop();
+ }
+ }
+
+ @Override
+ public Optional<NormalizedNode<?, ?>> readNode(@Nonnull final PathArgument child) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void close() {
+ // No-op
+ }
+ }
+}
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
+ readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
}
serialization-bindings {
"com.google.protobuf.Message" = proto
-
+ "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
}
}
remote {
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
}};
}
+ @Test
+ public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testReadyLocalTransactionWithImmediateCommit");
+
+ waitUntilLeader(shard);
+
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+
+ ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+ MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+ String txId = "tx1";
+ ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+
+ shard.tell(readyMessage, getRef());
+
+ expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testReadyLocalTransactionWithThreePhaseCommit");
+
+ waitUntilLeader(shard);
+
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+
+ ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+ MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+ String txId = "tx1";
+ ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+
+ shard.tell(readyMessage, getRef());
+
+ expectMsgClass(ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
+ expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
@Test
public void testCommitWithPersistenceDisabled() throws Throwable {
dataStoreContextBuilder.persistent(false);
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+
+/**
+ * Unit tests for ReadyLocalTransactionSerializer.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadyLocalTransactionSerializerTest {
+
+ @Test
+ public void testToAndFromBinary() {
+ TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
+ dataTree.setSchemaContext(TestModel.createTestContext());
+ DataTreeModification modification = dataTree.takeSnapshot().newModification();
+
+ ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+ MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+ String txId = "tx-id";
+ ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+
+ ReadyLocalTransactionSerializer serializer = new ReadyLocalTransactionSerializer();
+
+ byte[] bytes = serializer.toBinary(readyMessage);
+
+ Object deserialized = serializer.fromBinary(bytes, ReadyLocalTransaction.class);
+
+ assertNotNull("fromBinary returned null", deserialized);
+ assertEquals("fromBinary return type", BatchedModifications.class, deserialized.getClass());
+ BatchedModifications batched = (BatchedModifications)deserialized;
+ assertEquals("getTransactionID", txId, batched.getTransactionID());
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, batched.getVersion());
+
+ List<Modification> batchedMods = batched.getModifications();
+ assertEquals("getModifications size", 2, batchedMods.size());
+
+ Modification mod = batchedMods.get(0);
+ assertEquals("Modification type", WriteModification.class, mod.getClass());
+ assertEquals("Modification getPath", TestModel.TEST_PATH, ((WriteModification)mod).getPath());
+ assertEquals("Modification getData", writeData, ((WriteModification)mod).getData());
+
+ mod = batchedMods.get(1);
+ assertEquals("Modification type", MergeModification.class, mod.getClass());
+ assertEquals("Modification getPath", TestModel.OUTER_LIST_PATH, ((MergeModification)mod).getPath());
+ assertEquals("Modification getData", mergeData, ((MergeModification)mod).getData());
+ }
+}