Remove PersistAbortTransactionPayload
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index dbcbd3d02eff96f393e389d6b92ee5487b83ce18..8e00aa6091adafb24c4ef2c8636cfaae7f34e091 100644 (file)
@@ -5,9 +5,10 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Verify.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
@@ -29,10 +30,11 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
@@ -76,14 +78,13 @@ import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.controller.cluster.messaging.SliceOptions;
@@ -107,7 +108,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -363,9 +363,6 @@ public class Shard extends RaftActor {
             } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
                 store.processCohortRegistryCommand(getSender(),
                         (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
-            } else if (message instanceof PersistAbortTransactionPayload) {
-                final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
-                persistPayload(txId, AbortTransactionPayload.create(txId), true);
             } else if (message instanceof MakeLeaderLocal) {
                 onMakeLeaderLocal();
             } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
@@ -434,6 +431,34 @@ public class Shard extends RaftActor {
         return Optional.of(state.getLastConnectTicks());
     }
 
+    private void disableTracking(final DisableTrackingPayload payload) {
+        final ClientIdentifier clientId = payload.getIdentifier();
+        LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId);
+        frontendMetadata.disableTracking(clientId);
+
+        if (isLeader()) {
+            final FrontendIdentifier frontendId = clientId.getFrontendId();
+            final LeaderFrontendState frontend = knownFrontends.get(frontendId);
+            if (frontend != null) {
+                if (clientId.equals(frontend.getIdentifier())) {
+                    if (!(frontend instanceof LeaderFrontendState.Disabled)) {
+                        verify(knownFrontends.replace(frontendId, frontend,
+                            new LeaderFrontendState.Disabled(persistenceId(), clientId, store)));
+                        LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId);
+                    } else {
+                        LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend);
+                    }
+                } else {
+                    LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId);
+                }
+            } else {
+                LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+                knownFrontends.put(frontendId, new LeaderFrontendState.Disabled(persistenceId(), clientId,
+                    getDataStore()));
+            }
+        }
+    }
+
     private void onMakeLeaderLocal() {
         LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
         if (isLeader()) {
@@ -461,8 +486,7 @@ public class Shard extends RaftActor {
     }
 
     // Acquire our frontend tracking handle and verify generation matches
-    @Nullable
-    private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
+    private @Nullable LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
         final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
         if (existing != null) {
             final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
@@ -496,8 +520,7 @@ public class Shard extends RaftActor {
         throw new OutOfSequenceEnvelopeException(0);
     }
 
-    @Nonnull
-    private static ABIVersion selectVersion(final ConnectClientRequest message) {
+    private static @NonNull ABIVersion selectVersion(final ConnectClientRequest message) {
         final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
         for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
             if (clientRange.contains(v)) {
@@ -529,7 +552,7 @@ public class Shard extends RaftActor {
             final ABIVersion selectedVersion = selectVersion(message);
             final LeaderFrontendState frontend;
             if (existing == null) {
-                frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+                frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store);
                 knownFrontends.put(clientId.getFrontendId(), frontend);
                 LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
             } else {
@@ -545,8 +568,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Nullable
-    private RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
             throws RequestException {
         // We are not the leader, hence we want to fail-fast.
         if (!isLeader() || paused || !isLeaderActive()) {
@@ -621,13 +643,14 @@ public class Shard extends RaftActor {
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
+        final TransactionIdentifier txId = commit.getTransactionId();
         if (isLeader()) {
-            commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this);
+            askProtocolEncountered(txId);
+            commitCoordinator.handleCommit(txId, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
-                messageRetrySupport.addMessageToRetry(commit, getSender(),
-                        "Could not commit transaction " + commit.getTransactionId());
+                messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
                 leader.forward(commit, getContext());
@@ -636,15 +659,17 @@ public class Shard extends RaftActor {
     }
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
-        LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId());
+        final TransactionIdentifier txId = canCommit.getTransactionId();
+        LOG.debug("{}: Can committing transaction {}", persistenceId(), txId);
 
         if (isLeader()) {
-            commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this);
+            askProtocolEncountered(txId);
+            commitCoordinator.handleCanCommit(txId, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
                 messageRetrySupport.addMessageToRetry(canCommit, getSender(),
-                        "Could not canCommit transaction " + canCommit.getTransactionId());
+                        "Could not canCommit transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
                 leader.forward(canCommit, getContext());
@@ -654,6 +679,8 @@ public class Shard extends RaftActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
+        askProtocolEncountered(batched.getTransactionId());
+
         try {
             commitCoordinator.handleBatchedModifications(batched, sender, this);
         } catch (Exception e) {
@@ -750,6 +777,7 @@ public class Shard extends RaftActor {
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
+            askProtocolEncountered(forwardedReady.getTransactionId());
             commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
@@ -769,7 +797,9 @@ public class Shard extends RaftActor {
     }
 
     private void handleAbortTransaction(final AbortTransaction abort) {
-        doAbortTransaction(abort.getTransactionId(), getSender());
+        final TransactionIdentifier transactionId = abort.getTransactionId();
+        askProtocolEncountered(transactionId);
+        doAbortTransaction(transactionId, getSender());
     }
 
     void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
@@ -788,13 +818,24 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
-        store.closeTransactionChain(id, null);
-        store.purgeTransactionChain(id, null);
+        if (isLeader()) {
+            final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+            askProtocolEncountered(id.getClientId());
+
+            // FIXME: CONTROLLER-1628: stage purge once no transactions are present
+            store.closeTransactionChain(id, null);
+            store.purgeTransactionChain(id, null);
+        } else if (getLeader() != null) {
+            getLeader().forward(closeTransactionChain, getContext());
+        } else {
+            LOG.warn("{}: Could not close transaction {}", persistenceId(), closeTransactionChain.getIdentifier());
+        }
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void createTransaction(final CreateTransaction createTransaction) {
+        askProtocolEncountered(createTransaction.getTransactionId());
+
         try {
             if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
                     && failIfIsolatedLeader(getSender())) {
@@ -817,6 +858,27 @@ public class Shard extends RaftActor {
             transactionId);
     }
 
+    // Called on leader only
+    private void askProtocolEncountered(final TransactionIdentifier transactionId) {
+        askProtocolEncountered(transactionId.getHistoryId().getClientId());
+    }
+
+    // Called on leader only
+    private void askProtocolEncountered(final ClientIdentifier clientId) {
+        final FrontendIdentifier frontend = clientId.getFrontendId();
+        final LeaderFrontendState state = knownFrontends.get(frontend);
+        if (!(state instanceof LeaderFrontendState.Disabled)) {
+            LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
+            if (knownFrontends.isEmpty()) {
+                knownFrontends = new HashMap<>();
+            }
+            knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore()));
+
+            persistPayload(clientId, DisableTrackingPayload.create(clientId,
+                datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
+        }
+    }
+
     private void updateSchemaContext(final UpdateSchemaContext message) {
         updateSchemaContext(message.getSchemaContext());
     }
@@ -838,7 +900,6 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
         if (restoreFromSnapshot == null) {
             return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
@@ -858,7 +919,7 @@ public class Shard extends RaftActor {
         if (txCommitTimeoutCheckSchedule == null) {
             // Schedule a message to be periodically sent to check if the current in-progress
             // transaction should be expired and aborted.
-            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+            FiniteDuration period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
             txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
                     period, period, getSelf(),
                     TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
@@ -868,6 +929,11 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof Payload) {
+            if (data instanceof DisableTrackingPayload) {
+                disableTracking((DisableTrackingPayload) data);
+                return;
+            }
+
             try {
                 store.applyReplicatedPayload(identifier, (Payload)data);
             } catch (DataValidationFailedException | IOException e) {
@@ -966,8 +1032,10 @@ public class Shard extends RaftActor {
         paused = true;
 
         // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
-        knownFrontends.values().forEach(LeaderFrontendState::retire);
-        knownFrontends = ImmutableMap.of();
+        if (datastoreContext.isUseTellBasedProtocol()) {
+            knownFrontends.values().forEach(LeaderFrontendState::retire);
+            knownFrontends = ImmutableMap.of();
+        }
 
         store.setRunOnPendingTransactionsComplete(operation);
     }