Merge "Optimizations, Monitoring and Logging"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
index 3a916bda2c6e163d75c4d39a0c4d80b94b823fb2..1ffe5ca40262ef630a247dca0ce8e46e126f3c34 100644 (file)
@@ -15,6 +15,7 @@ import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
@@ -32,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.modification.ImmutableCompo
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -41,8 +43,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-import java.util.concurrent.ExecutionException;
-
 /**
  * The ShardTransaction Actor represents a remote transaction
  * <p>
@@ -90,7 +90,6 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
   protected ShardTransaction(DOMStoreTransactionChain transactionChain,
                           ActorRef shardActor, SchemaContext schemaContext) {
     this.transactionChain = transactionChain;
-    //this.transaction = transaction;
     this.shardActor = shardActor;
     this.schemaContext = schemaContext;
   }
@@ -174,7 +173,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
       getSender().tell(new GetCompositeModificationReply(
           new ImmutableCompositeModification(modification)), getSelf());
     }else{
-      throw new Exception ("ShardTransaction:handleRecieve received an unknown message"+message);
+         throw new UnknownMessageException(message);
     }
   }
 
@@ -197,10 +196,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
           } else {
             sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
           }
-        } catch (InterruptedException | ExecutionException e) {
-          log.error(e,
-              "An exception happened when reading data from path : "
-                  + path.toString());
+        } catch (Exception e) {
+            sender.tell(new akka.actor.Status.Failure(new ReadFailedException( "An Exception occurred  when reading data from path : "
+                + path.toString(),e)),self);
         }
 
       }
@@ -212,22 +210,36 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
     modification.addModification(
         new WriteModification(message.getPath(), message.getData(),schemaContext));
     LOG.debug("writeData at path : " + message.getPath().toString());
-    transaction.write(message.getPath(), message.getData());
-    getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+
+    try {
+        transaction.write(message.getPath(), message.getData());
+        getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+    }catch(Exception e){
+        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+    }
   }
 
   protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
     modification.addModification(
         new MergeModification(message.getPath(), message.getData(), schemaContext));
     LOG.debug("mergeData at path : " + message.getPath().toString());
-    transaction.merge(message.getPath(), message.getData());
-    getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+    try {
+        transaction.merge(message.getPath(), message.getData());
+        getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+    }catch(Exception e){
+        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+    }
   }
 
   protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
+    LOG.debug("deleteData at path : " + message.getPath().toString());
     modification.addModification(new DeleteModification(message.getPath()));
-    transaction.delete(message.getPath());
-    getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+    try {
+        transaction.delete(message.getPath());
+        getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+    }catch(Exception e){
+        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+    }
   }
 
   protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {