Serialization/Deserialization and a host of other fixes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
index a2da063e55d465ffd4c9bb256e9a257475d0e107..737f57bf5d7314536f5fc20b1cbc45167f1b6e97 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
@@ -38,8 +37,9 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import java.util.concurrent.ExecutionException;
 
@@ -65,9 +65,10 @@ import java.util.concurrent.ExecutionException;
  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
  * </p>
  */
-public class ShardTransaction extends UntypedActor {
+public class ShardTransaction extends AbstractUntypedActor {
 
     private final ActorRef shardActor;
+    private final SchemaContext schemaContext;
 
     // FIXME : see below
     // If transactionChain is not null then this transaction is part of a
@@ -83,69 +84,70 @@ public class ShardTransaction extends UntypedActor {
         Logging.getLogger(getContext().system(), this);
 
     public ShardTransaction(DOMStoreReadWriteTransaction transaction,
-        ActorRef shardActor) {
-        this(null, transaction, shardActor);
+        ActorRef shardActor, SchemaContext schemaContext) {
+        this(null, transaction, shardActor, schemaContext);
     }
 
     public ShardTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction,
-        ActorRef shardActor) {
+        ActorRef shardActor, SchemaContext schemaContext) {
         this.transactionChain = transactionChain;
         this.transaction = transaction;
         this.shardActor = shardActor;
+        this.schemaContext = schemaContext;
     }
 
 
 
     public static Props props(final DOMStoreReadWriteTransaction transaction,
-        final ActorRef shardActor) {
+        final ActorRef shardActor, final SchemaContext schemaContext) {
         return Props.create(new Creator<ShardTransaction>() {
 
             @Override
             public ShardTransaction create() throws Exception {
-                return new ShardTransaction(transaction, shardActor);
+                return new ShardTransaction(transaction, shardActor, schemaContext);
             }
         });
     }
 
     public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction,
-        final ActorRef shardActor) {
+        final ActorRef shardActor, final SchemaContext schemaContext) {
         return Props.create(new Creator<ShardTransaction>() {
 
             @Override
             public ShardTransaction create() throws Exception {
-                return new ShardTransaction(transactionChain, transaction, shardActor);
+                return new ShardTransaction(transactionChain, transaction, shardActor, schemaContext);
             }
         });
     }
 
 
     @Override
-    public void onReceive(Object message) throws Exception {
-        log.debug("Received message {}", message);
-
-        if (message instanceof ReadData) {
-            readData((ReadData) message);
-        } else if (message instanceof WriteData) {
-            writeData((WriteData) message);
-        } else if (message instanceof MergeData) {
-            mergeData((MergeData) message);
-        } else if (message instanceof DeleteData) {
-            deleteData((DeleteData) message);
-        } else if (message instanceof ReadyTransaction) {
-            readyTransaction((ReadyTransaction) message);
-        } else if (message instanceof CloseTransaction) {
-            closeTransaction((CloseTransaction) message);
+    public void handleReceive(Object message) throws Exception {
+        if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readData(ReadData.fromSerializable(message));
+        } else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            writeData(WriteData.fromSerializable(message, schemaContext));
+        } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            mergeData(MergeData.fromSerializable(message, schemaContext));
+        } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            deleteData(DeleteData.fromSerizalizable(message));
+        } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readyTransaction(new ReadyTransaction());
+        } else if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
+            closeTransaction(new CloseTransaction());
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
             getSender().tell(new GetCompositeModificationReply(
                 new ImmutableCompositeModification(modification)), getSelf());
+        }else{
+          throw new Exception ("Shard:handleRecieve received an unknown message"+message);
         }
     }
 
     private void readData(ReadData message) {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
-        final InstanceIdentifier path = message.getPath();
+        final YangInstanceIdentifier path = message.getPath();
         final ListenableFuture<Optional<NormalizedNode<?, ?>>> future =
             transaction.read(path);
 
@@ -155,9 +157,9 @@ public class ShardTransaction extends UntypedActor {
                 try {
                     Optional<NormalizedNode<?, ?>> optional = future.get();
                     if (optional.isPresent()) {
-                        sender.tell(new ReadDataReply(optional.get()), self);
+                        sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
                     } else {
-                        sender.tell(new ReadDataReply(null), self);
+                        sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
                     }
                 } catch (InterruptedException | ExecutionException e) {
                     log.error(e,
@@ -172,36 +174,38 @@ public class ShardTransaction extends UntypedActor {
 
     private void writeData(WriteData message) {
         modification.addModification(
-            new WriteModification(message.getPath(), message.getData()));
+            new WriteModification(message.getPath(), message.getData(),schemaContext));
+        LOG.debug("writeData at path : " + message.getPath().toString());
         transaction.write(message.getPath(), message.getData());
-        getSender().tell(new WriteDataReply(), getSelf());
+        getSender().tell(new WriteDataReply().toSerializable(), getSelf());
     }
 
     private void mergeData(MergeData message) {
         modification.addModification(
-            new MergeModification(message.getPath(), message.getData()));
+            new MergeModification(message.getPath(), message.getData(), schemaContext));
+        LOG.debug("mergeData at path : " + message.getPath().toString());
         transaction.merge(message.getPath(), message.getData());
-        getSender().tell(new MergeDataReply(), getSelf());
+        getSender().tell(new MergeDataReply().toSerializable(), getSelf());
     }
 
     private void deleteData(DeleteData message) {
         modification.addModification(new DeleteModification(message.getPath()));
         transaction.delete(message.getPath());
-        getSender().tell(new DeleteDataReply(), getSelf());
+        getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
     }
 
     private void readyTransaction(ReadyTransaction message) {
         DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
         ActorRef cohortActor = getContext().actorOf(
-            ThreePhaseCommitCohort.props(cohort, shardActor, modification));
+            ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
         getSender()
-            .tell(new ReadyTransactionReply(cohortActor.path()), getSelf());
+            .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
 
     }
 
     private void closeTransaction(CloseTransaction message) {
         transaction.close();
-        getSender().tell(new CloseTransactionReply(), getSelf());
+        getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }