// required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
boolean hasInstanceIdentifierPathArguments();
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public boolean hasInstanceIdentifierPathArguments() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
return instanceIdentifierPathArguments_;
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
return instanceIdentifierPathArguments_;
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public boolean hasInstanceIdentifierPathArguments() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder setInstanceIdentifierPathArguments(
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder clearInstanceIdentifierPathArguments() {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
bitField0_ |= 0x00000001;
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
if (instanceIdentifierPathArgumentsBuilder_ != null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
private com.google.protobuf.SingleFieldBuilder<
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
// required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
boolean hasInstanceIdentifierPathArguments();
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public boolean hasInstanceIdentifierPathArguments() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
return instanceIdentifierPathArguments_;
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
return instanceIdentifierPathArguments_;
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public boolean hasInstanceIdentifierPathArguments() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder setInstanceIdentifierPathArguments(
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder clearInstanceIdentifierPathArguments() {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
bitField0_ |= 0x00000001;
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
if (instanceIdentifierPathArgumentsBuilder_ != null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
private com.google.protobuf.SingleFieldBuilder<
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
}
message WriteData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+ // base Helium version
+ required InstanceIdentifier instanceIdentifierPathArguments = 1;
+ required Node normalizedNode = 2;
}
message WriteDataReply{
}
message MergeData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+ // base Helium version
+ required InstanceIdentifier instanceIdentifierPathArguments = 1;
+ required Node normalizedNode = 2;
}
message MergeDataReply{
// It seems the sender is never null but it doesn't hurt to check. If the caller passes in
// a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
- getSender().tell(new DataChangedReply(), getSelf());
+ getSender().tell(DataChangedReply.INSTANCE, getSelf());
}
}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* DataChangeListenerProxy represents a single remote DataChangeListener
*/
public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>{
private final ActorSelection dataChangeListenerActor;
- private final SchemaContext schemaContext;
- public DataChangeListenerProxy(SchemaContext schemaContext, ActorSelection dataChangeListenerActor) {
- this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
- this.schemaContext = schemaContext;
+ public DataChangeListenerProxy(ActorSelection dataChangeListenerActor) {
+ this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor,
+ "dataChangeListenerActor should not be null");
}
- @Override public void onDataChanged(
+ @Override
+ public void onDataChanged(
AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- dataChangeListenerActor.tell(new DataChanged(schemaContext, change), ActorRef.noSender());
+ dataChangeListenerActor.tell(new DataChanged(change), ActorRef.noSender());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Defines version numbers.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DataStoreVersions {
+ short BASE_HELIUM_VERSION = 0;
+ short HELIUM_1_VERSION = 1;
+ short HELIUM_2_VERSION = 2;
+ short LITHIUM_VERSION = 3;
+ short CURRENT_VERSION = LITHIUM_VERSION;
+}
*/
public class Shard extends RaftActor {
- private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
-
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@VisibleForTesting
cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
if(cohortEntry != null) {
commitWithNewTransaction(cohortEntry.getModification());
- sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
} else {
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
// currently uses a same thread executor anyway.
cohortEntry.getCohort().commit().get();
- sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
ActorRef replyActorPath = self();
- if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
shardMBean.incrementAbortTransactionsCount();
if(sender != null) {
- sender.tell(new AbortTransactionReply().toSerializable(), self);
+ sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
}
}
// This must be for install snapshot. Don't want to open this up and trigger
// deSerialization
- self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+ self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
self());
createSnapshotTransaction = null;
}
private ActorRef createTypedTransactionActor(int transactionType,
- ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+ ShardTransactionIdentifier transactionId, String transactionChainId,
+ short clientVersion ) {
DOMStoreTransactionFactory factory = store;
}
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
- String transactionChainId, int clientVersion) {
+ String transactionChainId, short clientVersion) {
ShardTransactionIdentifier transactionId =
ShardTransactionIdentifier.builder()
dataChangeListeners.add(dataChangeListenerPath);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
- new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+ new DataChangeListenerProxy(dataChangeListenerPath);
LOG.debug("Registering for path {}", registerChangeListener.getPath());
createSnapshotTransaction = createTransaction(
TransactionProxy.TransactionType.READ_ONLY.ordinal(),
"createSnapshot" + ++createSnapshotTransactionCounter, "",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
createSnapshotTransaction.tell(
new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
- private static final Object CAN_COMMIT_REPLY_TRUE =
- new CanCommitTransactionReply(Boolean.TRUE).toSerializable();
-
- private static final Object CAN_COMMIT_REPLY_FALSE =
- new CanCommitTransactionReply(Boolean.FALSE).toSerializable();
-
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
Boolean canCommit = cohortEntry.getCohort().canCommit().get();
cohortEntry.getCanCommitSender().tell(
- canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard());
+ canCommit ? CanCommitTransactionReply.YES.toSerializable() :
+ CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
if(!canCommit) {
// Remove the entry from the cache now since the Tx will be aborted.
public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- int txnClientVersion) {
- super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ short clientTxVersion) {
+ super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- int txnClientVersion) {
- super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ short clientTxVersion) {
+ super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
private final SchemaContext schemaContext;
private final ShardStats shardStats;
private final String transactionID;
- private final int txnClientVersion;
+ private final short clientTxVersion;
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
- ShardStats shardStats, String transactionID, int txnClientVersion) {
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
this.schemaContext = schemaContext;
this.shardStats = shardStats;
this.transactionID = transactionID;
- this.txnClientVersion = txnClientVersion;
+ this.clientTxVersion = clientTxVersion;
}
public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
- String transactionID, int txnClientVersion) {
+ String transactionID, short txnClientVersion) {
return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
datastoreContext, shardStats, transactionID, txnClientVersion));
}
return schemaContext;
}
- protected int getTxnClientVersion() {
- return txnClientVersion;
+ protected short getClientTxVersion() {
+ return clientTxVersion;
}
@Override
getDOMStoreTransaction().close();
if(sendReply) {
- getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
+ getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
}
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
- protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
+ protected void readData(DOMStoreReadTransaction transaction, ReadData message,
+ final boolean returnSerialized) {
final ActorRef sender = getSender();
final ActorRef self = getSelf();
final YangInstanceIdentifier path = message.getPath();
final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
transaction.read(path);
-
future.addListener(new Runnable() {
@Override
public void run() {
try {
Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
- ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
+ ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
- sender.tell((returnSerialized ? readDataReply.toSerializable():
+ sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion):
readDataReply), self);
} catch (Exception e) {
final DatastoreContext datastoreContext;
final ShardStats shardStats;
final String transactionID;
- final int txnClientVersion;
+ final short txnClientVersion;
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, DatastoreContext datastoreContext,
- ShardStats shardStats, String transactionID, int txnClientVersion) {
+ ShardStats shardStats, String transactionID, short txnClientVersion) {
this.transaction = transaction;
this.shardActor = shardActor;
this.shardStats = shardStats;
import akka.actor.Props;
import akka.japi.Creator;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
createTransaction(createTransaction);
} else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
chain.close();
- getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf());
+ getSender().tell(CloseTransactionChainReply.INSTANCE.toSerializable(), getSelf());
}else{
unknownMessage(message);
}
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- int txnClientVersion) {
- super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ short clientTxVersion) {
+ super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
} else if (message instanceof ReadyTransaction) {
- readyTransaction(transaction, new ReadyTransaction(), !SERIALIZED_REPLY);
+ readyTransaction(transaction, !SERIALIZED_REPLY);
- } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+ } else if(WriteData.isSerializedType(message)) {
+ writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
- } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+ } else if(MergeData.isSerializedType(message)) {
+ mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
} else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
} else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readyTransaction(transaction, new ReadyTransaction(), SERIALIZED_REPLY);
+ readyTransaction(transaction, SERIALIZED_REPLY);
} else if (message instanceof GetCompositedModification) {
// This is here for testing only
new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
try {
transaction.write(message.getPath(), message.getData());
- WriteDataReply writeDataReply = new WriteDataReply();
- getSender().tell(returnSerialized ? writeDataReply.toSerializable() : writeDataReply,
- getSelf());
+ WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
+ getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
+ writeDataReply, getSelf());
}catch(Exception e){
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
try {
transaction.merge(message.getPath(), message.getData());
- MergeDataReply mergeDataReply = new MergeDataReply();
- getSender().tell(returnSerialized ? mergeDataReply.toSerializable() : mergeDataReply ,
- getSelf());
+ MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
+ getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
+ mergeDataReply, getSelf());
}catch(Exception e){
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
}
}
- private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message,
- boolean returnSerialized) {
+ private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized) {
String transactionID = getTransactionID();
LOG.debug("readyTransaction : {}", transactionID);
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+ getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
cohort, modification, returnSerialized), getContext());
// The shard will handle the commit from here so we're no longer needed - self-destruct.
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
if(remoteTransactionActorsMB.get()) {
for(ActorSelection actor : remoteTransactionActors) {
LOG.trace("Sending CloseTransaction to {}", actor);
- actorContext.sendOperationAsync(actor,
- new CloseTransaction().toSerializable());
+ actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
}
}
}
}
}
-
-
-
-
/**
* Performs a CreateTransaction try async.
*/
private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
- private final int remoteTransactionVersion;
+ private final short remoteTransactionVersion;
private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal, int remoteTransactionVersion) {
+ boolean isTxActorLocal, short remoteTransactionVersion) {
super(identifier);
this.transactionPath = transactionPath;
this.actor = actor;
return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
}
+ private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
+ return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+ msg.toSerializable(remoteTransactionVersion));
+ }
+
@Override
public void closeTransaction() {
LOG.debug("Tx {} closeTransaction called", identifier);
- actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
+ actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
}
@Override
// Send the ReadyTransaction message to the Tx actor.
- final Future<Object> replyFuture = executeOperationAsync(new ReadyTransaction());
+ final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
// Combine all the previously recorded put/merge/delete operation reply Futures and the
// ReadyTransactionReply Future into one Future. If any one fails then the combined
// At some point in the future when upgrades from Helium are not supported
// we could remove this code to resolvePath and just use the cohortPath as the
// resolved cohortPath
- if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+ if(TransactionContextImpl.this.remoteTransactionVersion <
+ DataStoreVersions.HELIUM_1_VERSION) {
cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
}
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data, schemaContext)));
+ recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data, schemaContext)));
+ recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
}
@Override
ReadDataReply reply = (ReadDataReply) readResponse;
returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
- } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
- ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+ } else if (ReadDataReply.isSerializedType(readResponse)) {
+ ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
} else {
public static final Class<ThreePhaseCommitCohortMessages.AbortTransactionReply> SERIALIZABLE_CLASS =
ThreePhaseCommitCohortMessages.AbortTransactionReply.class;
+ private static final Object SERIALIZED_INSTANCE =
+ ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+
+ public static final AbortTransactionReply INSTANCE = new AbortTransactionReply();
+
@Override
public Object toSerializable() {
- return ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+ return SERIALIZED_INSTANCE;
}
}
public static final Class<ThreePhaseCommitCohortMessages.CanCommitTransactionReply> SERIALIZABLE_CLASS =
ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class;
- private final Boolean canCommit;
+ public static final CanCommitTransactionReply YES = new CanCommitTransactionReply(true);
+ public static final CanCommitTransactionReply NO = new CanCommitTransactionReply(false);
- public CanCommitTransactionReply(final Boolean canCommit) {
+ private final boolean canCommit;
+ private final Object serializedMessage;
+
+ private CanCommitTransactionReply(final boolean canCommit) {
this.canCommit = canCommit;
+ this.serializedMessage = ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().
+ setCanCommit(canCommit).build();
}
- public Boolean getCanCommit() {
+ public boolean getCanCommit() {
return canCommit;
}
@Override
public Object toSerializable() {
- return ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().setCanCommit(canCommit).build();
+ return serializedMessage;
}
public static CanCommitTransactionReply fromSerializable(final Object message) {
- return new CanCommitTransactionReply(
- ((ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message).getCanCommit());
+ ThreePhaseCommitCohortMessages.CanCommitTransactionReply serialized =
+ (ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message;
+ return serialized.getCanCommit() ? YES : NO;
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class CloseTransaction implements SerializableMessage{
- public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
- ShardTransactionMessages.CloseTransaction.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.CloseTransaction.newBuilder().build();
- }
+ public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.CloseTransaction.class;
+
+ private static final Object SERIALIZED_INSTANCE =
+ ShardTransactionMessages.CloseTransaction.newBuilder().build();
+
+ public static final CloseTransaction INSTANCE = new CloseTransaction();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
public class CloseTransactionChainReply implements SerializableMessage {
- public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
- ShardTransactionChainMessages.CloseTransactionChainReply.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
- }
+ public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
+ ShardTransactionChainMessages.CloseTransactionChainReply.class;
+ private static final Object SERIALIZED_INSTANCE =
+ ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
+
+ public static final CloseTransactionChainReply INSTANCE = new CloseTransactionChainReply();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class CloseTransactionReply implements SerializableMessage {
- public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.CloseTransactionReply.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
- }
+ public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.CloseTransactionReply.class;
+
+ private static final Object SERIALIZED_INSTANCE =
+ ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
+
+ public static final CloseTransactionReply INSTANCE = new CloseTransactionReply();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
public static final Class<ThreePhaseCommitCohortMessages.CommitTransactionReply> SERIALIZABLE_CLASS =
ThreePhaseCommitCohortMessages.CommitTransactionReply.class;
+ private static final Object SERIALIZED_INSTANCE =
+ ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+
+ public static final CommitTransactionReply INSTANCE = new CommitTransactionReply();
+
@Override
public Object toSerializable() {
- return ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+ return SERIALIZED_INSTANCE;
}
}
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
ShardTransactionMessages.CreateTransaction.class;
- public static final int HELIUM_1_VERSION = 1;
- public static final int CURRENT_VERSION = HELIUM_1_VERSION;
-
private final String transactionId;
private final int transactionType;
private final String transactionChainId;
- private final int version;
+ private final short version;
public CreateTransaction(String transactionId, int transactionType) {
this(transactionId, transactionType, "");
}
public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
- this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
+ this(transactionId, transactionType, transactionChainId, DataStoreVersions.CURRENT_VERSION);
}
private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
- int version) {
+ short version) {
this.transactionId = transactionId;
this.transactionType = transactionType;
this.transactionChainId = transactionChainId;
return transactionType;
}
- public int getVersion() {
+ public short getVersion() {
return version;
}
(ShardTransactionMessages.CreateTransaction) message;
return new CreateTransaction(createTransaction.getTransactionId(),
createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
- createTransaction.getMessageVersion());
+ (short)createTransaction.getMessageVersion());
}
public String getTransactionChainId() {
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class CreateTransactionReply implements SerializableMessage {
- public static final Class<ShardTransactionMessages.CreateTransactionReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.CreateTransactionReply.class;
+ public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
private final String transactionPath;
private final String transactionId;
- private final int version;
+ private final short version;
- public CreateTransactionReply(final String transactionPath,
- final String transactionId) {
- this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+ public CreateTransactionReply(String transactionPath, String transactionId) {
+ this(transactionPath, transactionId, DataStoreVersions.CURRENT_VERSION);
}
public CreateTransactionReply(final String transactionPath,
- final String transactionId, final int version) {
+ final String transactionId, final short version) {
this.transactionPath = transactionPath;
this.transactionId = transactionId;
this.version = version;
return transactionId;
}
- public int getVersion() {
+ public short getVersion() {
return version;
}
.build();
}
- public static CreateTransactionReply fromSerializable(final Object serializable){
+ public static CreateTransactionReply fromSerializable(Object serializable){
ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
- return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
+ return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(),
+ (short)o.getMessageVersion());
}
}
package org.opendaylight.controller.cluster.datastore.messages;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class DataChanged implements SerializableMessage {
- public static final Class<DataChangeListenerMessages.DataChanged> SERIALIZABLE_CLASS =
- DataChangeListenerMessages.DataChanged.class;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
- final private SchemaContext schemaContext;
- private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
- change;
+public class DataChanged implements Externalizable {
+ private static final long serialVersionUID = 1L;
+ private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
+ public DataChanged() {
+ }
- public DataChanged(SchemaContext schemaContext,
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ public DataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
this.change = change;
- this.schemaContext = schemaContext;
}
-
public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange() {
return change;
}
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ in.readShort(); // Read the version
- private NormalizedNodeMessages.Node convertToNodeTree(
- NormalizedNode<?, ?> normalizedNode) {
+ NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
- return new NormalizedNodeToNodeCodec(schemaContext)
- .encode(normalizedNode)
- .getNormalizedNode();
+ // Note: the scope passed to builder is not actually used.
+ Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE);
- }
+ // Read created data
- private Iterable<NormalizedNodeMessages.InstanceIdentifier> convertToRemovePaths(
- Set<YangInstanceIdentifier> removedPaths) {
- final Set<NormalizedNodeMessages.InstanceIdentifier> removedPathInstanceIds = new HashSet<>();
- for (YangInstanceIdentifier id : removedPaths) {
- removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id));
+ int size = in.readInt();
+ for(int i = 0; i < size; i++) {
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+ builder.addCreated(path, node);
}
- return new Iterable<NormalizedNodeMessages.InstanceIdentifier>() {
- @Override
- public Iterator<NormalizedNodeMessages.InstanceIdentifier> iterator() {
- return removedPathInstanceIds.iterator();
- }
- };
- }
+ // Read updated data
- private NormalizedNodeMessages.NodeMap convertToNodeMap(
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> data) {
- NormalizedNodeToNodeCodec normalizedNodeToNodeCodec =
- new NormalizedNodeToNodeCodec(schemaContext);
- NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder =
- NormalizedNodeMessages.NodeMap.newBuilder();
- NormalizedNodeMessages.NodeMapEntry.Builder builder =
- NormalizedNodeMessages.NodeMapEntry.newBuilder();
- for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data
- .entrySet()) {
-
-
- NormalizedNodeMessages.InstanceIdentifier instanceIdentifier =
- InstanceIdentifierUtils.toSerializable(entry.getKey());
-
- builder.setInstanceIdentifierPath(instanceIdentifier)
- .setNormalizedNode(normalizedNodeToNodeCodec
- .encode(entry.getValue())
- .getNormalizedNode());
- nodeMapBuilder.addMapEntries(builder.build());
+ size = in.readInt();
+ for(int i = 0; i < size; i++) {
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ NormalizedNode<?, ?> before = streamReader.readNormalizedNode();
+ NormalizedNode<?, ?> after = streamReader.readNormalizedNode();
+ builder.addUpdated(path, before, after);
}
- return nodeMapBuilder.build();
- }
-
- @Override
- public Object toSerializable() {
- return DataChangeListenerMessages.DataChanged.newBuilder()
- .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths()))
- .setCreatedData(convertToNodeMap(change.getCreatedData()))
- .setOriginalData(convertToNodeMap(change.getOriginalData()))
- .setUpdatedData(convertToNodeMap(change.getUpdatedData()))
- .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree()))
- .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree()))
- .build();
- }
+ // Read removed data
- public static DataChanged fromSerialize(SchemaContext sc, Object message,
- YangInstanceIdentifier pathId) {
- DataChangeListenerMessages.DataChanged dataChanged =
- (DataChangeListenerMessages.DataChanged) message;
- DataChangedEvent event = new DataChangedEvent(sc);
- if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData()
- .isInitialized()) {
- event.setCreatedData(dataChanged.getCreatedData());
- }
- if (dataChanged.getOriginalData() != null && dataChanged
- .getOriginalData().isInitialized()) {
- event.setOriginalData(dataChanged.getOriginalData());
+ size = in.readInt();
+ for(int i = 0; i < size; i++) {
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+ builder.addRemoved(path, node);
}
- if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData()
- .isInitialized()) {
- event.setUpdateData(dataChanged.getUpdatedData());
- }
+ // Read original subtree
- if (dataChanged.getOriginalSubTree() != null && dataChanged
- .getOriginalSubTree().isInitialized()) {
- event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId);
+ boolean present = in.readBoolean();
+ if(present) {
+ builder.setBefore(streamReader.readNormalizedNode());
}
- if (dataChanged.getUpdatedSubTree() != null && dataChanged
- .getUpdatedSubTree().isInitialized()) {
- event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId);
- }
+ // Read updated subtree
- if (dataChanged.getRemovedPathsList() != null && !dataChanged
- .getRemovedPathsList().isEmpty()) {
- event.setRemovedPaths(dataChanged.getRemovedPathsList());
+ present = in.readBoolean();
+ if(present) {
+ builder.setAfter(streamReader.readNormalizedNode());
}
- return new DataChanged(sc, event);
-
+ change = builder.build();
}
- static class DataChangedEvent implements
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData;
- private final NormalizedNodeToNodeCodec nodeCodec;
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData;
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData;
- private NormalizedNode<?, ?> originalSubTree;
- private NormalizedNode<?, ?> updatedSubTree;
- private Set<YangInstanceIdentifier> removedPathIds;
-
- DataChangedEvent(SchemaContext schemaContext) {
- nodeCodec = new NormalizedNodeToNodeCodec(schemaContext);
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
- if(createdData == null){
- return Collections.emptyMap();
- }
- return createdData;
- }
-
- DataChangedEvent setCreatedData(
- NormalizedNodeMessages.NodeMap nodeMap) {
- this.createdData = convertNodeMapToMap(nodeMap);
- return this;
- }
-
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> convertNodeMapToMap(
- NormalizedNodeMessages.NodeMap nodeMap) {
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> mapEntries =
- new HashMap<YangInstanceIdentifier, NormalizedNode<?, ?>>();
- for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap
- .getMapEntriesList()) {
- YangInstanceIdentifier id = InstanceIdentifierUtils
- .fromSerializable(nodeMapEntry.getInstanceIdentifierPath());
- mapEntries.put(id,
- nodeCodec.decode(nodeMapEntry.getNormalizedNode()));
- }
- return mapEntries;
- }
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(DataStoreVersions.CURRENT_VERSION);
+ NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+ NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(streamWriter);
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
- if(updatedData == null){
- return Collections.emptyMap();
- }
- return updatedData;
- }
+ // Write created data
- DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) {
- this.updatedData = convertNodeMapToMap(nodeMap);
- return this;
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = change.getCreatedData();
+ out.writeInt(createdData.size());
+ for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: createdData.entrySet()) {
+ streamWriter.writeYangInstanceIdentifier(e.getKey());
+ nodeWriter.write(e.getValue());
}
- @Override
- public Set<YangInstanceIdentifier> getRemovedPaths() {
- if (removedPathIds == null) {
- return Collections.emptySet();
- }
- return removedPathIds;
- }
+ // Write updated data
- public DataChangedEvent setRemovedPaths(List<NormalizedNodeMessages.InstanceIdentifier> removedPaths) {
- Set<YangInstanceIdentifier> removedIds = new HashSet<>();
- for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) {
- removedIds.add(InstanceIdentifierUtils.fromSerializable(path));
- }
- this.removedPathIds = removedIds;
- return this;
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData = change.getOriginalData();
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData = change.getUpdatedData();
+ out.writeInt(updatedData.size());
+ for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: updatedData.entrySet()) {
+ streamWriter.writeYangInstanceIdentifier(e.getKey());
+ nodeWriter.write(originalData.get(e.getKey()));
+ nodeWriter.write(e.getValue());
}
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
- if (originalData == null) {
- Collections.emptyMap();
- }
- return originalData;
- }
+ // Write removed data
- DataChangedEvent setOriginalData(
- NormalizedNodeMessages.NodeMap nodeMap) {
- this.originalData = convertNodeMapToMap(nodeMap);
- return this;
+ Set<YangInstanceIdentifier> removed = change.getRemovedPaths();
+ out.writeInt(removed.size());
+ for(YangInstanceIdentifier path: removed) {
+ streamWriter.writeYangInstanceIdentifier(path);
+ nodeWriter.write(originalData.get(path));
}
- @Override
- public NormalizedNode<?, ?> getOriginalSubtree() {
- return originalSubTree;
- }
+ // Write original subtree
- DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node,
- YangInstanceIdentifier instanceIdentifierPath) {
- originalSubTree = nodeCodec.decode(node);
- return this;
+ NormalizedNode<?, ?> originalSubtree = change.getOriginalSubtree();
+ out.writeBoolean(originalSubtree != null);
+ if(originalSubtree != null) {
+ nodeWriter.write(originalSubtree);
}
- @Override
- public NormalizedNode<?, ?> getUpdatedSubtree() {
- return updatedSubTree;
- }
+ // Write original subtree
- DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node,
- YangInstanceIdentifier instanceIdentifierPath) {
- updatedSubTree = nodeCodec.decode(node);
- return this;
+ NormalizedNode<?, ?> updatedSubtree = change.getUpdatedSubtree();
+ out.writeBoolean(updatedSubtree != null);
+ if(updatedSubtree != null) {
+ nodeWriter.write(updatedSubtree);
}
-
-
}
-
-
-
}
import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
public class DataChangedReply implements SerializableMessage {
- public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
- DataChangeListenerMessages.DataChangedReply.class;
- @Override
- public Object toSerializable() {
- return DataChangeListenerMessages.DataChangedReply.newBuilder().build();
- }
+ public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
+ DataChangeListenerMessages.DataChangedReply.class;
+
+ private static final Object SERIALIZED_INSTANCE =
+ DataChangeListenerMessages.DataChangedReply.newBuilder().build();
+
+ public static final DataChangedReply INSTANCE = new DataChangedReply();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class DeleteDataReply implements SerializableMessage{
- public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.DeleteDataReply.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.DeleteDataReply.newBuilder().build();
- }
+ public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.DeleteDataReply.class;
+
+ private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.DeleteDataReply.newBuilder().build();
+
+ public static final DeleteDataReply INSTANCE = new DeleteDataReply();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Externalizable with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public class EmptyExternalizable implements Externalizable {
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+
+/**
+ * A reply with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+
+ private final Object legacySerializedInstance;
+
+ protected EmptyReply(Object legacySerializedInstance) {
+ super();
+ this.legacySerializedInstance = legacySerializedInstance;
+ }
+
+ @Override
+ public Object toSerializable(short toVersion) {
+ return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
+ }
+}
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class MergeData extends ModifyData{
+public class MergeData extends ModifyData implements VersionedSerializableMessage {
+ private static final long serialVersionUID = 1L;
- public static final Class<ShardTransactionMessages.MergeData> SERIALIZABLE_CLASS =
- ShardTransactionMessages.MergeData.class;
+ public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
- public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
- SchemaContext context) {
- super(path, data, context);
+ public MergeData() {
+ }
+
+ public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ super(path, data);
}
@Override
- public Object toSerializable() {
- Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
- return ShardTransactionMessages.MergeData.newBuilder()
- .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
- .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+ public Object toSerializable(short toVersion) {
+ if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+ setVersion(toVersion);
+ return this;
+ } else {
+ // To base or R1 Helium version
+ Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+ return ShardTransactionMessages.MergeData.newBuilder()
+ .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+ .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+ }
+ }
+
+ public static MergeData fromSerializable(Object serializable){
+ if(serializable instanceof MergeData) {
+ return (MergeData) serializable;
+ } else {
+ // From base or R1 Helium version
+ ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
+ Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+ o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+ return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+ }
}
- public static MergeData fromSerializable(Object serializable, SchemaContext schemaContext){
- ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
- Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
- o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
- return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+ public static boolean isSerializedType(Object message) {
+ return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+ message instanceof ShardTransactionMessages.MergeData;
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-public class MergeDataReply implements SerializableMessage{
- public static final Class<ShardTransactionMessages.MergeDataReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.MergeDataReply.class;
+public class MergeDataReply extends EmptyReply {
+ private static final long serialVersionUID = 1L;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.MergeDataReply.newBuilder().build();
- }
+ private static final Object LEGACY_SERIALIZED_INSTANCE =
+ ShardTransactionMessages.MergeDataReply.newBuilder().build();
+
+ public static final MergeDataReply INSTANCE = new MergeDataReply();
+
+ public MergeDataReply() {
+ super(LEGACY_SERIALIZED_INSTANCE);
+ }
}
package org.opendaylight.controller.cluster.datastore.messages;
-import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Applier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public abstract class ModifyData implements SerializableMessage {
- protected final YangInstanceIdentifier path;
- protected final NormalizedNode<?, ?> data;
- protected final SchemaContext schemaContext;
+public abstract class ModifyData implements Externalizable {
+ private static final long serialVersionUID = 1L;
- public ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
- SchemaContext context) {
- Preconditions.checkNotNull(context,
- "Cannot serialize an object which does not have a schema schemaContext");
+ private YangInstanceIdentifier path;
+ private NormalizedNode<?, ?> data;
+ private short version;
+ protected ModifyData() {
+ }
+ protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
this.path = path;
this.data = data;
- this.schemaContext = context;
}
public YangInstanceIdentifier getPath() {
return data;
}
+ public short getVersion() {
+ return version;
+ }
+
+ protected void setVersion(short version) {
+ this.version = version;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ version = in.readShort();
+ SerializationUtils.deserializePathAndNode(in, this, APPLIER);
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(version);
+ SerializationUtils.serializePathAndNode(path, data, out);
+ }
+
+ private static final Applier<ModifyData> APPLIER = new Applier<ModifyData>() {
+ @Override
+ public void apply(ModifyData instance, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ instance.path = path;
+ instance.data = data;
+ }
+ };
}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.protobuf.ByteString;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class ReadDataReply implements SerializableMessage {
- public static final Class<ShardTransactionMessages.ReadDataReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.ReadDataReply.class;
+public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+ private static final long serialVersionUID = 1L;
- private final NormalizedNode<?, ?> normalizedNode;
- private final SchemaContext schemaContext;
+ public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
- public ReadDataReply(SchemaContext context,NormalizedNode<?, ?> normalizedNode){
+ private NormalizedNode<?, ?> normalizedNode;
+ private short version;
+ public ReadDataReply() {
+ }
+
+ public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
this.normalizedNode = normalizedNode;
- this.schemaContext = context;
}
public NormalizedNode<?, ?> getNormalizedNode() {
}
@Override
- public Object toSerializable(){
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ version = in.readShort();
+ normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(version);
+ SerializationUtils.serializeNormalizedNode(normalizedNode, out);
+ }
+
+ @Override
+ public Object toSerializable(short toVersion) {
+ if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+ version = toVersion;
+ return this;
+ } else {
+ return toSerializableReadDataReply(normalizedNode);
+ }
+ }
+
+ private static ShardTransactionMessages.ReadDataReply toSerializableReadDataReply(
+ NormalizedNode<?, ?> normalizedNode) {
if(normalizedNode != null) {
return ShardTransactionMessages.ReadDataReply.newBuilder()
- .setNormalizedNode(new NormalizedNodeToNodeCodec(schemaContext)
- .encode(normalizedNode).getNormalizedNode()).build();
+ .setNormalizedNode(new NormalizedNodeToNodeCodec(null)
+ .encode(normalizedNode).getNormalizedNode()).build();
} else {
return ShardTransactionMessages.ReadDataReply.newBuilder().build();
}
}
- public static ReadDataReply fromSerializable(SchemaContext schemaContext,
- YangInstanceIdentifier id, Object serializable) {
- ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
- return new ReadDataReply(schemaContext, new NormalizedNodeToNodeCodec(schemaContext).decode(
- o.getNormalizedNode()));
+ public static ReadDataReply fromSerializable(Object serializable) {
+ if(serializable instanceof ReadDataReply) {
+ return (ReadDataReply) serializable;
+ } else {
+ ShardTransactionMessages.ReadDataReply o =
+ (ShardTransactionMessages.ReadDataReply) serializable;
+ return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+ }
+ }
+
+ public static ByteString fromSerializableAsByteString(Object serializable) {
+ if(serializable instanceof ReadDataReply) {
+ ReadDataReply r = (ReadDataReply)serializable;
+ return toSerializableReadDataReply(r.getNormalizedNode()).toByteString();
+ } else {
+ ShardTransactionMessages.ReadDataReply o =
+ (ShardTransactionMessages.ReadDataReply) serializable;
+ return o.getNormalizedNode().toByteString();
+ }
}
- public static ByteString getNormalizedNodeByteString(Object serializable){
- ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
- return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString();
+ public static boolean isSerializedType(Object message) {
+ return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+ message instanceof ShardTransactionMessages.ReadDataReply;
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class ReadyTransaction implements SerializableMessage{
- public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
- ShardTransactionMessages.ReadyTransaction.class;
+ public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.ReadyTransaction.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.ReadyTransaction.newBuilder().build();
- }
+ private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+ public static final ReadyTransaction INSTANCE = new ReadyTransaction();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Interface for a Serializable message with versioning.
+ *
+ * @author Thomas Pantelis
+ */
+public interface VersionedSerializableMessage {
+ Object toSerializable(short toVersion);
+}
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class WriteData extends ModifyData {
+public class WriteData extends ModifyData implements VersionedSerializableMessage {
+ private static final long serialVersionUID = 1L;
- public static final Class<ShardTransactionMessages.WriteData> SERIALIZABLE_CLASS =
- ShardTransactionMessages.WriteData.class;
+ public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
- public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, SchemaContext schemaContext) {
- super(path, data, schemaContext);
+ public WriteData() {
+ }
+
+ public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ super(path, data);
}
@Override
- public Object toSerializable() {
- Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
- return ShardTransactionMessages.WriteData.newBuilder()
- .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
- .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+ public Object toSerializable(short toVersion) {
+ if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+ setVersion(toVersion);
+ return this;
+ } else {
+ // To base or R1 Helium version
+ Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+ return ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+ .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+ }
+ }
+
+ public static WriteData fromSerializable(Object serializable) {
+ if(serializable instanceof WriteData) {
+ return (WriteData) serializable;
+ } else {
+ // From base or R1 Helium version
+ ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
+ Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+ o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+ return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+ }
}
- public static WriteData fromSerializable(Object serializable, SchemaContext schemaContext){
- ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
- Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
- o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
- return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+ public static boolean isSerializedType(Object message) {
+ return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+ message instanceof ShardTransactionMessages.WriteData;
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-public class WriteDataReply implements SerializableMessage{
- public static final Class<ShardTransactionMessages.WriteDataReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.WriteDataReply.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.WriteDataReply.newBuilder().build();
- }
+public class WriteDataReply extends EmptyReply {
+ private static final long serialVersionUID = 1L;
+
+ private static final Object LEGACY_SERIALIZED_INSTANCE =
+ ShardTransactionMessages.WriteDataReply.newBuilder().build();
+
+ public static final WriteDataReply INSTANCE = new WriteDataReply();
+
+ public WriteDataReply() {
+ super(LEGACY_SERIALIZED_INSTANCE);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+
+/**
+ * Provides various utility methods for serialization and de-serialization.
+ *
+ * @author Thomas Pantelis
+ */
+public final class SerializationUtils {
+ public static interface Applier<T> {
+ void apply(T instance, YangInstanceIdentifier path, NormalizedNode<?, ?> node);
+ }
+
+ public static void serializePathAndNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node,
+ DataOutput out) {
+ Preconditions.checkNotNull(path);
+ Preconditions.checkNotNull(node);
+ try {
+ NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+ NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+ streamWriter.writeYangInstanceIdentifier(path);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Error serializing path {} and Node {}",
+ path, node), e);
+ }
+ }
+
+ public static <T> void deserializePathAndNode(DataInput in, T instance, Applier<T> applier) {
+ try {
+ NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+ NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ applier.apply(instance, path, node);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error deserializing path and Node", e);
+ }
+ }
+
+ public static void serializeNormalizedNode(NormalizedNode<?, ?> node, DataOutput out) {
+ try {
+ out.writeBoolean(node != null);
+ if(node != null) {
+ NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+ NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Error serializing NormalizedNode {}",
+ node), e);
+ }
+ }
+
+ public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
+ try {
+ boolean present = in.readBoolean();
+ if(present) {
+ NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+ return streamReader.readNormalizedNode();
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+ }
+
+ return null;
+ }
+}
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
public class DataChangeListenerProxyTest extends AbstractActorTest {
private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
final ActorRef actorRef = getSystem().actorOf(props);
DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy(
- TestModel.createTestContext(), getSystem().actorSelection(actorRef.path()));
+ getSystem().actorSelection(actorRef.path()));
dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
// Let the DataChangeListener know that notifications should be enabled
subject.tell(new EnableNotification(true), getRef());
- subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+ subject.tell(new DataChanged(mockChangeEvent),
getRef());
expectMsgClass(DataChangedReply.class);
final ActorRef subject =
getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
- subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+ subject.tell(new DataChanged(mockChangeEvent),
getRef());
new Within(duration("1 seconds")) {
getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
- subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
- ActorRef.noSender());
+ subject.tell(new DataChanged(mockChangeEvent), ActorRef.noSender());
// Make sure no DataChangedReply is sent to DeadLetters.
while(true) {
SchemaContext schemaContext = CompositeModel.createTestContext();
- subject.tell(new DataChanged(schemaContext, mockChangeEvent1),getRef());
+ subject.tell(new DataChanged(mockChangeEvent1),getRef());
expectMsgClass(DataChangedReply.class);
- subject.tell(new DataChanged(schemaContext, mockChangeEvent2),getRef());
+ subject.tell(new DataChanged(mockChangeEvent2),getRef());
expectMsgClass(DataChangedReply.class);
- subject.tell(new DataChanged(schemaContext, mockChangeEvent3),getRef());
+ subject.tell(new DataChanged(mockChangeEvent3),getRef());
expectMsgClass(DataChangedReply.class);
Mockito.verify(mockListener).onDataChanged(mockChangeEvent1);
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
-import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
import java.util.Collections;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.testkit.TestActorRef;
/**
* Tests backwards compatibility support from Helium-1 to Helium.
*/
public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
+ @SuppressWarnings("unchecked")
@Test
public void testTransactionCommit() throws Exception {
new ShardTestKit(getSystem()) {{
// Write data to the Tx
txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+ DataStoreVersions.BASE_HELIUM_VERSION), getRef());
- expectMsgClass(duration, WriteDataReply.class);
+ expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
// Ready the Tx
// Write data to the Tx
txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
expectMsgClass(duration, WriteDataReply.class);
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
}
transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
getRef());
- ShardTransactionMessages.ReadDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+ Object replySerialized =
+ expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
- assertNotNull(ReadDataReply.fromSerializable(
- testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
- .getNormalizedNode());
+ assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
// unserialized read
transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
final ActorRef shard = createShard();
Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRO"));
props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRW"));
// serialized read
transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
- ShardTransactionMessages.ReadDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+ Object replySerialized =
+ expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
- assertTrue(ReadDataReply.fromSerializable(
- testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
+ assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
// unserialized read
transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
}};
}
+ @Test
+ public void testOnReceiveReadDataHeliumR1() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.HELIUM_1_VERSION);
+
+ ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1");
+
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+ getRef());
+
+ ShardTransactionMessages.ReadDataReply replySerialized =
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
+
+ assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
+ }};
+ }
+
@Test
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
}
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
}
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
+ DataStoreVersions.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData");
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
- getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+ DataStoreVersions.HELIUM_2_VERSION), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
assertModification(transaction, WriteModification.class);
- //unserialized write
+ // unserialized write
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME),
- TestModel.createTestContext()),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
getRef());
expectMsgClass(duration("5 seconds"), WriteDataReply.class);
}};
}
+ @Test
+ public void testOnReceiveHeliumR1WriteData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.HELIUM_1_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData");
+
+ Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+ .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+ transaction.tell(serialized, getRef());
+
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+
+ assertModification(transaction, WriteModification.class);
+ }};
+ }
+
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
- getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+ DataStoreVersions.HELIUM_2_VERSION), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
//unserialized merge
transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
getRef());
expectMsgClass(duration("5 seconds"), MergeDataReply.class);
}};
}
+ @Test
+ public void testOnReceiveHeliumR1MergeData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.HELIUM_1_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
+
+ Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
+ .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+ .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+ transaction.tell(serialized, getRef());
+
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+
+ assertModification(transaction, MergeModification.class);
+ }};
+ }
+
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
watch(transaction);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
watch(transaction);
}
+ @SuppressWarnings("unchecked")
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
watch(transaction);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction =
getSystem().actorOf(props, "testShardTransactionInactivity");
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import scala.concurrent.Future;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
@SuppressWarnings("serial")
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES);
ListenableFuture<Boolean> future = proxy.canCommit();
assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(false));
+ CanCommitTransactionReply.NO);
future = proxy.canCommit();
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
ListenableFuture<Boolean> future = proxy.canCommit();
ThreePhaseCommitCohortProxy proxy = setupProxy(3);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
- new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
ListenableFuture<Boolean> future = proxy.canCommit();
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply(), new PreCommitTransactionReply());
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.google.common.util.concurrent.CheckedFuture;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
@SuppressWarnings("resource")
public class TransactionProxyTest {
ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
@Override
public boolean matches(Object argument) {
- CreateTransaction obj = CreateTransaction.fromSerializable(argument);
- return obj.getTransactionId().startsWith(memberName) &&
- obj.getTransactionType() == type.ordinal();
+ if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+ return obj.getTransactionId().startsWith(memberName) &&
+ obj.getTransactionType() == type.ordinal();
+ }
+
+ return false;
}
};
}
private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
+ final int transactionVersion) {
ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
@Override
public boolean matches(Object argument) {
- if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
- return false;
+ if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+ (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+ ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
+
+ WriteData obj = WriteData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
}
- WriteData obj = WriteData.fromSerializable(argument, schemaContext);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return false;
}
};
}
private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
+ final int transactionVersion) {
ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
@Override
public boolean matches(Object argument) {
- if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
- return false;
+ if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+ (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+ ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
+
+ MergeData obj = MergeData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
}
- MergeData obj = MergeData.fromSerializable(argument, schemaContext);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return false;
}
};
return Futures.successful((Object)new ReadyTransactionReply(path));
}
+ private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
+ short transactionVersion) {
+ return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+ }
private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
+ return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
}
private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(schemaContext, data));
+ return Futures.successful(new ReadDataReply(data));
}
private Future<Object> dataExistsSerializedReply(boolean exists) {
return Futures.successful(new DataExistsReply(exists));
}
+ private Future<Object> writeSerializedDataReply(short version) {
+ return Futures.successful(new WriteDataReply().toSerializable(version));
+ }
+
private Future<Object> writeSerializedDataReply() {
- return Futures.successful(new WriteDataReply().toSerializable());
+ return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
}
private Future<WriteDataReply> writeDataReply() {
return Futures.successful(new WriteDataReply());
}
+ private Future<Object> mergeSerializedDataReply(short version) {
+ return Futures.successful(new MergeDataReply().toSerializable(version));
+ }
+
private Future<Object> mergeSerializedDataReply() {
- return Futures.successful(new MergeDataReply().toSerializable());
+ return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
}
private Future<MergeDataReply> mergeDataReply() {
.build();
}
- private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+ TransactionType type, int transactionVersion) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
- doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
return actorRef;
}
private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
- return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
}
eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.SERIALIZABLE_CLASS);
+ WriteDataReply.class);
}
@Test(expected=IllegalStateException.class)
eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.SERIALIZABLE_CLASS);
+ MergeDataReply.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.SERIALIZABLE_CLASS);
+ WriteDataReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
- @Test
- public void testReadyForwardCompatibility() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+ private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
+ READ_WRITE, version);
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+
+ doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
- transactionProxy.read(TestModel.TEST_PATH);
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
+ get(5, TimeUnit.SECONDS);
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", testNode, readOptional.get());
+
+ transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+ transactionProxy.merge(TestModel.TEST_PATH, testNode);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.SERIALIZABLE_CLASS);
+ ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ return actorRef;
+ }
+
+ @Test
+ public void testCompatibilityWithBaseHeliumVersion() throws Exception {
+ ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
+
verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
}
+ @Test
+ public void testCompatibilityWithHeliumR1Version() throws Exception {
+ ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
+
+ verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+ }
+
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
verifyCohortFutures(proxy, TestException.class);
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+ MergeDataReply.class, TestException.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.SERIALIZABLE_CLASS);
+ MergeDataReply.class);
verifyCohortFutures(proxy, TestException.class);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for DataChanged.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangedTest {
+
+ @Test
+ public void testSerialization() {
+ DOMImmutableDataChangeEvent change = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE).
+ addCreated(TestModel.TEST_PATH, ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build()).
+ addUpdated(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+ ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build())
+.
+ addRemoved(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()).
+ setBefore(ImmutableNodes.containerNode(TestModel.TEST_QNAME)).
+ setAfter(ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).
+ withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build()).build();
+
+ DataChanged expected = new DataChanged(change);
+
+ DataChanged actual = (DataChanged) SerializationUtils.clone(expected);
+
+ assertEquals("getCreatedData", change.getCreatedData(), actual.getChange().getCreatedData());
+ assertEquals("getOriginalData", change.getOriginalData(), actual.getChange().getOriginalData());
+ assertEquals("getOriginalSubtree", change.getOriginalSubtree(), actual.getChange().getOriginalSubtree());
+ assertEquals("getRemovedPaths", change.getRemovedPaths(), actual.getChange().getRemovedPaths());
+ assertEquals("getUpdatedData", change.getUpdatedData(), actual.getChange().getUpdatedData());
+ assertEquals("getUpdatedSubtree", change.getUpdatedSubtree(), actual.getChange().getUpdatedSubtree());
+ }
+}
package org.opendaylight.controller.cluster.datastore.messages;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
public class MergeDataTest {
@Test
public void testSerialization() {
- SchemaContext schemaContext = TestModel.createTestContext();
- MergeData expected = new MergeData(TestModel.TEST_PATH, ImmutableNodes
- .containerNode(TestModel.TEST_QNAME), schemaContext);
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- MergeData actual = MergeData.fromSerializable(expected.toSerializable(), schemaContext);
- Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
- Assert.assertEquals("getData", expected.getData(), actual.getData());
+ MergeData expected = new MergeData(path, data);
+
+ Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ assertEquals("Serialized type", MergeData.class, serialized.getClass());
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
+
+ Object clone = SerializationUtils.clone((Serializable) serialized);
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
+ MergeData actual = MergeData.fromSerializable(clone);
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getData", expected.getData(), actual.getData());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, MergeData.isSerializedType(
+ ShardTransactionMessages.MergeData.newBuilder()
+ .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+ .setNormalizedNode(Node.getDefaultInstance()).build()));
+ assertEquals("isSerializedType", true,
+ MergeData.isSerializedType(new MergeData()));
+ assertEquals("isSerializedType", false, MergeData.isSerializedType(new Object()));
+ }
+
+ /**
+ * Tests backwards compatible serialization/deserialization of a MergeData message with the
+ * base and R1 Helium versions, which used the protobuff MergeData message.
+ */
+ @Test
+ public void testSerializationWithHeliumR1Version() throws Exception {
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ MergeData expected = new MergeData(path, data);
+
+ Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
+
+ MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getData", expected.getData(), actual.getData());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ReadDataReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadDataReplyTest {
+
+ @Test
+ public void testSerialization() {
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ ReadDataReply expected = new ReadDataReply(data);
+
+ Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
+
+ ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+ (Serializable) serialized));
+ assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(
+ ShardTransactionMessages.ReadDataReply.newBuilder().build()));
+ assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(new ReadDataReply()));
+ assertEquals("isSerializedType", false, ReadDataReply.isSerializedType(new Object()));
+ }
+
+ /**
+ * Tests backwards compatible serialization/deserialization of a ReadDataReply message with the
+ * base and R1 Helium versions, which used the protobuff ReadDataReply message.
+ */
+ @Test
+ public void testSerializationWithHeliumR1Version() throws Exception {
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ ReadDataReply expected = new ReadDataReply(data);
+
+ Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
+
+ ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+ (Serializable) serialized));
+ assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+ }
+}
*/
package org.opendaylight.controller.cluster.datastore.messages;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
/**
* Unit tests for WriteData.
@Test
public void testSerialization() {
- SchemaContext schemaContext = TestModel.createTestContext();
- WriteData expected = new WriteData(TestModel.TEST_PATH, ImmutableNodes
- .containerNode(TestModel.TEST_QNAME), schemaContext);
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- WriteData actual = WriteData.fromSerializable(expected.toSerializable(), schemaContext);
- Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
- Assert.assertEquals("getData", expected.getData(), actual.getData());
+ WriteData expected = new WriteData(path, data);
+
+ Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ assertEquals("Serialized type", WriteData.class, serialized.getClass());
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
+
+ Object clone = SerializationUtils.clone((Serializable) serialized);
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
+ WriteData actual = WriteData.fromSerializable(clone);
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getData", expected.getData(), actual.getData());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, WriteData.isSerializedType(
+ ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+ .setNormalizedNode(Node.getDefaultInstance()).build()));
+ assertEquals("isSerializedType", true, WriteData.isSerializedType(new WriteData()));
+ assertEquals("isSerializedType", false, WriteData.isSerializedType(new Object()));
+ }
+
+ /**
+ * Tests backwards compatible serialization/deserialization of a WriteData message with the
+ * base and R1 Helium versions, which used the protobuff WriteData message.
+ */
+ @Test
+ public void testSerializationWithHeliumR1Version() throws Exception {
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ WriteData expected = new WriteData(path, data);
+
+ Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
+
+ WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getData", expected.getData(), actual.getData());
}
}
*/
package org.opendaylight.controller.md.cluster.datastore.model;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Set;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.Set;
-
public class TestModel {
public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+ public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";