ReadData and ReadDataReply protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
index ff02bfbcce520be7a791a555cba318520d90e978..7a0b19742e7843c979bd21d35eb5c28cafd5dfc8 100644 (file)
@@ -39,6 +39,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import java.util.concurrent.ExecutionException;
 
@@ -67,6 +68,7 @@ import java.util.concurrent.ExecutionException;
 public class ShardTransaction extends AbstractUntypedActor {
 
     private final ActorRef shardActor;
+    private final SchemaContext schemaContext;
 
     // FIXME : see below
     // If transactionChain is not null then this transaction is part of a
@@ -82,37 +84,38 @@ public class ShardTransaction extends AbstractUntypedActor {
         Logging.getLogger(getContext().system(), this);
 
     public ShardTransaction(DOMStoreReadWriteTransaction transaction,
-        ActorRef shardActor) {
-        this(null, transaction, shardActor);
+        ActorRef shardActor, SchemaContext schemaContext) {
+        this(null, transaction, shardActor, schemaContext);
     }
 
     public ShardTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction,
-        ActorRef shardActor) {
+        ActorRef shardActor, SchemaContext schemaContext) {
         this.transactionChain = transactionChain;
         this.transaction = transaction;
         this.shardActor = shardActor;
+        this.schemaContext = schemaContext;
     }
 
 
 
     public static Props props(final DOMStoreReadWriteTransaction transaction,
-        final ActorRef shardActor) {
+        final ActorRef shardActor, final SchemaContext schemaContext) {
         return Props.create(new Creator<ShardTransaction>() {
 
             @Override
             public ShardTransaction create() throws Exception {
-                return new ShardTransaction(transaction, shardActor);
+                return new ShardTransaction(transaction, shardActor, schemaContext);
             }
         });
     }
 
     public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction,
-        final ActorRef shardActor) {
+        final ActorRef shardActor, final SchemaContext schemaContext) {
         return Props.create(new Creator<ShardTransaction>() {
 
             @Override
             public ShardTransaction create() throws Exception {
-                return new ShardTransaction(transactionChain, transaction, shardActor);
+                return new ShardTransaction(transactionChain, transaction, shardActor, schemaContext);
             }
         });
     }
@@ -120,14 +123,14 @@ public class ShardTransaction extends AbstractUntypedActor {
 
     @Override
     public void handleReceive(Object message) throws Exception {
-        if (message instanceof ReadData) {
-            readData((ReadData) message);
-        } else if (message instanceof WriteData) {
-            writeData((WriteData) message);
-        } else if (message instanceof MergeData) {
-            mergeData((MergeData) message);
-        } else if (message instanceof DeleteData) {
-            deleteData((DeleteData) message);
+        if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readData(ReadData.fromSerializable(message));
+        } else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            writeData(WriteData.fromSerializable(message, schemaContext));
+        } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            mergeData(MergeData.fromSerializable(message, schemaContext));
+        } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            deleteData(DeleteData.fromSerizalizable(message));
         } else if (message instanceof ReadyTransaction) {
             readyTransaction((ReadyTransaction) message);
         } else if (message instanceof CloseTransaction) {
@@ -136,6 +139,8 @@ public class ShardTransaction extends AbstractUntypedActor {
             // This is here for testing only
             getSender().tell(new GetCompositeModificationReply(
                 new ImmutableCompositeModification(modification)), getSelf());
+        }else{
+          throw new Exception ("handleRecieve received an unknown mesages"+message);
         }
     }
 
@@ -152,9 +157,9 @@ public class ShardTransaction extends AbstractUntypedActor {
                 try {
                     Optional<NormalizedNode<?, ?>> optional = future.get();
                     if (optional.isPresent()) {
-                        sender.tell(new ReadDataReply(optional.get()), self);
+                        sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
                     } else {
-                        sender.tell(new ReadDataReply(null), self);
+                        sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
                     }
                 } catch (InterruptedException | ExecutionException e) {
                     log.error(e,
@@ -169,14 +174,14 @@ public class ShardTransaction extends AbstractUntypedActor {
 
     private void writeData(WriteData message) {
         modification.addModification(
-            new WriteModification(message.getPath(), message.getData()));
+            new WriteModification(message.getPath(), message.getData(),schemaContext));
         transaction.write(message.getPath(), message.getData());
         getSender().tell(new WriteDataReply(), getSelf());
     }
 
     private void mergeData(MergeData message) {
         modification.addModification(
-            new MergeModification(message.getPath(), message.getData()));
+            new MergeModification(message.getPath(), message.getData(), schemaContext));
         transaction.merge(message.getPath(), message.getData());
         getSender().tell(new MergeDataReply(), getSelf());
     }
@@ -190,7 +195,7 @@ public class ShardTransaction extends AbstractUntypedActor {
     private void readyTransaction(ReadyTransaction message) {
         DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
         ActorRef cohortActor = getContext().actorOf(
-            ThreePhaseCommitCohort.props(cohort, shardActor, modification));
+            ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
         getSender()
             .tell(new ReadyTransactionReply(cohortActor.path()), getSelf());