package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
InstanceIdentifierUtils.toSerializable((YangInstanceIdentifier) value, context));
} else if(value instanceof Set) {
Set set = (Set) value;
- if(!set.isEmpty()){
- for(Object o : set){
- if(o instanceof String){
+ if (!set.isEmpty()) {
+ for (Object o : set) {
+ if (o instanceof String) {
builder.addBitsValue(o.toString());
} else {
throw new IllegalArgumentException("Expected value type to be Bits but was : " +
- value.toString());
+ value.toString());
}
}
}
+ } else if(value instanceof byte[]){
+ builder.setBytesValue(ByteString.copyFrom((byte[]) value));
} else {
builder.setValue(value.toString());
}
node.getInstanceIdentifierValue(), context);
} else if(node.getIntValueType() == ValueType.BITS_TYPE.ordinal()){
return new HashSet(node.getBitsValueList());
+ } else if(node.getIntValueType() == ValueType.BINARY_TYPE.ordinal()){
+ return node.getBytesValue().toByteArray();
}
return deSerializeBasicTypes(node.getIntValueType(), node.getValue());
}
YANG_IDENTIFIER_TYPE,
STRING_TYPE,
BIG_INTEGER_TYPE,
- BIG_DECIMAL_TYPE;
+ BIG_DECIMAL_TYPE,
+ BINARY_TYPE;
private static Map<Class, ValueType> types = new HashMap<>();
types.put(Short.class,SHORT_TYPE);
types.put(BigInteger.class, BIG_INTEGER_TYPE);
types.put(BigDecimal.class, BIG_DECIMAL_TYPE);
+ types.put(byte[].class, BINARY_TYPE);
}
public static final ValueType getSerializableType(Object node){
*/
com.google.protobuf.ByteString
getCodeBytes(int index);
+
+ // optional bytes bytesValue = 13;
+ /**
+ * <code>optional bytes bytesValue = 13;</code>
+ */
+ boolean hasBytesValue();
+ /**
+ * <code>optional bytes bytesValue = 13;</code>
+ */
+ com.google.protobuf.ByteString getBytesValue();
}
/**
* Protobuf type {@code org.opendaylight.controller.mdsal.Node}
code_.add(input.readBytes());
break;
}
+ case 106: {
+ bitField0_ |= 0x00000100;
+ bytesValue_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
return code_.getByteString(index);
}
+ // optional bytes bytesValue = 13;
+ public static final int BYTESVALUE_FIELD_NUMBER = 13;
+ private com.google.protobuf.ByteString bytesValue_;
+ /**
+ * <code>optional bytes bytesValue = 13;</code>
+ */
+ public boolean hasBytesValue() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ /**
+ * <code>optional bytes bytesValue = 13;</code>
+ */
+ public com.google.protobuf.ByteString getBytesValue() {
+ return bytesValue_;
+ }
+
private void initFields() {
path_ = "";
type_ = "";
instanceIdentifierValue_ = org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.getDefaultInstance();
bitsValue_ = com.google.protobuf.LazyStringArrayList.EMPTY;
code_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bytesValue_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
for (int i = 0; i < code_.size(); i++) {
output.writeBytes(12, code_.getByteString(i));
}
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ output.writeBytes(13, bytesValue_);
+ }
getUnknownFields().writeTo(output);
}
size += dataSize;
size += 1 * getCodeList().size();
}
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(13, bytesValue_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000400);
code_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000800);
+ bytesValue_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00001000);
return this;
}
bitField0_ = (bitField0_ & ~0x00000800);
}
result.code_ = code_;
+ if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+ to_bitField0_ |= 0x00000100;
+ }
+ result.bytesValue_ = bytesValue_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
onChanged();
}
+ if (other.hasBytesValue()) {
+ setBytesValue(other.getBytesValue());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional bytes bytesValue = 13;
+ private com.google.protobuf.ByteString bytesValue_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes bytesValue = 13;</code>
+ */
+ public boolean hasBytesValue() {
+ return ((bitField0_ & 0x00001000) == 0x00001000);
+ }
+ /**
+ * <code>optional bytes bytesValue = 13;</code>
+ */
+ public com.google.protobuf.ByteString getBytesValue() {
+ return bytesValue_;
+ }
+ /**
+ * <code>optional bytes bytesValue = 13;</code>
+ */
+ public Builder setBytesValue(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00001000;
+ bytesValue_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes bytesValue = 13;</code>
+ */
+ public Builder clearBytesValue() {
+ bitField0_ = (bitField0_ & ~0x00001000);
+ bytesValue_ = getDefaultInstance().getBytesValue();
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.Node)
}
"ntroller.mdsal.Attribute\022\017\n\007intType\030\006 \001(" +
"\005\"f\n\022InstanceIdentifier\022B\n\targuments\030\001 \003" +
"(\0132/.org.opendaylight.controller.mdsal.P" +
- "athArgument\022\014\n\004code\030\002 \003(\t\"\245\003\n\004Node\022\014\n\004pa" +
+ "athArgument\022\014\n\004code\030\002 \003(\t\"\271\003\n\004Node\022\014\n\004pa" +
"th\030\001 \001(\t\022\014\n\004type\030\002 \001(\t\022E\n\014pathArgument\030\003" +
" \001(\0132/.org.opendaylight.controller.mdsal" +
".PathArgument\022\017\n\007intType\030\004 \001(\005\022@\n\nattrib",
"pe\030\t \001(\005\022V\n\027instanceIdentifierValue\030\n \001(" +
"\01325.org.opendaylight.controller.mdsal.In" +
"stanceIdentifier\022\021\n\tbitsValue\030\013 \003(\t\022\014\n\004c" +
- "ode\030\014 \003(\t\"`\n\tContainer\022\022\n\nparentPath\030\001 \002" +
- "(\t\022?\n\016normalizedNode\030\002 \001(\0132\'.org.openday" +
- "light.controller.mdsal.Node\"\246\001\n\014NodeMapE",
- "ntry\022U\n\026instanceIdentifierPath\030\001 \002(\01325.o" +
- "rg.opendaylight.controller.mdsal.Instanc" +
- "eIdentifier\022?\n\016normalizedNode\030\002 \001(\0132\'.or" +
- "g.opendaylight.controller.mdsal.Node\"N\n\007" +
- "NodeMap\022C\n\nmapEntries\030\001 \003(\0132/.org.openda" +
- "ylight.controller.mdsal.NodeMapEntryBO\n5" +
- "org.opendaylight.controller.protobuff.me" +
- "ssages.commonB\026NormalizedNodeMessages"
+ "ode\030\014 \003(\t\022\022\n\nbytesValue\030\r \001(\014\"`\n\tContain" +
+ "er\022\022\n\nparentPath\030\001 \002(\t\022?\n\016normalizedNode" +
+ "\030\002 \001(\0132\'.org.opendaylight.controller.mds",
+ "al.Node\"\246\001\n\014NodeMapEntry\022U\n\026instanceIden" +
+ "tifierPath\030\001 \002(\01325.org.opendaylight.cont" +
+ "roller.mdsal.InstanceIdentifier\022?\n\016norma" +
+ "lizedNode\030\002 \001(\0132\'.org.opendaylight.contr" +
+ "oller.mdsal.Node\"N\n\007NodeMap\022C\n\nmapEntrie" +
+ "s\030\001 \003(\0132/.org.opendaylight.controller.md" +
+ "sal.NodeMapEntryBO\n5org.opendaylight.con" +
+ "troller.protobuff.messages.commonB\026Norma" +
+ "lizedNodeMessages"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_mdsal_Node_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_mdsal_Node_descriptor,
- new java.lang.String[] { "Path", "Type", "PathArgument", "IntType", "Attributes", "Child", "Value", "ValueType", "IntValueType", "InstanceIdentifierValue", "BitsValue", "Code", });
+ new java.lang.String[] { "Path", "Type", "PathArgument", "IntType", "Attributes", "Child", "Value", "ValueType", "IntValueType", "InstanceIdentifierValue", "BitsValue", "Code", "BytesValue", });
internal_static_org_opendaylight_controller_mdsal_Container_descriptor =
getDescriptor().getMessageTypes().get(6);
internal_static_org_opendaylight_controller_mdsal_Container_fieldAccessorTable = new
repeated string bitsValue = 11; // intValueType = Bits
repeated string code = 12; // A list of string codes which can be used for any repeated strings in the NormalizedNode
+
+ optional bytes bytesValue = 13;
}
message Container{
package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
+import com.google.common.base.Optional;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.opendaylight.controller.cluster.datastore.util.TestModel;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class NormalizedNodeSerializerTest {
// created by serializing the original node and deSerializing it back.
assertEquals(expectedNode, actualNode);
+ byte[] binaryData = new byte[5];
+ for(byte i=0;i<5;i++){
+ binaryData[i] = i;
+ }
+
+ ContainerNode node1 = TestModel.createBaseTestContainerBuilder()
+ .withChild(ImmutableNodes.leafNode(TestModel.SOME_BINARY_DATE_QNAME, binaryData))
+ .build();
+
+ NormalizedNodeMessages.Node serializedNode1 = NormalizedNodeSerializer
+ .serialize(node1);
+
+ ContainerNode node2 =
+ (ContainerNode) NormalizedNodeSerializer.deSerialize(serializedNode1);
+
+
+ // FIXME: This will not work due to BUG 2326. Once that is fixed we can uncomment this assertion
+ // assertEquals(node1, node2);
+
+ Optional<DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>> child = node2.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.SOME_BINARY_DATE_QNAME));
+
+ Object value = child.get().getValue();
+
+ assertTrue("value should be of type byte[]", value instanceof byte[]);
+
+ byte[] bytesValue = (byte[]) value;
+
+ for(byte i=0;i<5;i++){
+ assertEquals(i, bytesValue[i]);
+ }
+
}
@Test(expected = NullPointerException.class)
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ValueTypeTest {
+
+ @Test
+ public void testGetSerializableType(){
+ byte[] b = new byte[10];
+ b[0] = 1;
+ b[2] = 2;
+
+ ValueType serializableType = ValueType.getSerializableType(b);
+ assertEquals(ValueType.BINARY_TYPE, serializableType);
+ }
+}
\ No newline at end of file
public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
public static final QName POINTER_QNAME = QName.create(TEST_QNAME, "pointer");
+ public static final QName SOME_BINARY_DATE_QNAME = QName.create(TEST_QNAME, "some-binary-data");
public static final QName SOME_REF_QNAME = QName.create(TEST_QNAME,
"some-ref");
public static final QName MYIDENTITY_QNAME = QName.create(TEST_QNAME,
}
- public static ContainerNode createTestContainer() {
-
-
- // Create a list of shoes
- // This is to test leaf list entry
- final LeafSetEntryNode<Object> nike =
- ImmutableLeafSetEntryNodeBuilder
- .create()
- .withNodeIdentifier(
- new YangInstanceIdentifier.NodeWithValue(QName.create(
- TEST_QNAME, "shoe"), "nike")).withValue("nike").build();
-
- final LeafSetEntryNode<Object> puma =
- ImmutableLeafSetEntryNodeBuilder
- .create()
- .withNodeIdentifier(
- new YangInstanceIdentifier.NodeWithValue(QName.create(
- TEST_QNAME, "shoe"), "puma")).withValue("puma").build();
-
- final LeafSetNode<Object> shoes =
- ImmutableLeafSetNodeBuilder
- .create()
- .withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(QName.create(
- TEST_QNAME, "shoe"))).withChild(nike).withChild(puma)
- .build();
-
-
- // Test a leaf-list where each entry contains an identity
- final LeafSetEntryNode<Object> cap1 =
- ImmutableLeafSetEntryNodeBuilder
- .create()
- .withNodeIdentifier(
- new YangInstanceIdentifier.NodeWithValue(QName.create(
- TEST_QNAME, "capability"), DESC_QNAME))
- .withValue(DESC_QNAME).build();
-
- final LeafSetNode<Object> capabilities =
- ImmutableLeafSetNodeBuilder
- .create()
- .withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(QName.create(
- TEST_QNAME, "capability"))).withChild(cap1).build();
-
- ContainerNode switchFeatures =
- ImmutableContainerNodeBuilder
- .create()
- .withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(SWITCH_FEATURES_QNAME))
- .withChild(capabilities).build();
-
- // Create a leaf list with numbers
- final LeafSetEntryNode<Object> five =
- ImmutableLeafSetEntryNodeBuilder
- .create()
- .withNodeIdentifier(
- (new YangInstanceIdentifier.NodeWithValue(QName.create(
- TEST_QNAME, "number"), 5))).withValue(5).build();
- final LeafSetEntryNode<Object> fifteen =
- ImmutableLeafSetEntryNodeBuilder
- .create()
- .withNodeIdentifier(
- (new YangInstanceIdentifier.NodeWithValue(QName.create(
- TEST_QNAME, "number"), 15))).withValue(15).build();
- final LeafSetNode<Object> numbers =
- ImmutableLeafSetNodeBuilder
- .create()
- .withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(QName.create(
- TEST_QNAME, "number"))).withChild(five).withChild(fifteen)
- .build();
-
-
- // Create augmentations
- MapEntryNode mapEntry = createAugmentedListEntry(1, "First Test");
-
- // Create a bits leaf
+ public static DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> createBaseTestContainerBuilder() {
+ // Create a list of shoes
+ // This is to test leaf list entry
+ final LeafSetEntryNode<Object> nike =
+ ImmutableLeafSetEntryNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ new YangInstanceIdentifier.NodeWithValue(QName.create(
+ TEST_QNAME, "shoe"), "nike")).withValue("nike").build();
+
+ final LeafSetEntryNode<Object> puma =
+ ImmutableLeafSetEntryNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ new YangInstanceIdentifier.NodeWithValue(QName.create(
+ TEST_QNAME, "shoe"), "puma")).withValue("puma").build();
+
+ final LeafSetNode<Object> shoes =
+ ImmutableLeafSetNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(QName.create(
+ TEST_QNAME, "shoe"))).withChild(nike).withChild(puma)
+ .build();
+
+
+ // Test a leaf-list where each entry contains an identity
+ final LeafSetEntryNode<Object> cap1 =
+ ImmutableLeafSetEntryNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ new YangInstanceIdentifier.NodeWithValue(QName.create(
+ TEST_QNAME, "capability"), DESC_QNAME))
+ .withValue(DESC_QNAME).build();
+
+ final LeafSetNode<Object> capabilities =
+ ImmutableLeafSetNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(QName.create(
+ TEST_QNAME, "capability"))).withChild(cap1).build();
+
+ ContainerNode switchFeatures =
+ ImmutableContainerNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(SWITCH_FEATURES_QNAME))
+ .withChild(capabilities).build();
+
+ // Create a leaf list with numbers
+ final LeafSetEntryNode<Object> five =
+ ImmutableLeafSetEntryNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ (new YangInstanceIdentifier.NodeWithValue(QName.create(
+ TEST_QNAME, "number"), 5))).withValue(5).build();
+ final LeafSetEntryNode<Object> fifteen =
+ ImmutableLeafSetEntryNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ (new YangInstanceIdentifier.NodeWithValue(QName.create(
+ TEST_QNAME, "number"), 15))).withValue(15).build();
+ final LeafSetNode<Object> numbers =
+ ImmutableLeafSetNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(QName.create(
+ TEST_QNAME, "number"))).withChild(five).withChild(fifteen)
+ .build();
+
+
+ // Create augmentations
+ MapEntryNode mapEntry = createAugmentedListEntry(1, "First Test");
+
+ // Create a bits leaf
NormalizedNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, Object, LeafNode<Object>>
- myBits = Builders.leafBuilder().withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(
- QName.create(TEST_QNAME, "my-bits"))).withValue(
- ImmutableSet.of("foo", "bar"));
+ myBits = Builders.leafBuilder().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(
+ QName.create(TEST_QNAME, "my-bits"))).withValue(
+ ImmutableSet.of("foo", "bar"));
// Create the document
- return ImmutableContainerNodeBuilder
- .create()
- .withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
- .withChild(myBits.build())
- .withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC))
- .withChild(ImmutableNodes.leafNode(POINTER_QNAME, "pointer"))
- .withChild(
- ImmutableNodes.leafNode(SOME_REF_QNAME, YangInstanceIdentifier
- .builder().build()))
- .withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME))
-
- // .withChild(augmentationNode)
- .withChild(shoes)
- .withChild(numbers)
- .withChild(switchFeatures)
- .withChild(
- mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
- .withChild(
- mapNodeBuilder(OUTER_LIST_QNAME)
- .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
- .withChild(BAR_NODE).build()).build();
+ return ImmutableContainerNodeBuilder
+ .create()
+ .withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+ .withChild(myBits.build())
+ .withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC))
+ .withChild(ImmutableNodes.leafNode(POINTER_QNAME, "pointer"))
+ .withChild(
+ ImmutableNodes.leafNode(SOME_REF_QNAME, YangInstanceIdentifier
+ .builder().build()))
+ .withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME))
+
+ // .withChild(augmentationNode)
+ .withChild(shoes)
+ .withChild(numbers)
+ .withChild(switchFeatures)
+ .withChild(
+ mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
+ .withChild(
+ mapNodeBuilder(OUTER_LIST_QNAME)
+ .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
+ .withChild(BAR_NODE).build());
+ }
+ public static ContainerNode createTestContainer() {
+ return createBaseTestContainerBuilder().build();
}
public static MapEntryNode createAugmentedListEntry(int id, String name) {
}
}
+ leaf some-binary-data {
+ type binary;
+ }
+
}
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.List;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import scala.concurrent.Await;
import scala.concurrent.Future;
-
-import java.util.Collections;
-import java.util.List;
+import scala.concurrent.Promise;
/**
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
private final String transactionChainId;
- private volatile List<Future<ActorSelection>> cohortFutures = Collections.emptyList();
+ private volatile SimpleEntry<Object, List<Future<ActorSelection>>> previousTxReadyFutures;
public TransactionChainProxy(ActorContext actorContext) {
this.actorContext = actorContext;
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, this);
+ return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_WRITE, this);
+ return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.WRITE_ONLY, this);
+ return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
}
@Override
return transactionChainId;
}
- public void onTransactionReady(List<Future<ActorSelection>> cohortFutures){
- this.cohortFutures = cohortFutures;
- }
+ private class ChainedTransactionProxy extends TransactionProxy {
+
+ ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+ super(actorContext, transactionType, transactionChainId);
+ }
+
+ @Override
+ protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+ if(!cohortFutures.isEmpty()) {
+ previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures);
+ } else {
+ previousTxReadyFutures = null;
+ }
+ }
+
+ /**
+ * This method is overridden to ensure the previous Tx's ready operations complete
+ * before we create the next shard Tx in the chain to avoid creation failures if the
+ * previous Tx's ready operations haven't completed yet.
+ */
+ @Override
+ protected Future<Object> sendCreateTransaction(final ActorSelection shard,
+ final Object serializedCreateMessage) {
+ // Check if there are any previous ready Futures. Also make sure the previous ready
+ // Futures aren't for this Tx as deadlock would occur if tried to wait on our own
+ // Futures. This may happen b/c the shard Tx creates are done async so it's possible
+ // for the client to ready this Tx before we've even attempted to create a shard Tx.
+ if(previousTxReadyFutures == null ||
+ previousTxReadyFutures.getKey().equals(getIdentifier())) {
+ return super.sendCreateTransaction(shard, serializedCreateMessage);
+ }
+
+ // Combine the ready Futures into 1.
+ Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
+ previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher());
+
+ // Add a callback for completion of the combined Futures.
+ final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+ OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
+ if(failure != null) {
+ // A Ready Future failed so fail the returned Promise.
+ createTxPromise.failure(failure);
+ } else {
+ // Send the CreateTx message and use the resulting Future to complete the
+ // returned Promise.
+ createTxPromise.completeWith(actorContext.executeOperationAsync(shard,
+ serializedCreateMessage));
+ }
+ }
+ };
+
+ combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
- public void waitTillCurrentTransactionReady(){
- try {
- Await.result(Futures
- .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()),
- actorContext.getOperationDuration());
- } catch (Exception e) {
- throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e);
+ return createTxPromise.future();
}
}
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
-import javax.annotation.concurrent.GuardedBy;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
* <p>
private final TransactionType transactionType;
private final ActorContext actorContext;
private final TransactionIdentifier identifier;
- private final TransactionChainProxy transactionChainProxy;
+ private final String transactionChainId;
private final SchemaContext schemaContext;
private boolean inReadyState;
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
- this(actorContext, transactionType, null);
+ this(actorContext, transactionType, "");
}
public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
- TransactionChainProxy transactionChainProxy) {
+ String transactionChainId) {
this.actorContext = Preconditions.checkNotNull(actorContext,
"actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType,
"transactionType should not be null");
this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
"schemaContext should not be null");
- this.transactionChainProxy = transactionChainProxy;
+ this.transactionChainId = transactionChainId;
String memberName = actorContext.getCurrentMemberName();
if(memberName == null){
return recordedOperationFutures;
}
+ @VisibleForTesting
+ boolean hasTransactionContext() {
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
}
}
- if(transactionChainProxy != null){
- transactionChainProxy.onTransactionReady(cohortFutures);
- }
+ onTransactionReady(cohortFutures);
return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
identifier.toString());
}
+ /**
+ * Method for derived classes to be notified when the transaction has been readied.
+ *
+ * @param cohortFutures the cohort Futures for each shard transaction.
+ */
+ protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+ }
+
+ /**
+ * Method called to send a CreateTransaction message to a shard.
+ *
+ * @param shard the shard actor to send to
+ * @param serializedCreateMessage the serialized message to send
+ * @return the response Future
+ */
+ protected Future<Object> sendCreateTransaction(ActorSelection shard,
+ Object serializedCreateMessage) {
+ return actorContext.executeOperationAsync(shard, serializedCreateMessage);
+ }
+
@Override
public Object getIdentifier() {
return this.identifier;
}
public String getTransactionChainId() {
- if(transactionChainProxy == null){
- return "";
- }
- return transactionChainProxy.getTransactionChainId();
+ return transactionChainId;
}
/**
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
- Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
+ Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
new CreateTransaction(identifier.toString(),
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable());
// respect to #addTxOperationOnComplete to handle timing issues and ensure no
// TransactionOperation is missed and that they are processed in the order they occurred.
synchronized(txOperationsOnComplete) {
+ // Store the new TransactionContext locally until we've completed invoking the
+ // TransactionOperations. This avoids thread timing issues which could cause
+ // out-of-order TransactionOperations. Eg, on a modification operation, if the
+ // TransactionContext is non-null, then we directly call the TransactionContext.
+ // However, at the same time, the code may be executing the cached
+ // TransactionOperations. So to avoid thus timing, we don't publish the
+ // TransactionContext until after we've executed all cached TransactionOperations.
+ TransactionContext localTransactionContext;
if(failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
failure.getMessage());
- transactionContext = new NoOpTransactionContext(failure, identifier);
+ localTransactionContext = new NoOpTransactionContext(failure, identifier);
} else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
- createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
+ localTransactionContext = createValidTransactionContext(
+ CreateTransactionReply.fromSerializable(response));
} else {
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- transactionContext = new NoOpTransactionContext(exception, identifier);
+ localTransactionContext = new NoOpTransactionContext(exception, identifier);
}
for(TransactionOperation oper: txOperationsOnComplete) {
- oper.invoke(transactionContext);
+ oper.invoke(localTransactionContext);
}
txOperationsOnComplete.clear();
+
+ // We're done invoking the TransactionOperations so we can now publish the
+ // TransactionContext.
+ transactionContext = localTransactionContext;
}
}
- private void createValidTransactionContext(CreateTransactionReply reply) {
+ private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
String transactionPath = reply.getTransactionPath();
LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
- transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+ return new TransactionContextImpl(transactionPath, transactionActor, identifier,
actorContext, schemaContext, isTxActorLocal, reply.getVersion());
}
}
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ doCommit(cohort);
// Verify the data in the store
// 5. Commit the Tx
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ doCommit(cohort);
// 6. Verify the data in the store
// Wait for the Tx commit to complete.
- assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
- txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
- txCohort.get().commit().get(5, TimeUnit.SECONDS);
+ doCommit(txCohort.get());
// Verify the data in the store
@Test
public void testTransactionChain() throws Exception{
- System.setProperty("shard.persistent", "true");
new IntegrationTestKit(getSystem()) {{
- DistributedDataStore dataStore =
- setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
+ DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
// 1. Create a Tx chain and write-only Tx
// 2. Write some data
- NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- writeTx.write(TestModel.TEST_PATH, containerNode);
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx.write(TestModel.TEST_PATH, testNode);
// 3. Ready the Tx for commit
- DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+ final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
- // 4. Commit the Tx
+ // 4. Commit the Tx on another thread that first waits for the second read Tx.
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ final CountDownLatch continueCommit1 = new CountDownLatch(1);
+ final CountDownLatch commit1Done = new CountDownLatch(1);
+ final AtomicReference<Exception> commit1Error = new AtomicReference<>();
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ continueCommit1.await();
+ doCommit(cohort1);
+ } catch (Exception e) {
+ commit1Error.set(e);
+ } finally {
+ commit1Done.countDown();
+ }
+ }
+ }.start();
- // 5. Verify the data in the store
+ // 5. Create a new read Tx from the chain to read and verify the data from the first
+ // Tx is visible after being readied.
DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
-
Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", containerNode, optional.get());
+ assertEquals("Data node", testNode, optional.get());
+
+ // 6. Create a new RW Tx from the chain, write more data, and ready it
+
+ DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+ MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
+
+ DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
+
+ // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
+ // verify it is visible.
+
+ readTx = txChain.newReadOnlyTransaction();
+ optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", outerNode, optional.get());
+
+ // 8. Wait for the 2 commits to complete and close the chain.
+
+ continueCommit1.countDown();
+ Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
+
+ if(commit1Error.get() != null) {
+ throw commit1Error.get();
+ }
+
+ doCommit(cohort2);
txChain.close();
+ // 9. Create a new read Tx from the data store and verify committed data.
+
+ readTx = dataStore.newReadOnlyTransaction();
+ optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", outerNode, optional.get());
+
cleanup(dataStore);
}};
}
DistributedDataStore dataStore =
setupDistributedDataStore("testChangeListenerRegistration", "test-1");
- MockDataChangeListener listener = new MockDataChangeListener(3);
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ MockDataChangeListener listener = new MockDataChangeListener(1);
ListenerRegistration<MockDataChangeListener>
listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
assertNotNull("registerChangeListener returned null", listenerReg);
- testWriteTransaction(dataStore, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ // Wait for the initial notification
+
+ listener.waitForChangeEvents(TestModel.TEST_PATH);
+
+ listener.reset(2);
+
+ // Write 2 updates.
testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
testWriteTransaction(dataStore, listPath,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
- listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
+ // Wait for the 2 updates.
+
+ listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
listenerReg.close();
// 4. Commit the Tx
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
+ doCommit(cohort);
// 5. Verify the data in the store
assertEquals("Data node", nodeToWrite, optional.get());
}
+ void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+ Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+ assertEquals("canCommit", true, canCommit);
+ cohort.preCommit().get(5, TimeUnit.SECONDS);
+ cohort.commit().get(5, TimeUnit.SECONDS);
+ }
+
void cleanup(DistributedDataStore dataStore) {
dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
/**
* A mock DataChangeListener implementation.
public class MockDataChangeListener implements
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- changeList = Lists.newArrayList();
- private final CountDownLatch changeLatch;
- private final int expChangeEventCount;
+ private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>> changeList =
+ Collections.synchronizedList(Lists.<AsyncDataChangeEvent<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>>newArrayList());
+
+ private volatile CountDownLatch changeLatch;
+ private int expChangeEventCount;
public MockDataChangeListener(int expChangeEventCount) {
+ reset(expChangeEventCount);
+ }
+
+ public void reset(int expChangeEventCount) {
changeLatch = new CountDownLatch(expChangeEventCount);
this.expChangeEventCount = expChangeEventCount;
+ changeList.clear();
}
@Override
}
public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
- assertEquals("Change notifications complete", true,
- Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+ boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
+ if(!done) {
+ fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
+ expChangeEventCount, (expChangeEventCount - changeLatch.getCount())));
+ }
+
+ assertEquals("Change notifications complete", true, done);
for(int i = 0; i < expPaths.length; i++) {
assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),