Store schemaContext in ActorContext so that all proxy objects can have access to it 22/8822/7
authorMoiz Raja <moraja@cisco.com>
Tue, 8 Jul 2014 19:44:05 +0000 (12:44 -0700)
committerMoiz Raja <moraja@cisco.com>
Mon, 28 Jul 2014 20:56:30 +0000 (13:56 -0700)
Use it for MergeData and WriteData message construction

Change-Id: I20df92fc77c41016df7cc6f737226368b25b5a0f
Signed-off-by: Moiz Raja <moraja@cisco.com>
12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java

index 4401104..10bcd30 100644 (file)
@@ -44,6 +44,9 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private final String type;
     private final ActorContext actorContext;
 
+    private SchemaContext schemaContext;
+
+
 
     /**
      * Executor used to run FutureTask's
@@ -88,28 +91,29 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext, executor);
+        return new TransactionChainProxy(actorContext, executor, schemaContext);
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
-            executor);
+            executor, schemaContext);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
-            executor);
+            executor, schemaContext);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
-            executor);
+            executor, schemaContext);
     }
 
     @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
         actorContext.getShardManager().tell(
             new UpdateSchemaContext(schemaContext), null);
     }
index 250ef49..f55774f 100644 (file)
@@ -93,7 +93,7 @@ public class ShardManager extends AbstractUntypedActor {
         getSender().tell(new PrimaryNotFound(shardName), getSelf());
       }
     } else if(message instanceof UpdateSchemaContext){
-        // FIXME : Notify all local shards of a context change
+        // FIXME : Notify all local shards of a schemaContext change
         getContext().system().actorSelection(defaultShardPath).forward(message, getContext());
     }
   }
index 71b61ff..2e8538d 100644 (file)
@@ -13,6 +13,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import java.util.concurrent.ExecutorService;
 
@@ -22,28 +23,30 @@ import java.util.concurrent.ExecutorService;
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
     private final ExecutorService transactionExecutor;
+    private final SchemaContext schemaContext;
 
-    public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor) {
+    public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor, SchemaContext schemaContext) {
         this.actorContext = actorContext;
         this.transactionExecutor = transactionExecutor;
+        this.schemaContext = schemaContext;
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+            TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, schemaContext);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor);
+            TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor, schemaContext);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_WRITE, transactionExecutor);
+            TransactionProxy.TransactionType.READ_WRITE, transactionExecutor, schemaContext);
     }
 
     @Override
index d3f209b..9a77d13 100644 (file)
@@ -15,7 +15,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
@@ -24,10 +23,12 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -63,17 +64,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
     private final String identifier;
     private final ExecutorService executor;
+    private final SchemaContext schemaContext;
 
     public TransactionProxy(
         ActorContext actorContext,
         TransactionType transactionType,
-        ExecutorService executor
+        ExecutorService executor,
+        SchemaContext schemaContext
         ) {
 
         this.identifier = "txn-" + counter.getAndIncrement();
         this.transactionType = transactionType;
         this.actorContext = actorContext;
         this.executor = executor;
+        this.schemaContext = schemaContext;
 
         Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
         if(response instanceof CreateTransactionReply){
@@ -116,13 +120,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void write(InstanceIdentifier path, NormalizedNode<?, ?> data) {
         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
-        remoteTransaction.tell(new WriteData(path, data), null);
+        remoteTransaction.tell(new WriteData(path, data, schemaContext), null);
     }
 
     @Override
     public void merge(InstanceIdentifier path, NormalizedNode<?, ?> data) {
         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
-        remoteTransaction.tell(new MergeData(path, data), null);
+        remoteTransaction.tell(new MergeData(path, data, schemaContext), null);
     }
 
     @Override
index 75d1e95..5ab7741 100644 (file)
@@ -8,11 +8,33 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class MergeData extends ModifyData {
-  public MergeData(InstanceIdentifier path, NormalizedNode<?, ?> data) {
-    super(path, data);
-  }
+public class MergeData extends ModifyData{
+    public MergeData(InstanceIdentifier path, NormalizedNode<?, ?> data,
+        SchemaContext context) {
+        super(path, data, context);
+    }
+
+    @Override public Object toSerializable() {
+        return ShardTransactionMessages.MergeData.newBuilder()
+            .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.getParentPath(path.toString()))
+            .setNormalizedNode(new NormalizedNodeToNodeCodec(schemaContext).encode(path, data).getNormalizedNode()).build();
+    }
+
+    public static MergeData fromSerializable(Object serializable, SchemaContext schemaContext){
+        ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
+        InstanceIdentifier identifier = InstanceIdentifierUtils.from(o.getInstanceIdentifierPathArguments());
+
+        NormalizedNode<?, ?> normalizedNode =
+            new NormalizedNodeToNodeCodec(schemaContext)
+                .decode(identifier, o.getNormalizedNode());
+
+        return new MergeData(identifier, normalizedNode, schemaContext);
+    }
 }
index da86088..cdab30c 100644 (file)
@@ -8,24 +8,33 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import com.google.common.base.Preconditions;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public abstract class ModifyData {
-  private final InstanceIdentifier path;
-  private final NormalizedNode<?,?> data;
+public abstract class ModifyData implements SerializableMessage {
+    protected final InstanceIdentifier path;
+    protected final NormalizedNode<?, ?> data;
+    protected final SchemaContext schemaContext;
 
-  public ModifyData(InstanceIdentifier path, NormalizedNode<?, ?> data) {
-    this.path = path;
-    this.data = data;
-  }
+    public ModifyData(InstanceIdentifier path, NormalizedNode<?, ?> data,
+        SchemaContext context) {
+        Preconditions.checkNotNull(context,
+            "Cannot serialize an object which does not have a schema schemaContext");
 
-  public InstanceIdentifier getPath() {
-    return path;
-  }
 
-  public NormalizedNode<?, ?> getData() {
-    return data;
-  }
+        this.path = path;
+        this.data = data;
+        this.schemaContext = context;
+    }
+
+    public InstanceIdentifier getPath() {
+        return path;
+    }
+
+    public NormalizedNode<?, ?> getData() {
+        return data;
+    }
 
 }
index 1348e65..913da06 100644 (file)
@@ -10,10 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class WriteData extends ModifyData{
 
-  public WriteData(InstanceIdentifier path, NormalizedNode<?, ?> data) {
-    super(path, data);
+  public WriteData(InstanceIdentifier path, NormalizedNode<?, ?> data, SchemaContext schemaContext) {
+    super(path, data, schemaContext);
   }
+
+    @Override public Object toSerializable() {
+        throw new UnsupportedOperationException("toSerializable");
+    }
 }
index c97e07d..3dacec7 100644 (file)
@@ -18,6 +18,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -45,6 +46,8 @@ public class ActorContext {
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
 
+    private SchemaContext schemaContext = null;
+
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
         this.actorSystem = actorSystem;
         this.shardManager = shardManager;
index 8d7f7c8..6bef691 100644 (file)
@@ -99,7 +99,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     final ActorRef transactionActorRef = watchActor(transaction);
 
                     transaction.tell(new WriteData(TestModel.TEST_PATH,
-                        ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()),
                         getRef());
 
                     Boolean writeDone = new ExpectMsg<Boolean>("WriteDataReply") {
index 6521211..28bbdba 100644 (file)
@@ -169,7 +169,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 protected void run() {
 
                     subject.tell(new WriteData(TestModel.TEST_PATH,
-                        ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()),
                         getRef());
 
                     final String out = new ExpectMsg<String>("match hint") {
@@ -207,7 +207,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 protected void run() {
 
                     subject.tell(new MergeData(TestModel.TEST_PATH,
-                        ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()),
                         getRef());
 
                     final String out = new ExpectMsg<String>("match hint") {
index fa32c87..4f3a194 100644 (file)
@@ -42,7 +42,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
 
 
         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
@@ -73,7 +73,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
 
 
         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
@@ -104,7 +104,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
 
         transactionProxy.write(TestModel.TEST_PATH,
             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
@@ -136,7 +136,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
 
         transactionProxy.merge(TestModel.TEST_PATH,
             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
@@ -168,7 +168,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -199,7 +199,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
@@ -222,7 +222,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
 
         Assert.assertNotNull(transactionProxy.getIdentifier());
     }
@@ -238,7 +238,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
 
         transactionProxy.close();