X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=4b130950f27d9f40585cc432eedf4b31e71e3a9b;hb=5448d6812e386bd56aec7209c4852c586a6163b3;hp=1bf32e7fca7191236afeeb2c7ed0048b28db499c;hpb=3def254d3e8d2f24038ddfb7d1b1749ca2135fe2;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index 1bf32e7fca..4b130950f2 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -8,6 +8,25 @@
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -65,25 +84,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-import akka.persistence.RecoveryFailure;
-import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
/**
* A Shard represents a portion of the logical data tree
@@ -93,8 +93,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
*/
public class Shard extends RaftActor {
- private static final int HELIUM_1_TX_VERSION = 1;
-
private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@@ -391,7 +389,7 @@ public class Shard extends RaftActor {
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
ActorRef replyActorPath = self();
- if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) {
+ if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
@@ -492,8 +490,8 @@ public class Shard extends RaftActor {
}
}
- if(this.schemaContext == null){
- throw new NullPointerException("schemaContext should not be null");
+ if(this.schemaContext == null) {
+ throw new IllegalStateException("SchemaContext is not set");
}
if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
@@ -534,9 +532,16 @@ public class Shard extends RaftActor {
}
private void createTransaction(CreateTransaction createTransaction) {
- createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
- createTransaction.getClientVersion());
+ try {
+ ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
+ createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+ createTransaction.getVersion());
+
+ getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
+ createTransaction.getTransactionId()).toSerializable(), getSelf());
+ } catch (Exception e) {
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
@@ -546,18 +551,14 @@ public class Shard extends RaftActor {
ShardTransactionIdentifier.builder()
.remoteTransactionId(remoteTransactionId)
.build();
+
if(LOG.isDebugEnabled()) {
LOG.debug("Creating transaction : {} ", transactionId);
}
+
ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
transactionChainId, clientVersion);
- getSender()
- .tell(new CreateTransactionReply(
- Serialization.serializedActorPath(transactionActor),
- remoteTransactionId).toSerializable(),
- getSelf());
-
return transactionActor;
}
@@ -615,7 +616,7 @@ public class Shard extends RaftActor {
LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
listenerRegistration.path());
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}
private ListenerRegistration