public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+ public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100;
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private boolean persistent = DEFAULT_PERSISTENT;
private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
- private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+ private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+ private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private DatastoreContext(){
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
+ public int getShardBatchedModificationCount() {
+ return shardBatchedModificationCount;
+ }
+
public static class Builder {
- private DatastoreContext datastoreContext = new DatastoreContext();
+ private final DatastoreContext datastoreContext = new DatastoreContext();
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
+ public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
+ datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
+ return this;
+ }
+
public DatastoreContext build() {
return datastoreContext;
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
+ * support the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+class LegacyTransactionContextImpl extends TransactionContextImpl {
+
+ LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+ ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
+ remoteTransactionVersion, operationCompleter);
+ }
+
+ @Override
+ public void deleteData(YangInstanceIdentifier path) {
+ recordedOperationFutures.add(executeOperationAsync(
+ new DeleteData(path, getRemoteTransactionVersion())));
+ }
+
+ @Override
+ public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ recordedOperationFutures.add(executeOperationAsync(
+ new MergeData(path, data, getRemoteTransactionVersion())));
+ }
+
+ @Override
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ recordedOperationFutures.add(executeOperationAsync(
+ new WriteData(path, data, getRemoteTransactionVersion())));
+ }
+}
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
final class OperationCompleter extends OnComplete<Object> {
private final Semaphore operationLimiter;
}
@Override
- public void onComplete(Throwable throwable, Object o){
- this.operationLimiter.release();
+ public void onComplete(Throwable throwable, Object message) {
+ if(message instanceof BatchedModificationsReply) {
+ this.operationLimiter.release(((BatchedModificationsReply)message).getNumBatched());
+ } else {
+ this.operationLimiter.release();
+ }
}
}
\ No newline at end of file
try {
final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
- ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
+ ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
- sender().tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self());
+ sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
} catch (Exception e) {
LOG.debug(String.format("Unexpected error reading path %s", path), e);
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
*/
public class ShardWriteTransaction extends ShardTransaction {
- private final MutableCompositeModification modification = new MutableCompositeModification();
+ private final MutableCompositeModification compositeModification = new MutableCompositeModification();
private final DOMStoreWriteTransaction transaction;
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
@Override
public void handleReceive(Object message) throws Exception {
- if (message instanceof WriteData) {
- writeData(transaction, (WriteData) message, !SERIALIZED_REPLY);
-
- } else if (message instanceof MergeData) {
- mergeData(transaction, (MergeData) message, !SERIALIZED_REPLY);
-
- } else if (message instanceof DeleteData) {
- deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
-
+ if (message instanceof BatchedModifications) {
+ batchedModifications((BatchedModifications)message);
} else if (message instanceof ReadyTransaction) {
readyTransaction(transaction, !SERIALIZED_REPLY);
-
+ } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ readyTransaction(transaction, SERIALIZED_REPLY);
} else if(WriteData.isSerializedType(message)) {
writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
} else if(DeleteData.isSerializedType(message)) {
deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
- } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readyTransaction(transaction, SERIALIZED_REPLY);
-
} else if (message instanceof GetCompositedModification) {
// This is here for testing only
- getSender().tell(new GetCompositeModificationReply(modification), getSelf());
+ getSender().tell(new GetCompositeModificationReply(compositeModification), getSelf());
} else {
super.handleReceive(message);
}
}
+ private void batchedModifications(BatchedModifications batched) {
+ try {
+ for(Modification modification: batched.getModifications()) {
+ compositeModification.addModification(modification);
+ modification.apply(transaction);
+ }
+
+ getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+ } catch (Exception e) {
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ }
+
private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
boolean returnSerialized) {
LOG.debug("writeData at path : {}", message.getPath());
- modification.addModification(
+ compositeModification.addModification(
new WriteModification(message.getPath(), message.getData()));
try {
transaction.write(message.getPath(), message.getData());
boolean returnSerialized) {
LOG.debug("mergeData at path : {}", message.getPath());
- modification.addModification(
+ compositeModification.addModification(
new MergeModification(message.getPath(), message.getData()));
try {
boolean returnSerialized) {
LOG.debug("deleteData at path : {}", message.getPath());
- modification.addModification(new DeleteModification(message.getPath()));
+ compositeModification.addModification(new DeleteModification(message.getPath()));
try {
transaction.delete(message.getPath());
DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
- cohort, modification, returnSerialized), getContext());
+ cohort, compositeModification, returnSerialized), getContext());
// The shard will handle the commit from here so we're no longer needed - self-destruct.
getSelf().tell(PoisonPill.getInstance(), getSelf());
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
-final class TransactionContextImpl extends AbstractTransactionContext {
+class TransactionContextImpl extends AbstractTransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
private final ActorContext actorContext;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
- private final OperationCompleter operationCompleter;
+ private final OperationCompleter operationCompleter;
+ private BatchedModifications batchedModifications;
TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, SchemaContext schemaContext,
return actor;
}
- private Future<Object> executeOperationAsync(SerializableMessage msg) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+ protected short getRemoteTransactionVersion() {
+ return remoteTransactionVersion;
}
- private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
- return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
- msg.toSerializable(remoteTransactionVersion)));
+ protected Future<Object> executeOperationAsync(SerializableMessage msg) {
+ return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
}
@Override
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
+ // Send the remaining batched modifications if any.
+
+ sendBatchedModifications();
+
// Send the ReadyTransaction message to the Tx actor.
final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
+ private void batchModification(Modification modification) {
+ if(batchedModifications == null) {
+ batchedModifications = new BatchedModifications(remoteTransactionVersion);
+ }
+
+ batchedModifications.addModification(modification);
+
+ if(batchedModifications.getModifications().size() >=
+ actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+ sendBatchedModifications();
+ }
+ }
+
+ private void sendBatchedModifications() {
+ if(batchedModifications != null) {
+ LOG.debug("Tx {} sending {} batched modifications", identifier,
+ batchedModifications.getModifications().size());
+
+ recordedOperationFutures.add(executeOperationAsync(batchedModifications));
+ batchedModifications = null;
+ }
+ }
+
@Override
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new DeleteData(path)));
+ batchModification(new DeleteModification(path));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
+ batchModification(new MergeModification(path, data));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
+ batchModification(new WriteModification(path, data));
}
@Override
LOG.debug("Tx {} readData called path = {}", identifier, path);
+ // Send the remaining batched modifications if any.
+
+ sendBatchedModifications();
+
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
// uncommitted semantics of the public API contract. If any one fails then fail the read.
LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ // Send the remaining batched modifications if any.
+
+ sendBatchedModifications();
+
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
// uncommitted semantics of the public API contract. If any one fails then fail this
private void throttleOperation(int acquirePermits) {
try {
- if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+ if(!operationLimiter.tryAcquire(acquirePermits,
+ actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
}
} catch (InterruptedException e) {
private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
String transactionPath = reply.getTransactionPath();
- LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
+ LOG.debug("Tx {} Received {}", identifier, reply);
ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
- return new TransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+ if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+ return new TransactionContextImpl(transactionPath, transactionActor, identifier,
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+ } else {
+ return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+ }
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+
+/**
+ * Message used to batch write, merge, delete modification operations to the ShardTransaction actor.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModifications extends MutableCompositeModification implements SerializableMessage {
+ private static final long serialVersionUID = 1L;
+
+ public BatchedModifications() {
+ }
+
+ public BatchedModifications(short version) {
+ super(version);
+ }
+
+ @Override
+ public Object toSerializable() {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * The reply for the BatchedModifications message.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsReply extends VersionedExternalizableMessage {
+ private static final long serialVersionUID = 1L;
+
+ private int numBatched;
+
+ public BatchedModificationsReply() {
+ }
+
+ public BatchedModificationsReply(int numBatched) {
+ this.numBatched = numBatched;
+ }
+
+
+ public int getNumBatched() {
+ return numBatched;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ numBatched = in.readInt();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ out.writeInt(numBatched);
+ }
+
+ @Override
+ public Object toSerializable() {
+ return this;
+ }
+}
(short)o.getMessageVersion());
}
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("CreateTransactionReply [transactionPath=").append(transactionPath).append(", transactionId=")
+ .append(transactionId).append(", version=").append(version).append("]");
+ return builder.toString();
+ }
}
package org.opendaylight.controller.cluster.datastore.messages;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class DeleteData implements VersionedSerializableMessage, Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class DeleteData extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
public static final Class<DeleteData> SERIALIZABLE_CLASS = DeleteData.class;
private YangInstanceIdentifier path;
- private short version;
public DeleteData() {
}
- public DeleteData(final YangInstanceIdentifier path) {
+ public DeleteData(final YangInstanceIdentifier path, short version) {
+ super(version);
this.path = path;
}
return path;
}
- public short getVersion() {
- return version;
- }
-
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- version = in.readShort(); // Read the version - don't need to do anything with it now
+ super.readExternal(in);
path = SerializationUtils.deserializePath(in);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(version);
+ super.writeExternal(out);
SerializationUtils.serializePath(path, out);
}
@Override
- public Object toSerializable(short toVersion) {
- if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
- version = toVersion;
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
return this;
} else {
// To base or R1 Helium version
} else {
// From base or R1 Helium version
ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
- return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+ return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()),
+ DataStoreVersions.HELIUM_2_VERSION);
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
public class DeleteDataReply extends EmptyReply {
+ private static final long serialVersionUID = 1L;
private static final Object LEGACY_SERIALIZED_INSTANCE =
ShardTransactionMessages.DeleteDataReply.newBuilder().build();
*
* @author Thomas Pantelis
*/
-public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+public abstract class EmptyReply extends EmptyExternalizable {
private final Object legacySerializedInstance;
this.legacySerializedInstance = legacySerializedInstance;
}
- @Override
public Object toSerializable(short toVersion) {
return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class MergeData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class MergeData extends ModifyData {
private static final long serialVersionUID = 1L;
public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
public MergeData() {
}
- public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- super(path, data);
+ public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+ super(path, data, version);
}
@Override
- public Object toSerializable(short toVersion) {
- if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
- setVersion(toVersion);
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
return this;
} else {
// To base or R1 Helium version
ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
- return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+ return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+ DataStoreVersions.HELIUM_2_VERSION);
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
public class MergeDataReply extends EmptyReply {
private static final long serialVersionUID = 1L;
package org.opendaylight.controller.cluster.datastore.messages;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public abstract class ModifyData implements Externalizable {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public abstract class ModifyData extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
private YangInstanceIdentifier path;
private NormalizedNode<?, ?> data;
- private short version;
protected ModifyData() {
}
- protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+ super(version);
this.path = path;
this.data = data;
}
return data;
}
- public short getVersion() {
- return version;
- }
-
- protected void setVersion(short version) {
- this.version = version;
- }
-
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- version = in.readShort();
+ super.readExternal(in);
SerializationUtils.deserializePathAndNode(in, this, APPLIER);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(version);
+ super.writeExternal(out);
SerializationUtils.serializePathAndNode(path, data, out);
}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.protobuf.ByteString;
-import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+public class ReadDataReply extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
private NormalizedNode<?, ?> normalizedNode;
- private short version;
public ReadDataReply() {
}
- public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
+ public ReadDataReply(NormalizedNode<?, ?> normalizedNode, short version) {
+ super(version);
this.normalizedNode = normalizedNode;
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- version = in.readShort();
+ super.readExternal(in);
normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(version);
+ super.writeExternal(out);
SerializationUtils.serializeNormalizedNode(normalizedNode, out);
}
@Override
- public Object toSerializable(short toVersion) {
- if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
- version = toVersion;
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
return this;
} else {
return toSerializableReadDataReply(normalizedNode);
} else {
ShardTransactionMessages.ReadDataReply o =
(ShardTransactionMessages.ReadDataReply) serializable;
- return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+ return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()),
+ DataStoreVersions.HELIUM_2_VERSION);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Abstract base class for a versioned Externalizable message.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage {
+ private static final long serialVersionUID = 1L;
+
+ private short version;
+
+ public VersionedExternalizableMessage() {
+ }
+
+ public VersionedExternalizableMessage(short version) {
+ this.version = version;
+ }
+
+ public short getVersion() {
+ return version;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ version = in.readShort();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(version);
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-/**
- * Interface for a Serializable message with versioning.
- *
- * @author Thomas Pantelis
- */
-public interface VersionedSerializableMessage {
- Object toSerializable(short toVersion);
-}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class WriteData extends ModifyData implements VersionedSerializableMessage {
+/**
+ * @deprecated Replaced by BatchedModifications.
+ */
+@Deprecated
+public class WriteData extends ModifyData {
private static final long serialVersionUID = 1L;
public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
public WriteData() {
}
- public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- super(path, data);
+ public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, short version) {
+ super(path, data, version);
}
@Override
- public Object toSerializable(short toVersion) {
- if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
- setVersion(toVersion);
+ public Object toSerializable() {
+ if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
return this;
} else {
// To base or R1 Helium version
ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
- return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+ return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(),
+ DataStoreVersions.HELIUM_2_VERSION);
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+/**
+ * @deprecated Replaced by BatchedModificationsReply.
+ */
+@Deprecated
public class WriteDataReply extends EmptyReply {
private static final long serialVersionUID = 1L;
public abstract class AbstractModification implements Modification {
private YangInstanceIdentifier path;
+ private short version;
- protected AbstractModification() {
+ protected AbstractModification(short version) {
+ this.version = version;
}
protected AbstractModification(YangInstanceIdentifier path) {
public YangInstanceIdentifier getPath() {
return path;
}
+
+ public short getVersion() {
+ return version;
+ }
}
private static final long serialVersionUID = 1L;
public DeleteModification() {
+ this(DataStoreVersions.CURRENT_VERSION);
+ }
+
+ public DeleteModification(short version) {
+ super(version);
}
public DeleteModification(YangInstanceIdentifier path) {
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- in.readShort();
setPath(SerializationUtils.deserializePath(in));
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(DataStoreVersions.CURRENT_VERSION);
SerializationUtils.serializePath(getPath(), out);
}
return new DeleteModification(InstanceIdentifierUtils.fromSerializable(o.getPath()));
}
- public static DeleteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
- DeleteModification mod = new DeleteModification();
+ public static DeleteModification fromStream(ObjectInput in, short version)
+ throws ClassNotFoundException, IOException {
+ DeleteModification mod = new DeleteModification(version);
mod.readExternal(in);
return mod;
}
import java.io.IOException;
import java.io.ObjectInput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
private static final long serialVersionUID = 1L;
public MergeModification() {
+ this(DataStoreVersions.CURRENT_VERSION);
+ }
+
+ public MergeModification(short version) {
+ super(version);
}
public MergeModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode());
}
- public static MergeModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
- MergeModification mod = new MergeModification();
+ public static MergeModification fromStream(ObjectInput in, short version)
+ throws ClassNotFoundException, IOException {
+ MergeModification mod = new MergeModification(version);
mod.readExternal(in);
return mod;
}
public class MutableCompositeModification implements CompositeModification {
private static final long serialVersionUID = 1L;
- private final List<Modification> modifications;
+ private final List<Modification> modifications = new ArrayList<>();
+ private short version;
public MutableCompositeModification() {
- modifications = new ArrayList<>();
+ this(DataStoreVersions.CURRENT_VERSION);
+ }
+
+ public MutableCompositeModification(short version) {
+ this.version = version;
}
@Override
return COMPOSITE;
}
+ public short getVersion() {
+ return version;
+ }
+
+ public void setVersion(short version) {
+ this.version = version;
+ }
+
/**
* Add a new Modification to the list of Modifications represented by this
* composite
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- in.readShort();
+ version = in.readShort();
int size = in.readInt();
byte type = in.readByte();
switch(type) {
case Modification.WRITE:
- modifications.add(WriteModification.fromStream(in));
+ modifications.add(WriteModification.fromStream(in, version));
break;
case Modification.MERGE:
- modifications.add(MergeModification.fromStream(in));
+ modifications.add(MergeModification.fromStream(in, version));
break;
case Modification.DELETE:
- modifications.add(DeleteModification.fromStream(in));
+ modifications.add(DeleteModification.fromStream(in, version));
break;
}
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(DataStoreVersions.CURRENT_VERSION);
+ out.writeShort(version);
out.writeInt(modifications.size());
builder.setTimeStamp(System.nanoTime());
for (Modification m : modifications) {
- builder.addModification(
- (PersistentMessages.Modification) m.toSerializable());
+ builder.addModification((PersistentMessages.Modification) m.toSerializable());
}
return builder.build();
private NormalizedNode<?, ?> data;
public WriteModification() {
+ this(DataStoreVersions.CURRENT_VERSION);
+ }
+
+ public WriteModification(short version) {
+ super(version);
}
public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- in.readShort(); // version
-
SerializationUtils.deserializePathAndNode(in, this, APPLIER);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(DataStoreVersions.CURRENT_VERSION);
SerializationUtils.serializePathAndNode(getPath(), data, out);
}
return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode());
}
- public static WriteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
- WriteModification mod = new WriteModification();
+ public static WriteModification fromStream(ObjectInput in, short version)
+ throws ClassNotFoundException, IOException {
+ WriteModification mod = new WriteModification(version);
mod.readExternal(in);
return mod;
}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.osgi.framework.BundleContext;
-
import scala.concurrent.duration.Duration;
public class DistributedConfigDataStoreProviderModule extends
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
.transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+ .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
.build();
return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.osgi.framework.BundleContext;
-
import scala.concurrent.duration.Duration;
public class DistributedOperationalDataStoreProviderModule extends
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
.transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
+ .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
.build();
return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
an operation (eg transaction create).";
}
+ leaf shard-batched-modification-count {
+ default 100;
+ type non-zero-uint32-type;
+ description "The number of transaction modification operations (put, merge, delete) to
+ batch before sending to the shard transaction actor. Batching improves
+ performance as less modifications messages are sent to the actor and thus
+ lessens the chance that the transaction actor's mailbox queue could get full.";
+ }
+
leaf enable-metric-capture {
default false;
type boolean;
assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+ assertEquals(DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT, build.getShardBatchedModificationCount());
}
-}
\ No newline at end of file
+}
schemaContext = TestModel.createTestContext();
doReturn(schemaContext).when(actorContext).getSchemaContext();
+ doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
}
@Test
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.Semaphore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+
+/**
+ * Unit tests for OperationCompleter.
+ *
+ * @author Thomas Pantelis
+ */
+public class OperationCompleterTest {
+
+ @Test
+ public void testOnComplete() throws Exception {
+ int permits = 10;
+ Semaphore operationLimiter = new Semaphore(permits);
+ operationLimiter.acquire(permits);
+ int availablePermits = 0;
+
+ OperationCompleter completer = new OperationCompleter(operationLimiter );
+
+ completer.onComplete(null, new DataExistsReply(true));
+ assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+ completer.onComplete(null, new DataExistsReply(true));
+ assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+ completer.onComplete(null, new IllegalArgumentException());
+ assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
+
+ completer.onComplete(null, new BatchedModificationsReply(4));
+ availablePermits += 4;
+ assertEquals("availablePermits", availablePermits, operationLimiter.availablePermits());
+ }
+}
// Write data to the Tx
txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
- DataStoreVersions.BASE_HELIUM_VERSION), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION).
+ toSerializable(), getRef());
expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
// Write data to the Tx
txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+ DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef());
- expectMsgClass(duration, WriteDataReply.class);
+ expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable(
+ DataStoreVersions.BASE_HELIUM_VERSION).getClass());
// Ready the Tx
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
"testOnReceiveWriteData");
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
- DataStoreVersions.HELIUM_2_VERSION), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+ toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
// unserialized write
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
getRef());
expectMsgClass(duration("5 seconds"), WriteDataReply.class);
"testMergeData");
transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
- DataStoreVersions.HELIUM_2_VERSION), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
+ toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
//unserialized merge
transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
getRef());
expectMsgClass(duration("5 seconds"), MergeDataReply.class);
final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
"testDeleteData");
- transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
- DataStoreVersions.HELIUM_2_VERSION), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
+ toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
assertModification(transaction, DeleteModification.class);
//unserialized
- transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
}};
}
+ @Test
+ public void testOnReceiveBatchedModifications() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+ final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
+
+ YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+ YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+ BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ batched.addModification(new WriteModification(writePath, writeData));
+ batched.addModification(new MergeModification(mergePath, mergeData));
+ batched.addModification(new DeleteModification(deletePath));
+
+ transaction.tell(batched, getRef());
+
+ BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+ assertEquals("getNumBatched", 3, reply.getNumBatched());
+
+ JavaTestKit verification = new JavaTestKit(getSystem());
+ transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
+
+ CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
+ GetCompositeModificationReply.class).getModification();
+
+ assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
+
+ WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
+ assertEquals("getPath", writePath, write.getPath());
+ assertEquals("getData", writeData, write.getData());
+
+ MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
+ assertEquals("getPath", mergePath, merge.getPath());
+ assertEquals("getData", mergeData, merge.getData());
+
+ DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
+ assertEquals("getPath", deletePath, delete.getPath());
+
+ InOrder inOrder = Mockito.inOrder(mockWriteTx);
+ inOrder.verify(mockWriteTx).write(writePath, writeData);
+ inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
+ inOrder.verify(mockWriteTx).delete(deletePath);
+ }};
+ }
@Test
public void testOnReceiveReadyTransaction() throws Exception {
DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
- transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(
- DataStoreVersions.CURRENT_VERSION), ActorRef.noSender());
+ transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
+ toSerializable(), ActorRef.noSender());
}
@Test
actorContext.setSchemaContext(schemaContext);
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+ doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext();
}
@SuppressWarnings("resource")
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@Mock
private ClusterWrapper mockClusterWrapper;
- String memberName = "mock-member";
+ private final String memberName = "mock-member";
+
+ private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
+ shardBatchedModificationCount(1);
@BeforeClass
public static void setUpClass() throws IOException {
schemaContext = TestModel.createTestContext();
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
-
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
- doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+ doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
ShardStrategyFactory.setConfiguration(configuration);
}
private ReadData eqSerializedReadData() {
+ return eqSerializedReadData(TestModel.TEST_PATH);
+ }
+
+ private ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
@Override
public boolean matches(Object argument) {
return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
- ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
+ ReadData.fromSerializable(argument).getPath().equals(path);
}
};
return argThat(matcher);
}
- private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
- return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
- }
-
- private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
- final int transactionVersion) {
+ private WriteData eqLegacyWriteData(final NormalizedNode<?, ?> nodeToWrite) {
ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
@Override
public boolean matches(Object argument) {
- if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
- WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
- (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
- ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
-
+ if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) {
WriteData obj = WriteData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
}
return false;
return argThat(matcher);
}
- private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
- ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
- @Override
- public boolean matches(Object argument) {
- if(argument instanceof WriteData) {
- WriteData obj = (WriteData) argument;
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
- }
- return false;
- }
- };
-
- return argThat(matcher);
- }
-
- private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
- return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
- }
-
- private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
- final int transactionVersion) {
+ private MergeData eqLegacyMergeData(final NormalizedNode<?, ?> nodeToWrite) {
ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
@Override
public boolean matches(Object argument) {
- if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
- MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
- (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
- ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
-
+ if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) {
MergeData obj = MergeData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite);
}
return false;
return argThat(matcher);
}
- private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
- ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
- @Override
- public boolean matches(Object argument) {
- if(argument instanceof MergeData) {
- MergeData obj = ((MergeData) argument);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
- }
-
- return false;
- }
- };
-
- return argThat(matcher);
- }
-
- private DeleteData eqSerializedDeleteData() {
- ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
- @Override
- public boolean matches(Object argument) {
- return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
- DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
- }
- };
-
- return argThat(matcher);
- }
-
- private DeleteData eqDeleteData() {
+ private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) {
ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
@Override
public boolean matches(Object argument) {
- return argument instanceof DeleteData &&
- ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
+ return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) &&
+ DeleteData.fromSerializable(argument).getPath().equals(expPath);
}
};
private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
short transactionVersion) {
- return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+ return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
}
private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
}
private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(data));
+ return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
}
private Future<Object> dataExistsSerializedReply(boolean exists) {
return Futures.successful(new DataExistsReply(exists));
}
- private Future<Object> writeSerializedDataReply(short version) {
- return Futures.successful(new WriteDataReply().toSerializable(version));
- }
-
- private Future<Object> writeSerializedDataReply() {
- return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
- }
-
- private Future<WriteDataReply> writeDataReply() {
- return Futures.successful(new WriteDataReply());
- }
-
- private Future<Object> mergeSerializedDataReply(short version) {
- return Futures.successful(new MergeDataReply().toSerializable(version));
- }
-
- private Future<Object> mergeSerializedDataReply() {
- return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+ private Future<BatchedModificationsReply> batchedModificationsReply(int count) {
+ return Futures.successful(new BatchedModificationsReply(count));
}
private Future<Object> incompleteFuture(){
return mock(Future.class);
}
- private Future<MergeDataReply> mergeDataReply() {
- return Futures.successful(new MergeDataReply());
+ private ActorSelection actorSelection(ActorRef actorRef) {
+ return getSystem().actorSelection(actorRef.path());
+ }
+
+ private void expectBatchedModifications(ActorRef actorRef, int count) {
+ doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
- private Future<Object> deleteSerializedDataReply(short version) {
- return Futures.successful(new DeleteDataReply().toSerializable(version));
+ private void expectBatchedModifications(int count) {
+ doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), isA(BatchedModifications.class));
}
- private Future<Object> deleteSerializedDataReply() {
- return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+ private void expectIncompleteBatchedModifications() {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), isA(BatchedModifications.class));
}
- private Future<DeleteDataReply> deleteDataReply() {
- return Futures.successful(new DeleteDataReply());
+ private void expectReadyTransaction(ActorRef actorRef) {
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
}
- private ActorSelection actorSelection(ActorRef actorRef) {
- return getSystem().actorSelection(actorRef.path());
+ private void expectFailedBatchedModifications(ActorRef actorRef) {
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
public void testRead() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
}
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
}
@Test(expected = TestException.class)
public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+ doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+ when(mockActorContext).getDatastoreContext();
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
- doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+ expectFailedBatchedModifications(actorRef);
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
+ expectBatchedModifications(actorRef, 1);
doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, expectedNode);
TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
-
assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+
+ InOrder inOrder = Mockito.inOrder(mockActorContext);
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
}
@Test(expected=IllegalStateException.class)
public void testReadPreConditionCheck() {
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
-
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.read(TestModel.TEST_PATH);
}
public void testExists() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedDataExists());
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
}
@Test(expected = TestException.class)
public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+ doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
+ when(mockActorContext).getDatastoreContext();
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
- doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
+ expectFailedBatchedModifications(actorRef);
doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedDataExists());
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedDataExists());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
assertEquals("Exists response", true, exists);
+
+ InOrder inOrder = Mockito.inOrder(mockActorContext);
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
}
@Test(expected=IllegalStateException.class)
public void testExistsPreConditionCheck() {
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
-
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.exists(TestModel.TEST_PATH);
}
// Expected
}
} else {
- assertEquals("Recording operation Future result type", expResultType,
+ assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
}
}
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ // This sends the batched modification.
+ transactionProxy.ready();
+
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.class);
+ BatchedModificationsReply.class);
}
@Test
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
throw caughtEx.get();
}
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ // This sends the batched modification.
+ transactionProxy.ready();
+
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.class);
+ BatchedModificationsReply.class);
}
@Test(expected=IllegalStateException.class)
public void testWritePreConditionCheck() {
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
-
- transactionProxy.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+ transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
@Test(expected=IllegalStateException.class)
public void testWriteAfterReadyPreConditionCheck() {
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.ready();
- transactionProxy.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
@Test
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+ // This sends the batched modification.
+ transactionProxy.ready();
+
+ verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.class);
+ BatchedModificationsReply.class);
}
@Test
public void testDelete() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
- doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDeleteData());
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDeleteData());
+ // This sends the batched modification.
+ transactionProxy.ready();
+
+ verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- DeleteDataReply.class);
+ BatchedModificationsReply.class);
}
private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
+ expectReadyTransaction(actorRef);
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.read(TestModel.TEST_PATH);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.class);
+ BatchedModificationsReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(BatchedModifications.class));
}
private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+ doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
- doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
+ doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode));
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
+
+ expectReadyTransaction(actorRef);
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
transactionProxy.merge(TestModel.TEST_PATH, testNode);
+ transactionProxy.delete(TestModel.TEST_PATH);
+
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
+ ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class,
+ ShardTransactionMessages.DeleteDataReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+ expectFailedBatchedModifications(actorRef);
- doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
-
- doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ expectReadyTransaction(actorRef);
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
verifyCohortFutures(proxy, TestException.class);
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.class, TestException.class);
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
}
@Test
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)),
isA(ReadyTransaction.SERIALIZABLE_CLASS));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.class);
+ BatchedModificationsReply.class);
verifyCohortFutures(proxy, TestException.class);
}
doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
mockActorContext).findPrimaryShardAsync(anyString());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(actorRef, 1);
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)),
isA(ReadyTransaction.SERIALIZABLE_CLASS));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
transactionProxy.read(TestModel.TEST_PATH);
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1")
- .setTransactionActorPath(actorPath)
- .build();
+ .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
}
@Test
- public void testLocalTxActorWrite() throws Exception {
+ public void testLocalTxActorReady() throws Exception {
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1")
- .setTransactionActorPath(actorPath)
- .build();
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+ setTransactionId("txn-1").setTransactionActorPath(actorPath).
+ setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, WRITE_ONLY));
doReturn(true).when(mockActorContext).isPathLocal(actorPath);
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), isA(BatchedModifications.class));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- verify(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
-
- //testing local merge
- doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToWrite));
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
- verify(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToWrite));
-
-
- //testing local delete
- doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDeleteData());
- transactionProxy.delete(TestModel.TEST_PATH);
-
- verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
+ BatchedModificationsReply.class);
// testing ready
doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
}
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1")
- .setTransactionActorPath(actorPath)
- .build();
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+ setTransactionId("txn-1").setTransactionActorPath(actorPath).
+ setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
long end = System.nanoTime();
- Assert.assertTrue(String.format("took less time than expected %s was %s",
- TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
- (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+ long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+ Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+ expected, (end-start)), (end - start) > expected);
}
}
String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1")
- .setTransactionActorPath(actorPath)
- .build();
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
+ setTransactionId("txn-1").setTransactionActorPath(actorPath).
+ setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
long end = System.nanoTime();
- Assert.assertTrue(String.format("took more time than expected %s was %s",
- TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
- (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+ long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+ Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+ expected, (end-start)), (end - start) <= expected);
}
public void testWriteThrottling(boolean shardFound){
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ expectBatchedModifications(2);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ expectIncompleteBatchedModifications();
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
}
});
-
}
@Test
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ expectBatchedModifications(2);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+ expectBatchedModifications(2);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
}
});
-
}
@Test
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToMerge));
+ expectIncompleteBatchedModifications();
transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToMerge));
+ expectBatchedModifications(2);
transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqMergeData(nodeToMerge));
+ expectBatchedModifications(2);
transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDeleteData());
+ expectIncompleteBatchedModifications();
transactionProxy.delete(TestModel.TEST_PATH);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDeleteData());
+ expectBatchedModifications(2);
transactionProxy.delete(TestModel.TEST_PATH);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
- doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDeleteData());
+ expectBatchedModifications(2);
transactionProxy.delete(TestModel.TEST_PATH);
public void run(TransactionProxy transactionProxy) {
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
+ expectBatchedModifications(1);
doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), any(ReadyTransaction.class));
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(nodeToWrite));
-
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqWriteData(carsNode));
+ expectBatchedModifications(2);
doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), any(ReadyTransaction.class));
}
}, 2, true);
}
+
+ @Test
+ public void testModificationOperationBatching() throws Throwable {
+ int shardBatchedModificationCount = 3;
+ doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+ when(mockActorContext).getDatastoreContext();
+
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+ expectReadyTransaction(actorRef);
+
+ YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
+ NormalizedNode<?, ?> writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+ YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
+ NormalizedNode<?, ?> mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+ YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
+ YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ transactionProxy.write(writePath1, writeNode1);
+ transactionProxy.write(writePath2, writeNode2);
+ transactionProxy.delete(deletePath1);
+ transactionProxy.merge(mergePath1, mergeNode1);
+ transactionProxy.merge(mergePath2, mergeNode2);
+ transactionProxy.write(writePath3, writeNode3);
+ transactionProxy.merge(mergePath3, mergeNode3);
+ transactionProxy.delete(deletePath2);
+
+ // This sends the last batch.
+ transactionProxy.ready();
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
+
+ verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
+
+ verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
+ new DeleteModification(deletePath2));
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ }
+
+ @Test
+ public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
+ int shardBatchedModificationCount = 10;
+ doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
+ when(mockActorContext).getDatastoreContext();
+
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+ expectBatchedModifications(actorRef, shardBatchedModificationCount);
+
+ YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
+ NormalizedNode<?, ?> mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
+
+ YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
+
+ doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+ doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+ doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ transactionProxy.write(writePath1, writeNode1);
+ transactionProxy.write(writePath2, writeNode2);
+
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(writePath2).
+ get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", writeNode2, readOptional.get());
+
+ transactionProxy.merge(mergePath1, mergeNode1);
+ transactionProxy.merge(mergePath2, mergeNode2);
+
+ readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
+
+ transactionProxy.delete(deletePath);
+
+ Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+ assertEquals("Exists response", true, exists);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", mergeNode2, readOptional.get());
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ new WriteModification(writePath2, writeNode2));
+
+ verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ new MergeModification(mergePath2, mergeNode2));
+
+ verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
+
+ InOrder inOrder = Mockito.inOrder(mockActorContext);
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData(writePath2));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+
+ inOrder.verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ }
+
+ private List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
+ ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
+ ArgumentCaptor.forClass(BatchedModifications.class);
+ verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
+ eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
+
+ List<BatchedModifications> batchedModifications = filterCaptured(
+ batchedModificationsCaptor, BatchedModifications.class);
+ return batchedModifications;
+ }
+
+ private <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
+ List<T> captured = new ArrayList<>();
+ for(T c: captor.getAllValues()) {
+ if(type.isInstance(c)) {
+ captured.add(c);
+ }
+ }
+
+ return captured;
+ }
+
+ private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), expected);
+ }
+
+ private void verifyBatchedModifications(Object message, Modification... expected) {
+ assertEquals("Message type", BatchedModifications.class, message.getClass());
+ BatchedModifications batchedModifications = (BatchedModifications)message;
+ assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+ for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
+ Modification actual = batchedModifications.getModifications().get(i);
+ assertEquals("Modification type", expected[i].getClass(), actual.getClass());
+ assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
+ ((AbstractModification)actual).getPath());
+ if(actual instanceof WriteModification) {
+ assertEquals("getData", ((WriteModification)expected[i]).getData(),
+ ((WriteModification)actual).getData());
+ }
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for BatchedModifications.
+ *
+ * @author Thomas Pantelis
+ */
+public class BatchedModificationsTest {
+
+ @Test
+ public void testSerialization() {
+ YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+ NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+ YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
+ BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ batched.addModification(new WriteModification(writePath, writeData));
+ batched.addModification(new MergeModification(mergePath, mergeData));
+ batched.addModification(new DeleteModification(deletePath));
+
+ BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
+ (Serializable) batched.toSerializable());
+
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
+ assertEquals("getModifications size", 3, clone.getModifications().size());
+
+ WriteModification write = (WriteModification)clone.getModifications().get(0);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
+ assertEquals("getPath", writePath, write.getPath());
+ assertEquals("getData", writeData, write.getData());
+
+ MergeModification merge = (MergeModification)clone.getModifications().get(1);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
+ assertEquals("getPath", mergePath, merge.getPath());
+ assertEquals("getData", mergeData, merge.getData());
+
+ DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
+ assertEquals("getPath", deletePath, delete.getPath());
+ }
+
+ @Test
+ public void testBatchedModificationsReplySerialization() {
+ BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
+ (Serializable) new BatchedModificationsReply(100).toSerializable());
+ assertEquals("getNumBatched", 100, clone.getNumBatched());
+ }
+}
*
* @author Thomas Pantelis
*/
+@Deprecated
public class DeleteDataTest {
@Test
public void testSerialization() {
YangInstanceIdentifier path = TestModel.TEST_PATH;
- DeleteData expected = new DeleteData(path);
+ DeleteData expected = new DeleteData(path, DataStoreVersions.CURRENT_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", DeleteData.class, serialized.getClass());
assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)serialized).getVersion());
Object clone = SerializationUtils.clone((Serializable) serialized);
- assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)clone).getVersion());
DeleteData actual = DeleteData.fromSerializable(clone);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
assertEquals("getPath", expected.getPath(), actual.getPath());
}
public void testSerializationWithHeliumR1Version() throws Exception {
YangInstanceIdentifier path = TestModel.TEST_PATH;
- DeleteData expected = new DeleteData(path);
+ DeleteData expected = new DeleteData(path, DataStoreVersions.HELIUM_1_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ShardTransactionMessages.DeleteData.class, serialized.getClass());
DeleteData actual = DeleteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+@Deprecated
public class MergeDataTest {
@Test
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- MergeData expected = new MergeData(path, data);
+ MergeData expected = new MergeData(path, data, DataStoreVersions.CURRENT_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", MergeData.class, serialized.getClass());
assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
Object clone = SerializationUtils.clone((Serializable) serialized);
- assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
MergeData actual = MergeData.fromSerializable(clone);
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
assertEquals("getPath", expected.getPath(), actual.getPath());
assertEquals("getData", expected.getData(), actual.getData());
}
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- MergeData expected = new MergeData(path, data);
+ MergeData expected = new MergeData(path, data, DataStoreVersions.HELIUM_1_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- ReadDataReply expected = new ReadDataReply(data);
+ ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
(Serializable) serialized));
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
}
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- ReadDataReply expected = new ReadDataReply(data);
+ ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.HELIUM_1_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
*
* @author Thomas Pantelis
*/
+@Deprecated
public class WriteDataTest {
@Test
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- WriteData expected = new WriteData(path, data);
+ WriteData expected = new WriteData(path, data, DataStoreVersions.CURRENT_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", WriteData.class, serialized.getClass());
assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
Object clone = SerializationUtils.clone((Serializable) serialized);
- assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
WriteData actual = WriteData.fromSerializable(clone);
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
assertEquals("getPath", expected.getPath(), actual.getPath());
assertEquals("getData", expected.getData(), actual.getData());
}
new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- WriteData expected = new WriteData(path, data);
+ WriteData expected = new WriteData(path, data, DataStoreVersions.HELIUM_1_VERSION);
- Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ Object serialized = expected.toSerializable();
assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
import org.apache.commons.lang.SerializationUtils;
import org.junit.Ignore;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
MutableCompositeModification clone = (MutableCompositeModification) SerializationUtils.clone(compositeModification);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+
assertEquals("getModifications size", 3, clone.getModifications().size());
WriteModification write = (WriteModification)clone.getModifications().get(0);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion());
assertEquals("getPath", writePath, write.getPath());
assertEquals("getData", writeData, write.getData());
MergeModification merge = (MergeModification)clone.getModifications().get(1);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion());
assertEquals("getPath", mergePath, merge.getPath());
assertEquals("getData", mergeData, merge.getData());
DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+ assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
assertEquals("getPath", deletePath, delete.getPath());
}
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
- public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build();
+ public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+ node(OUTER_LIST_QNAME).build();
+ public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+ node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");