Improve logging around transaction lifecycle
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 11e74d5adad0eb09cae2af3a0a645fdbf96fbd0c..2b4ff4845c360d66c4a4f86478cd03564484a086 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
@@ -65,19 +66,23 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
@@ -85,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -182,9 +188,9 @@ public class Shard extends RaftActor {
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
         ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
-                new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+                new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
         ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
-                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
                     treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
@@ -334,12 +340,43 @@ public class Shard extends RaftActor {
             } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
                 store.processCohortRegistryCommand(getSender(),
                         (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+            } else if (message instanceof PersistAbortTransactionPayload) {
+                final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
+                persistPayload(txId, AbortTransactionPayload.create(txId), true);
+            } else if (message instanceof MakeLeaderLocal) {
+                onMakeLeaderLocal();
             } else {
                 super.handleNonRaftCommand(message);
             }
         }
     }
 
+    private void onMakeLeaderLocal() {
+        LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+        if (isLeader()) {
+            getSender().tell(new Status.Success(null), getSelf());
+            return;
+        }
+
+        final ActorSelection leader = getLeader();
+
+        if (leader == null) {
+            // Leader is not present. The cluster is most likely trying to
+            // elect a leader and we should let that run its normal course
+
+            // TODO we can wait for the election to complete and retry the
+            // request. We can also let the caller retry by sending a flag
+            // in the response indicating the request is "reTryable".
+            getSender().tell(new Failure(
+                    new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
+                            + "Currently there is no leader for " + persistenceId())),
+                    getSelf());
+            return;
+        }
+
+        leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+    }
+
     // Acquire our frontend tracking handle and verify generation matches
     private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
         final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
@@ -416,7 +453,7 @@ public class Shard extends RaftActor {
             final ClientIdentifier clientId = lhReq.getTarget().getClientId();
             return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
         } else {
-            LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
+            LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
             throw new UnsupportedRequestException(request);
         }
     }