CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index e62e918e5e7eff53b8f7e00dbe1e8300d270b59f..148fa1881b836252417cdccd92ea5440950f82c1 100644 (file)
@@ -46,6 +46,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
@@ -228,6 +229,8 @@ public class Shard extends RaftActor {
             } else if (message instanceof ForwardedReadyTransaction) {
                 commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
                         getSender(), this);
+            } else if (message instanceof ReadyLocalTransaction) {
+                handleReadyLocalTransaction((ReadyLocalTransaction)message);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
             } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
@@ -393,6 +396,15 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
+    private void noLeaderError(Object message) {
+        // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+        // it more resilient in case we're in the process of electing a new leader.
+        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+            "Could not find the leader for shard %s. This typically happens" +
+            " when the system is coming up or recovering and a leader is being elected. Try again" +
+            " later.", persistenceId()))), getSelf());
+    }
+
     private void handleBatchedModifications(BatchedModifications batched) {
         // This message is sent to prepare the modificationsa transaction directly on the Shard as an
         // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
@@ -423,12 +435,27 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
-                // it more resilient in case we're in the process of electing a new leader.
-                getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
-                    "Could not find the leader for shard %s. This typically happens" +
-                    " when the system is coming up or recovering and a leader is being elected. Try again" +
-                    " later.", persistenceId()))), getSelf());
+                noLeaderError(batched);
+            }
+        }
+    }
+
+    private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
+        if (isLeader()) {
+            try {
+                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+            } catch (Exception e) {
+                LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+                        message.getTransactionID(), e);
+                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+            }
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader != null) {
+                LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+                leader.forward(message, getContext());
+            } else {
+                noLeaderError(message);
             }
         }
     }
@@ -493,7 +520,7 @@ public class Shard extends RaftActor {
             ShardTransactionIdentifier transactionId, String transactionChainId,
             short clientVersion ) {
 
-        return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
                 transactionId, transactionChainId, clientVersion);
     }