public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamReader {
- private DataInputStream reader;
-
private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeInputStreamReader.class);
- private Map<Integer, String> codedStringMap = new HashMap<>();
private static final String REVISION_ARG = "?revision=";
+ private final DataInputStream reader;
+
+ private final Map<Integer, String> codedStringMap = new HashMap<>();
+
+ private QName lastLeafSetQName;
+
public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
Preconditions.checkNotNull(stream);
reader = new DataInputStream(stream);
}
-
+ @Override
public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
NormalizedNode<?, ?> node = null;
node = augmentationBuilder.build();
} else {
- QName qName = readQName();
-
if(nodeType == NodeTypes.LEAF_SET_ENTRY_NODE) {
LOG.debug("Reading leaf set entry node. Will create NodeWithValue instance identifier");
// Read the object value
Object value = readObject();
- YangInstanceIdentifier.NodeWithValue nodeWithValue = new YangInstanceIdentifier.NodeWithValue(qName, value);
- node = Builders.leafSetEntryBuilder().withNodeIdentifier(nodeWithValue).withValue(value).build();
+ YangInstanceIdentifier.NodeWithValue nodeWithValue = new YangInstanceIdentifier.NodeWithValue(
+ lastLeafSetQName, value);
+ node = Builders.leafSetEntryBuilder().withNodeIdentifier(nodeWithValue).
+ withValue(value).build();
} else if(nodeType == NodeTypes.MAP_ENTRY_NODE) {
LOG.debug("Reading map entry node. Will create node identifier with predicates.");
+ QName qName = readQName();
YangInstanceIdentifier.NodeIdentifierWithPredicates nodeIdentifier =
new YangInstanceIdentifier.NodeIdentifierWithPredicates(qName, readKeyValueMap());
DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates, MapEntryNode> mapEntryBuilder
} else {
LOG.debug("Creating standard node identifier. ");
+
+ QName qName = readQName();
YangInstanceIdentifier.NodeIdentifier identifier = new YangInstanceIdentifier.NodeIdentifier(qName);
node = readNodeIdentifierDependentNode(nodeType, identifier);
LOG.debug("Read leaf set node");
ListNodeBuilder<Object, LeafSetEntryNode<Object>> leafSetBuilder =
Builders.leafSetBuilder().withNodeIdentifier(identifier);
- leafSetBuilder = addLeafSetChildren(leafSetBuilder);
+ leafSetBuilder = addLeafSetChildren(identifier.getNodeType(), leafSetBuilder);
return leafSetBuilder.build();
default :
}
}
- private ListNodeBuilder<Object, LeafSetEntryNode<Object>> addLeafSetChildren(ListNodeBuilder<Object,
- LeafSetEntryNode<Object>> builder)
+ private ListNodeBuilder<Object, LeafSetEntryNode<Object>> addLeafSetChildren(QName nodeType,
+ ListNodeBuilder<Object, LeafSetEntryNode<Object>> builder)
throws IOException {
LOG.debug("Reading children of leaf set");
+
+ lastLeafSetQName = nodeType;
+
LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNode();
while(child != null) {
import com.google.common.collect.Iterables;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWriter{
- private DataOutputStream writer;
-
private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
- private Map<String, Integer> stringCodeMap = new HashMap<>();
+ private final DataOutputStream writer;
+
+ private final Map<String, Integer> stringCodeMap = new HashMap<>();
public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
Preconditions.checkNotNull(stream);
}
@Override
- public void leafSetEntryNode(YangInstanceIdentifier.NodeWithValue name, Object value) throws IOException, IllegalArgumentException {
- Preconditions.checkNotNull(name, "Node identifier should not be null");
-
+ public void leafSetEntryNode(Object value) throws IOException, IllegalArgumentException {
LOG.debug("Writing a new leaf set entry node");
- startNode(name.getNodeType(), NodeTypes.LEAF_SET_ENTRY_NODE);
+ writer.writeByte(NodeTypes.LEAF_SET_ENTRY_NODE);
writeObject(value);
}
+++ /dev/null
-
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.node.utils.stream;
-
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * Event Stream Writer based on Normalized Node tree representation
- *
- * <h3>Writing Event Stream</h3>
- *
- * <ul>
- * <li><code>container</code> - Container node representation, start event is
- * emitted using {@link #startContainerNode(YangInstanceIdentifier.NodeIdentifier, int)}
- * and node end event is
- * emitted using {@link #endNode()}. Container node is implementing
- * {@link org.opendaylight.yangtools.yang.binding.DataObject} interface.
- *
- * <li><code>list</code> - YANG list statement has two representation in event
- * stream - unkeyed list and map. Unkeyed list is YANG list which did not
- * specify key.</li>
- *
- * <ul>
- * <li><code>Map</code> - Map start event is emitted using
- * {@link #startMapNode(YangInstanceIdentifier.NodeIdentifier, int)}
- * and is ended using {@link #endNode()}. Each map entry start is emitted using
- * {@link #startMapEntryNode(YangInstanceIdentifier.NodeIdentifierWithPredicates, int)}
- * with Map of keys
- * and finished using {@link #endNode()}.</li>
- *
- * <li><code>UnkeyedList</code> - Unkeyed list represent list without keys,
- * unkeyed list start is emitted using
- * {@link #startUnkeyedList(YangInstanceIdentifier.NodeIdentifier, int)} list
- * end is emitted using {@link #endNode()}. Each list item is emitted using
- * {@link #startUnkeyedListItem(YangInstanceIdentifier.NodeIdentifier, int)}
- * and ended using {@link #endNode()}.</li>
- * </ul>
- *
- * <li><code>leaf</code> - Leaf node event is emitted using
- * {@link #leafNode(YangInstanceIdentifier.NodeIdentifier, Object)}.
- * {@link #endNode()} MUST NOT BE emitted for
- * leaf node.</li>
- *
- * <li><code>leaf-list</code> - Leaf list start is emitted using
- * {@link #startLeafSet(YangInstanceIdentifier.NodeIdentifier, int)}.
- * Leaf list end is emitted using
- * {@link #endNode()}. Leaf list entries are emitted using
- * {@link #leafSetEntryNode(YangInstanceIdentifier.NodeWithValue name, Object).
- *
- * <li><code>anyxml - Anyxml node event is emitted using
- * {@link #leafNode(YangInstanceIdentifier.NodeIdentifier, Object)}. {@link #endNode()} MUST NOT BE emitted
- * for anyxml node.</code></li>
- *
- *
- * <li><code>choice</code> Choice node event is emmited by
- * {@link #startChoiceNode(YangInstanceIdentifier.NodeIdentifier, int)} event and
- * finished by invoking {@link #endNode()}
- * <li>
- * <code>augment</code> - Represents augmentation, augmentation node is started
- * by invoking {@link #startAugmentationNode(YangInstanceIdentifier.AugmentationIdentifier)} and
- * finished by invoking {@link #endNode()}.</li>
- *
- * </ul>
- *
- * <h3>Implementation notes</h3>
- *
- * <p>
- * Implementations of this interface must not hold user suppled objects
- * and resources needlessly.
- *
- */
-
-public interface NormalizedNodeStreamWriter extends Closeable, Flushable {
-
- public final int UNKNOWN_SIZE = -1;
-
- /**
- * Write the leaf node identifier and value to the stream.
- * @param name
- * @param value
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value)
- throws IOException, IllegalArgumentException;
-
- /**
- * Start writing leaf Set node. You must call {@link #endNode()} once you are done writing all of its children.
- * @param name
- * @param childSizeHint is the estimated children count. Usage is optional in implementation.
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void startLeafSet(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
- throws IOException, IllegalArgumentException;
-
- /**
- * Write the leaf Set Entry Node object to the stream with identifier and value.
- * @param name
- * @param value
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void leafSetEntryNode(YangInstanceIdentifier.NodeWithValue name, Object value)
- throws IOException, IllegalArgumentException;
-
- /**
- * Start writing container node. You must call {@link #endNode()} once you are done writing all of its children.
- * @param name
- * @param childSizeHint is the estimated children count. Usage is optional in implementation.
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void startContainerNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
- throws IOException, IllegalArgumentException;
-
- /**
- * Start writing unkeyed list node. You must call {@link #endNode()} once you are done writing all of its children.
- * @param name
- * @param childSizeHint is the estimated children count. Usage is optional in implementation.
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void startUnkeyedList(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
- throws IOException, IllegalArgumentException;
-
- /**
- * Start writing unkeyed list item. You must call {@link #endNode()} once you are done writing all of its children.
- * @param name
- * @param childSizeHint is the estimated children count. Usage is optional in implementation.
- * @throws IOException
- * @throws IllegalStateException
- */
- void startUnkeyedListItem(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
- throws IOException, IllegalStateException;
-
- /**
- * Start writing map node. You must call {@link #endNode()} once you are done writing all of its children.
- * @param name
- * @param childSizeHint is the estimated children count. Usage is optional in implementation.
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void startMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
- throws IOException, IllegalArgumentException;
-
- /**
- * Start writing map entry node. You must call {@link #endNode()} once you are done writing all of its children.
- * @param identifier
- * @param childSizeHint is the estimated children count. Usage is optional in implementation.
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void startMapEntryNode(YangInstanceIdentifier.NodeIdentifierWithPredicates identifier, int childSizeHint)
- throws IOException, IllegalArgumentException;
-
- /**
- * Start writing ordered map node. You must call {@link #endNode()} once you are done writing all of its children.
- * @param name
- * @param childSizeHint is the estimated children count. Usage is optional in implementation.
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void startOrderedMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
- throws IOException, IllegalArgumentException;
-
- /**
- * Start writing choice node. You must call {@link #endNode()} once you are done writing all of its children.
- * @param name
- * @param childSizeHint is the estimated children count. Usage is optional in implementation.
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void startChoiceNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
- throws IOException, IllegalArgumentException;
-
- /**
- * Start writing augmentation node. You must call {@link #endNode()} once you are done writing all of its children.
- * @param identifier
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void startAugmentationNode(YangInstanceIdentifier.AugmentationIdentifier identifier)
- throws IOException, IllegalArgumentException;
-
- /**
- * Write any xml node identifier and value to the stream
- * @param name
- * @param value
- * @throws IOException
- * @throws IllegalArgumentException
- */
- void anyxmlNode(YangInstanceIdentifier.NodeIdentifier name, Object value)
- throws IOException, IllegalArgumentException;
-
- /**
- * This method should be used to add end symbol/identifier of node in the stream.
- * @throws IOException
- * @throws IllegalStateException
- */
- void endNode() throws IOException, IllegalStateException;
-
- @Override
- void close() throws IOException;
-
- @Override
- void flush() throws IOException;
-}
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.util.TestModel;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import static org.junit.Assert.fail;
-
public class NormalizedNodeStreamReaderWriterTest {
final NormalizedNode<?, ?> input = TestModel.createTestContainer();
@Test
- public void testNormalizedNodeStreamReaderWriter() {
+ public void testNormalizedNodeStreamReaderWriter() throws IOException {
byte[] byteData = null;
normalizedNodeWriter.write(input);
byteData = byteArrayOutputStream.toByteArray();
- } catch (IOException e) {
- fail("Writing to OutputStream failed :" + e.toString());
}
- try(NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(new ByteArrayInputStream(byteData))) {
+ try(NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+ new ByteArrayInputStream(byteData))) {
NormalizedNode<?,?> node = reader.readNormalizedNode();
Assert.assertEquals(input, node);
- } catch (IOException e) {
- fail("Reading from InputStream failed :" + e.toString());
}
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.node.utils.stream;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
-import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
-import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.OrderedMapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListNode;
-
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeStreamWriter.UNKNOWN_SIZE;
-
-
-/**
- * This class is used only for testing purpose for now, we may use similar logic while integrating
- * with cluster
- */
-
-public class NormalizedNodeWriter implements Closeable, Flushable {
- private final NormalizedNodeStreamWriter writer;
-
- private NormalizedNodeWriter(final NormalizedNodeStreamWriter writer) {
- this.writer = Preconditions.checkNotNull(writer);
- }
-
- protected final NormalizedNodeStreamWriter getWriter() {
- return writer;
- }
-
- /**
- * Create a new writer backed by a {@link org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter}.
- *
- * @param writer Back-end writer
- * @return A new instance.
- */
- public static NormalizedNodeWriter forStreamWriter(final NormalizedNodeStreamWriter writer) {
- return new NormalizedNodeWriter(writer);
- }
-
-
- /**
- * Iterate over the provided {@link org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode} and emit write
- * events to the encapsulated {@link org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter}.
- *
- * @param node Node
- * @return
- * @throws java.io.IOException when thrown from the backing writer.
- */
- public final NormalizedNodeWriter write(final NormalizedNode<?, ?> node) throws IOException {
- if (wasProcessedAsComplexNode(node)) {
- return this;
- }
-
- if (wasProcessAsSimpleNode(node)) {
- return this;
- }
-
- throw new IllegalStateException("It wasn't possible to serialize node " + node);
- }
-
- @Override
- public void flush() throws IOException {
- writer.flush();
- }
-
- @Override
- public void close() throws IOException {
- writer.flush();
- writer.close();
- }
-
- /**
- * Emit a best guess of a hint for a particular set of children. It evaluates the
- * iterable to see if the size can be easily gotten to. If it is, we hint at the
- * real number of child nodes. Otherwise we emit UNKNOWN_SIZE.
- *
- * @param children Child nodes
- * @return Best estimate of the collection size required to hold all the children.
- */
- static final int childSizeHint(final Iterable<?> children) {
- return (children instanceof Collection) ? ((Collection<?>) children).size() : UNKNOWN_SIZE;
- }
-
- private boolean wasProcessAsSimpleNode(final NormalizedNode<?, ?> node) throws IOException {
- if (node instanceof LeafSetEntryNode) {
- final LeafSetEntryNode<?> nodeAsLeafList = (LeafSetEntryNode<?>)node;
- writer.leafSetEntryNode(nodeAsLeafList.getIdentifier(), nodeAsLeafList.getValue());
- return true;
- } else if (node instanceof LeafNode) {
- final LeafNode<?> nodeAsLeaf = (LeafNode<?>)node;
- writer.leafNode(nodeAsLeaf.getIdentifier(), nodeAsLeaf.getValue());
- return true;
- } else if (node instanceof AnyXmlNode) {
- final AnyXmlNode anyXmlNode = (AnyXmlNode)node;
- writer.anyxmlNode(anyXmlNode.getIdentifier(), anyXmlNode.getValue());
- return true;
- }
-
- return false;
- }
-
- /**
- * Emit events for all children and then emit an endNode() event.
- *
- * @param children Child iterable
- * @return True
- * @throws java.io.IOException when the writer reports it
- */
- protected final boolean writeChildren(final Iterable<? extends NormalizedNode<?, ?>> children) throws IOException {
- for (NormalizedNode<?, ?> child : children) {
- write(child);
- }
-
- writer.endNode();
- return true;
- }
-
- protected boolean writeMapEntryNode(final MapEntryNode node) throws IOException {
- writer.startMapEntryNode(node.getIdentifier(), childSizeHint(node.getValue()));
- return writeChildren(node.getValue());
- }
-
- private boolean wasProcessedAsComplexNode(final NormalizedNode<?, ?> node) throws IOException {
- if (node instanceof ContainerNode) {
- final ContainerNode n = (ContainerNode) node;
- writer.startContainerNode(n.getIdentifier(), childSizeHint(n.getValue()));
- return writeChildren(n.getValue());
- }
- if (node instanceof MapEntryNode) {
- return writeMapEntryNode((MapEntryNode) node);
- }
- if (node instanceof UnkeyedListEntryNode) {
- final UnkeyedListEntryNode n = (UnkeyedListEntryNode) node;
- writer.startUnkeyedListItem(n.getIdentifier(), childSizeHint(n.getValue()));
- return writeChildren(n.getValue());
- }
- if (node instanceof ChoiceNode) {
- final ChoiceNode n = (ChoiceNode) node;
- writer.startChoiceNode(n.getIdentifier(), childSizeHint(n.getValue()));
- return writeChildren(n.getValue());
- }
- if (node instanceof AugmentationNode) {
- final AugmentationNode n = (AugmentationNode) node;
- writer.startAugmentationNode(n.getIdentifier());
- return writeChildren(n.getValue());
- }
- if (node instanceof UnkeyedListNode) {
- final UnkeyedListNode n = (UnkeyedListNode) node;
- writer.startUnkeyedList(n.getIdentifier(), childSizeHint(n.getValue()));
- return writeChildren(n.getValue());
- }
- if (node instanceof OrderedMapNode) {
- final OrderedMapNode n = (OrderedMapNode) node;
- writer.startOrderedMapNode(n.getIdentifier(), childSizeHint(n.getValue()));
- return writeChildren(n.getValue());
- }
- if (node instanceof MapNode) {
- final MapNode n = (MapNode) node;
- writer.startMapNode(n.getIdentifier(), childSizeHint(n.getValue()));
- return writeChildren(n.getValue());
- }
- if (node instanceof LeafSetNode) {
- //covers also OrderedLeafSetNode for which doesn't exist start* method
- final LeafSetNode<?> n = (LeafSetNode<?>) node;
- writer.startLeafSet(n.getIdentifier(), childSizeHint(n.getValue()));
- return writeChildren(n.getValue());
- }
-
- return false;
- }
-}
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.URISyntaxException;
public class SampleNormalizedNodeSerializable implements Serializable {
+ private static final long serialVersionUID = 1L;
private NormalizedNode<?, ?> input;
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
}
}
- private void writeData(DOMStoreWriteTransaction transaction, WriteData message, boolean returnSerialized) {
+ private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
+ boolean returnSerialized) {
+ LOG.debug("writeData at path : {}", message.getPath());
+
modification.addModification(
new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
- if(LOG.isDebugEnabled()) {
- LOG.debug("writeData at path : " + message.getPath().toString());
- }
try {
transaction.write(message.getPath(), message.getData());
WriteDataReply writeDataReply = new WriteDataReply();
}
}
- private void mergeData(DOMStoreWriteTransaction transaction, MergeData message, boolean returnSerialized) {
+ private void mergeData(DOMStoreWriteTransaction transaction, MergeData message,
+ boolean returnSerialized) {
+ LOG.debug("mergeData at path : {}", message.getPath());
+
modification.addModification(
new MergeModification(message.getPath(), message.getData(), getSchemaContext()));
- if(LOG.isDebugEnabled()) {
- LOG.debug("mergeData at path : " + message.getPath().toString());
- }
+
try {
transaction.merge(message.getPath(), message.getData());
MergeDataReply mergeDataReply = new MergeDataReply();
}
}
- private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message, boolean returnSerialized) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("deleteData at path : " + message.getPath().toString());
- }
+ private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message,
+ boolean returnSerialized) {
+ LOG.debug("deleteData at path : {}", message.getPath());
+
modification.addModification(new DeleteModification(message.getPath()));
try {
transaction.delete(message.getPath());
}
}
- private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message, boolean returnSerialized) {
+ private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message,
+ boolean returnSerialized) {
+ String transactionID = getTransactionID();
+
+ LOG.debug("readyTransaction : {}", transactionID);
+
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- getShardActor().forward(new ForwardedReadyTransaction(
- getTransactionID(), cohort, modification, returnSerialized),
- getContext());
+ getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification,
+ returnSerialized), getContext());
+
+ // The shard will handle the commit from here so we're no longer needed - self-destruct.
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
}
// These classes are in here for test purposes only
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.pattern.AskTimeoutException;
import akka.testkit.TestActorRef;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
- @Test(expected = IllegalStateException.class)
+ @Test(expected = AskTimeoutException.class)
public void testNegativeWriteWithTransactionReady() throws Exception {
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
}
- @Test(expected = IllegalStateException.class)
+ @Test(expected = AskTimeoutException.class)
public void testNegativeReadWriteWithTransactionReady() throws Exception {
.serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
}
- @Test(expected = IllegalStateException.class)
+ @Test(expected = AskTimeoutException.class)
public void testNegativeMergeTransactionReady() throws Exception {
}
- @Test(expected = IllegalStateException.class)
+ @Test(expected = AskTimeoutException.class)
public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
}
- private void testOnReceiveReadData(final ActorRef subject) {
+ private void testOnReceiveReadData(final ActorRef transaction) {
//serialized read
- subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
getRef());
ShardTransactionMessages.ReadDataReply replySerialized =
.getNormalizedNode());
// unserialized read
- subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
props, "testReadDataWhenDataNotFoundRW"));
}
- private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) {
+ private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
// serialized read
- subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
+ transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
ShardTransactionMessages.ReadDataReply replySerialized =
expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
// unserialized read
- subject.tell(new ReadData(TestModel.TEST_PATH),getRef());
+ transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
}
- private void testOnReceiveDataExistsPositive(final ActorRef subject) {
- subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
+ private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
+ transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
getRef());
ShardTransactionMessages.DataExistsReply replySerialized =
assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
// unserialized read
- subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
}
- private void testOnReceiveDataExistsNegative(final ActorRef subject) {
- subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
+ private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
+ transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
ShardTransactionMessages.DataExistsReply replySerialized =
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
// unserialized read
- subject.tell(new DataExists(TestModel.TEST_PATH),getRef());
+ transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testWriteData");
+ final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
- subject.tell(new WriteData(TestModel.TEST_PATH,
+ transaction.tell(new WriteData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
getRef());
- ShardTransactionMessages.WriteDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
- assertModification(subject, WriteModification.class);
+ assertModification(transaction, WriteModification.class);
//unserialized write
- subject.tell(new WriteData(TestModel.TEST_PATH,
+ transaction.tell(new WriteData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME),
TestModel.createTestContext()),
getRef());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testMergeData");
+ final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
- subject.tell(new MergeData(TestModel.TEST_PATH,
+ transaction.tell(new MergeData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
getRef());
- ShardTransactionMessages.MergeDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
- assertModification(subject, MergeModification.class);
+ assertModification(transaction, MergeModification.class);
//unserialized merge
- subject.tell(new MergeData(TestModel.TEST_PATH,
+ transaction.tell(new MergeData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
getRef());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testDeleteData");
+ final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
- subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
- ShardTransactionMessages.DeleteDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
- assertModification(subject, DeleteModification.class);
+ assertModification(transaction, DeleteModification.class);
//unserialized merge
- subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
}};
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testReadyTransaction");
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
- subject.tell(new ReadyTransaction().toSerializable(), getRef());
+ watch(transaction);
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+ transaction.tell(new ReadyTransaction().toSerializable(), getRef());
+
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
}};
// test
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testReadyTransaction2");
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
+
+ watch(transaction);
- subject.tell(new ReadyTransaction(), getRef());
+ transaction.tell(new ReadyTransaction(), getRef());
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
}};
}
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
+ final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
- watch(subject);
+ watch(transaction);
- subject.tell(new CloseTransaction().toSerializable(), getRef());
+ transaction.tell(new CloseTransaction().toSerializable(), getRef());
expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
- expectMsgClass(duration("3 seconds"), Terminated.class);
+ expectTerminated(duration("3 seconds"), transaction);
}};
}
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final TestActorRef subject = TestActorRef.apply(props,getSystem());
+ final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
- subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
+ transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
}
@Test
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
+ final ActorRef transaction =
getSystem().actorOf(props, "testShardTransactionInactivity");
- watch(subject);
+ watch(transaction);
- // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
-
- final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in instanceof Terminated) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", termination);
+ expectMsgClass(duration("3 seconds"), Terminated.class);
}};
}
}