private final String type;
private final ActorContext actorContext;
+ private SchemaContext schemaContext;
+
+
/**
* Executor used to run FutureTask's
@Override
public DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy(actorContext, executor);
+ return new TransactionChainProxy(actorContext, executor, schemaContext);
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
- executor);
+ executor, schemaContext);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
- executor);
+ executor, schemaContext);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
- executor);
+ executor, schemaContext);
}
@Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
actorContext.getShardManager().tell(
new UpdateSchemaContext(schemaContext), null);
}
getSender().tell(new PrimaryNotFound(shardName), getSelf());
}
} else if(message instanceof UpdateSchemaContext){
- // FIXME : Notify all local shards of a context change
+ // FIXME : Notify all local shards of a schemaContext change
getContext().system().actorSelection(defaultShardPath).forward(message, getContext());
}
}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import java.util.concurrent.ExecutorService;
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
private final ExecutorService transactionExecutor;
+ private final SchemaContext schemaContext;
- public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor) {
+ public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor, SchemaContext schemaContext) {
this.actorContext = actorContext;
this.transactionExecutor = transactionExecutor;
+ this.schemaContext = schemaContext;
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, schemaContext);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor, schemaContext);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_WRITE, transactionExecutor);
+ TransactionProxy.TransactionType.READ_WRITE, transactionExecutor, schemaContext);
}
@Override
import com.google.common.util.concurrent.ListenableFutureTask;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import java.util.ArrayList;
import java.util.HashMap;
private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
private final String identifier;
private final ExecutorService executor;
+ private final SchemaContext schemaContext;
public TransactionProxy(
ActorContext actorContext,
TransactionType transactionType,
- ExecutorService executor
+ ExecutorService executor,
+ SchemaContext schemaContext
) {
this.identifier = "txn-" + counter.getAndIncrement();
this.transactionType = transactionType;
this.actorContext = actorContext;
this.executor = executor;
+ this.schemaContext = schemaContext;
Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
if(response instanceof CreateTransactionReply){
@Override
public void write(InstanceIdentifier path, NormalizedNode<?, ?> data) {
final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
- remoteTransaction.tell(new WriteData(path, data), null);
+ remoteTransaction.tell(new WriteData(path, data, schemaContext), null);
}
@Override
public void merge(InstanceIdentifier path, NormalizedNode<?, ?> data) {
final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
- remoteTransaction.tell(new MergeData(path, data), null);
+ remoteTransaction.tell(new MergeData(path, data, schemaContext), null);
}
@Override
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class MergeData extends ModifyData {
- public MergeData(InstanceIdentifier path, NormalizedNode<?, ?> data) {
- super(path, data);
- }
+public class MergeData extends ModifyData{
+ public MergeData(InstanceIdentifier path, NormalizedNode<?, ?> data,
+ SchemaContext context) {
+ super(path, data, context);
+ }
+
+ @Override public Object toSerializable() {
+ return ShardTransactionMessages.MergeData.newBuilder()
+ .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.getParentPath(path.toString()))
+ .setNormalizedNode(new NormalizedNodeToNodeCodec(schemaContext).encode(path, data).getNormalizedNode()).build();
+ }
+
+ public static MergeData fromSerializable(Object serializable, SchemaContext schemaContext){
+ ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
+ InstanceIdentifier identifier = InstanceIdentifierUtils.from(o.getInstanceIdentifierPathArguments());
+
+ NormalizedNode<?, ?> normalizedNode =
+ new NormalizedNodeToNodeCodec(schemaContext)
+ .decode(identifier, o.getNormalizedNode());
+
+ return new MergeData(identifier, normalizedNode, schemaContext);
+ }
}
package org.opendaylight.controller.cluster.datastore.messages;
+import com.google.common.base.Preconditions;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public abstract class ModifyData {
- private final InstanceIdentifier path;
- private final NormalizedNode<?,?> data;
+public abstract class ModifyData implements SerializableMessage {
+ protected final InstanceIdentifier path;
+ protected final NormalizedNode<?, ?> data;
+ protected final SchemaContext schemaContext;
- public ModifyData(InstanceIdentifier path, NormalizedNode<?, ?> data) {
- this.path = path;
- this.data = data;
- }
+ public ModifyData(InstanceIdentifier path, NormalizedNode<?, ?> data,
+ SchemaContext context) {
+ Preconditions.checkNotNull(context,
+ "Cannot serialize an object which does not have a schema schemaContext");
- public InstanceIdentifier getPath() {
- return path;
- }
- public NormalizedNode<?, ?> getData() {
- return data;
- }
+ this.path = path;
+ this.data = data;
+ this.schemaContext = context;
+ }
+
+ public InstanceIdentifier getPath() {
+ return path;
+ }
+
+ public NormalizedNode<?, ?> getData() {
+ return data;
+ }
}
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class WriteData extends ModifyData{
- public WriteData(InstanceIdentifier path, NormalizedNode<?, ?> data) {
- super(path, data);
+ public WriteData(InstanceIdentifier path, NormalizedNode<?, ?> data, SchemaContext schemaContext) {
+ super(path, data, schemaContext);
}
+
+ @Override public Object toSerializable() {
+ throw new UnsupportedOperationException("toSerializable");
+ }
}
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
private final ActorSystem actorSystem;
private final ActorRef shardManager;
+ private SchemaContext schemaContext = null;
+
public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
this.actorSystem = actorSystem;
this.shardManager = shardManager;
final ActorRef transactionActorRef = watchActor(transaction);
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()),
getRef());
Boolean writeDone = new ExpectMsg<Boolean>("WriteDataReply") {
protected void run() {
subject.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()),
getRef());
final String out = new ExpectMsg<String>("match hint") {
protected void run() {
subject.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()),
getRef());
final String out = new ExpectMsg<String>("match hint") {
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
transactionProxy.write(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.NAME_QNAME));
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
transactionProxy.merge(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.NAME_QNAME));
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
transactionProxy.delete(TestModel.TEST_PATH);
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
Assert.assertNotNull(transactionProxy.getIdentifier());
}
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
transactionProxy.close();
Object executeLocalOperationResponse) {
this.executeLocalOperationResponse = executeLocalOperationResponse;
}
-
-
}