Bug 3020: Add version to AppendEntries and AppendEntriesReply
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 62d3259a714059f497ca3e97ad08166a3a1106bf..14a20da247f5138b17e3c30e437bf3c661e5ec25 100644 (file)
@@ -46,14 +46,17 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
@@ -63,6 +66,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -112,7 +116,8 @@ public class Shard extends RaftActor {
 
     protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
+        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()),
+                DataStoreVersions.CURRENT_VERSION);
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
@@ -225,6 +230,8 @@ public class Shard extends RaftActor {
             } else if (message instanceof ForwardedReadyTransaction) {
                 commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
                         getSender(), this);
+            } else if (message instanceof ReadyLocalTransaction) {
+                handleReadyLocalTransaction((ReadyLocalTransaction)message);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
             } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
@@ -265,6 +272,12 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    @Override
+    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+        return new ShardLeaderStateChanged(memberId, leaderId,
+                isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent());
+    }
+
     private void onDatastoreContext(DatastoreContext context) {
         datastoreContext = context;
 
@@ -384,6 +397,15 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
+    private void noLeaderError(Object message) {
+        // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+        // it more resilient in case we're in the process of electing a new leader.
+        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+            "Could not find the leader for shard %s. This typically happens" +
+            " when the system is coming up or recovering and a leader is being elected. Try again" +
+            " later.", persistenceId()))), getSelf());
+    }
+
     private void handleBatchedModifications(BatchedModifications batched) {
         // This message is sent to prepare the modificationsa transaction directly on the Shard as an
         // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
@@ -414,12 +436,27 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
-                // it more resilient in case we're in the process of electing a new leader.
-                getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
-                    "Could not find the leader for shard %s. This typically happens" +
-                    " when the system is coming up or recovering and a leader is being elected. Try again" +
-                    " later.", persistenceId()))), getSelf());
+                noLeaderError(batched);
+            }
+        }
+    }
+
+    private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
+        if (isLeader()) {
+            try {
+                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+            } catch (Exception e) {
+                LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+                        message.getTransactionID(), e);
+                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+            }
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader != null) {
+                LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+                leader.forward(message, getContext());
+            } else {
+                noLeaderError(message);
             }
         }
     }
@@ -484,7 +521,7 @@ public class Shard extends RaftActor {
             ShardTransactionIdentifier transactionId, String transactionChainId,
             short clientVersion ) {
 
-        return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
                 transactionId, transactionChainId, clientVersion);
     }