Merge "Bug 1029: Remove dead code: p2site"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 1bf32e7fca7191236afeeb2c7ed0048b28db499c..4b130950f27d9f40585cc432eedf4b31e71e3a9b 100644 (file)
@@ -8,6 +8,25 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -65,25 +84,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-import akka.persistence.RecoveryFailure;
-import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -93,8 +93,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
  */
 public class Shard extends RaftActor {
 
-    private static final int HELIUM_1_TX_VERSION = 1;
-
     private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
 
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@@ -391,7 +389,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) {
+        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -492,8 +490,8 @@ public class Shard extends RaftActor {
             }
         }
 
-        if(this.schemaContext == null){
-            throw new NullPointerException("schemaContext should not be null");
+        if(this.schemaContext == null) {
+            throw new IllegalStateException("SchemaContext is not set");
         }
 
         if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
@@ -534,9 +532,16 @@ public class Shard extends RaftActor {
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
-        createTransaction(createTransaction.getTransactionType(),
-            createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
-            createTransaction.getClientVersion());
+        try {
+            ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
+                createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+                createTransaction.getVersion());
+
+            getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
+                    createTransaction.getTransactionId()).toSerializable(), getSelf());
+        } catch (Exception e) {
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
@@ -546,18 +551,14 @@ public class Shard extends RaftActor {
             ShardTransactionIdentifier.builder()
                 .remoteTransactionId(remoteTransactionId)
                 .build();
+
         if(LOG.isDebugEnabled()) {
             LOG.debug("Creating transaction : {} ", transactionId);
         }
+
         ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
                 transactionChainId, clientVersion);
 
-        getSender()
-            .tell(new CreateTransactionReply(
-                    Serialization.serializedActorPath(transactionActor),
-                    remoteTransactionId).toSerializable(),
-                getSelf());
-
         return transactionActor;
     }
 
@@ -615,7 +616,7 @@ public class Shard extends RaftActor {
         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                     listenerRegistration.path());
 
-        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
     }
 
     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
@@ -782,7 +783,7 @@ public class Shard extends RaftActor {
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());