Fix license header violations in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 7ca9ca99284348be0f99b775076c6313788ebcf4..5a10c3e961bc8e038a0dd3969134b903aace2a56 100644 (file)
@@ -18,9 +18,6 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactio
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -62,6 +58,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
@@ -82,7 +79,7 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+    protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     @VisibleForTesting
     static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
@@ -133,7 +130,7 @@ public class Shard extends RaftActor {
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                 datastoreContext.getDataStoreMXBeanType());
-        shardMBean.setShardActor(getSelf());
+        shardMBean.setShard(this);
 
         if (isMetricsCaptureEnabled()) {
             getContext().become(new MeteringBehavior(this));
@@ -274,6 +271,10 @@ public class Shard extends RaftActor {
         }
     }
 
+    public int getPendingTxCommitQueueSize() {
+        return commitCoordinator.getQueueSize();
+    }
+
     @Override
     protected Optional<ActorRef> getRoleChangeNotifier() {
         return roleChangeNotifier;
@@ -321,7 +322,7 @@ public class Shard extends RaftActor {
     }
 
     void continueCommit(final CohortEntry cohortEntry) throws Exception {
-        final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+        final DataTreeCandidate candidate = cohortEntry.getCandidate();
 
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
@@ -344,10 +345,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().commit().get();
+            cohortEntry.commit();
 
             sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
@@ -380,7 +378,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 try {
-                    store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+                    store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
                 } catch (DataValidationFailedException e) {
                     shardMBean.incrementFailedTransactionsCount();
                     LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
@@ -406,13 +404,10 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
-    private void noLeaderError(Object message) {
+    private void noLeaderError(String errMessage, Object message) {
         // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
         // it more resilient in case we're in the process of electing a new leader.
-        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
-            "Could not find the leader for shard %s. This typically happens" +
-            " when the system is coming up or recovering and a leader is being elected. Try again" +
-            " later.", persistenceId()))), getSelf());
+        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
     }
 
     private void handleBatchedModifications(BatchedModifications batched) {
@@ -429,6 +424,8 @@ public class Shard extends RaftActor {
         // window where it could have a stale leader during leadership transitions.
         //
         if(isLeader()) {
+            failIfIsolatedLeader(getSender());
+
             try {
                 commitCoordinator.handleBatchedModifications(batched, getSender(), this);
             } catch (Exception e) {
@@ -445,13 +442,27 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                noLeaderError(batched);
+                noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
             }
         }
     }
 
+    private boolean failIfIsolatedLeader(ActorRef sender) {
+        if(getRaftState() == RaftState.IsolatedLeader) {
+            sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+                    "Shard %s was the leader but has lost contact with all of its followers. Either all" +
+                    " other follower nodes are down or this node is isolated by a network partition.",
+                    persistenceId()))), getSelf());
+            return true;
+        }
+
+        return false;
+    }
+
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
         if (isLeader()) {
+            failIfIsolatedLeader(getSender());
+
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
@@ -466,7 +477,7 @@ public class Shard extends RaftActor {
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
-                noLeaderError(message);
+                noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
             }
         }
     }
@@ -476,38 +487,7 @@ public class Shard extends RaftActor {
     }
 
     void doAbortTransaction(final String transactionID, final ActorRef sender) {
-        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry != null) {
-            LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
-
-            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
-            // aborted during replication in which case we may still commit locally if replication
-            // succeeds.
-            commitCoordinator.currentTransactionComplete(transactionID, false);
-
-            final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
-            final ActorRef self = getSelf();
-
-            Futures.addCallback(future, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(final Void v) {
-                    shardMBean.incrementAbortTransactionsCount();
-
-                    if(sender != null) {
-                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
-                    }
-                }
-
-                @Override
-                public void onFailure(final Throwable t) {
-                    LOG.error("{}: An exception happened during abort", persistenceId(), t);
-
-                    if(sender != null) {
-                        sender.tell(new akka.actor.Status.Failure(t), self);
-                    }
-                }
-            });
-        }
+        commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
     private void handleCreateTransaction(final Object message) {
@@ -516,10 +496,8 @@ public class Shard extends RaftActor {
         } else if (getLeader() != null) {
             getLeader().forward(message, getContext());
         } else {
-            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
-                "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
-                " when the system is coming up or recovering and a leader is being elected. Try again" +
-                " later.", persistenceId()))), getSelf());
+            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
+                    "Could not create a shard transaction", persistenceId())), getSelf());
         }
     }
 
@@ -537,6 +515,11 @@ public class Shard extends RaftActor {
 
     private void createTransaction(CreateTransaction createTransaction) {
         try {
+            if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
+                    failIfIsolatedLeader(getSender())) {
+                return;
+            }
+
             ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
                 createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
                 createTransaction.getVersion());