Migrate ShardDataTree to use OptionalLong
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 3719f749df6bce26cbe469250284ffd834bba8a0..6a05d25fe5b91adbb58a5539c8b5e09613a6e101 100644 (file)
@@ -5,9 +5,10 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Verify.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
@@ -29,10 +30,12 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
@@ -76,15 +79,13 @@ import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.controller.cluster.messaging.SliceOptions;
@@ -103,12 +104,11 @@ import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -162,6 +162,8 @@ public class Shard extends RaftActor {
     /// The name of this shard
     private final String name;
 
+    private final String shardName;
+
     private final ShardStats shardMBean;
 
     private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
@@ -183,8 +185,6 @@ public class Shard extends RaftActor {
     private final ShardSnapshotCohort snapshotCohort;
 
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
-    private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
-
 
     private ShardSnapshot restoreFromSnapshot;
 
@@ -204,6 +204,7 @@ public class Shard extends RaftActor {
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
         this.name = builder.getId().toString();
+        this.shardName = builder.getId().getShardName();
         this.datastoreContext = builder.getDatastoreContext();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
         this.frontendMetadata = new FrontendMetadata(name);
@@ -214,15 +215,12 @@ public class Shard extends RaftActor {
 
         ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
                 new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
-        ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
-                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
-                    treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
+                    treeChangeListenerPublisher, name, frontendMetadata);
         } else {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
-                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
-                    dataChangeListenerPublisher, name, frontendMetadata);
+                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata);
         }
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
@@ -309,7 +307,7 @@ public class Shard extends RaftActor {
     @Override
     protected void handleNonRaftCommand(final Object message) {
         try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
-            final Optional<Error> maybeError = context.error();
+            final java.util.Optional<Error> maybeError = context.error();
             if (maybeError.isPresent()) {
                 LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
                     maybeError.get());
@@ -339,8 +337,6 @@ public class Shard extends RaftActor {
                 handleAbortTransaction(AbortTransaction.fromSerializable(message));
             } else if (CloseTransactionChain.isSerializedType(message)) {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
-            } else if (message instanceof RegisterChangeListener) {
-                changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof RegisterDataTreeChangeListener) {
                 treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof UpdateSchemaContext) {
@@ -368,9 +364,6 @@ public class Shard extends RaftActor {
             } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
                 store.processCohortRegistryCommand(getSender(),
                         (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
-            } else if (message instanceof PersistAbortTransactionPayload) {
-                final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
-                persistPayload(txId, AbortTransactionPayload.create(txId), true);
             } else if (message instanceof MakeLeaderLocal) {
                 onMakeLeaderLocal();
             } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
@@ -400,9 +393,7 @@ public class Shard extends RaftActor {
                         responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
                             .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
                             .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
-                            .onFailureCallback(t -> {
-                                LOG.warn("Error slicing response {}", success, t);
-                            }).build()));
+                            .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build()));
                 } else {
                     envelope.sendSuccess(success, executionTimeNanos);
                 }
@@ -423,22 +414,50 @@ public class Shard extends RaftActor {
         requestMessageAssembler.checkExpiredAssembledMessageState();
     }
 
-    private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
+    private OptionalLong updateAccess(final SimpleShardDataTreeCohort cohort) {
         final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
         final LeaderFrontendState state = knownFrontends.get(frontend);
         if (state == null) {
             // Not tell-based protocol, do nothing
-            return Optional.absent();
+            return OptionalLong.empty();
         }
 
         if (isIsolatedLeader()) {
             // We are isolated and no new request can come through until we emerge from it. We are still updating
             // liveness of frontend when we see it attempting to communicate. Use the last access timer.
-            return Optional.of(state.getLastSeenTicks());
+            return OptionalLong.of(state.getLastSeenTicks());
         }
 
         // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
-        return Optional.of(state.getLastConnectTicks());
+        return OptionalLong.of(state.getLastConnectTicks());
+    }
+
+    private void disableTracking(final DisableTrackingPayload payload) {
+        final ClientIdentifier clientId = payload.getIdentifier();
+        LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId);
+        frontendMetadata.disableTracking(clientId);
+
+        if (isLeader()) {
+            final FrontendIdentifier frontendId = clientId.getFrontendId();
+            final LeaderFrontendState frontend = knownFrontends.get(frontendId);
+            if (frontend != null) {
+                if (clientId.equals(frontend.getIdentifier())) {
+                    if (!(frontend instanceof LeaderFrontendState.Disabled)) {
+                        verify(knownFrontends.replace(frontendId, frontend,
+                            new LeaderFrontendState.Disabled(persistenceId(), clientId, store)));
+                        LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId);
+                    } else {
+                        LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend);
+                    }
+                } else {
+                    LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId);
+                }
+            } else {
+                LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+                knownFrontends.put(frontendId, new LeaderFrontendState.Disabled(persistenceId(), clientId,
+                    getDataStore()));
+            }
+        }
     }
 
     private void onMakeLeaderLocal() {
@@ -468,8 +487,7 @@ public class Shard extends RaftActor {
     }
 
     // Acquire our frontend tracking handle and verify generation matches
-    @Nullable
-    private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
+    private @Nullable LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
         final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
         if (existing != null) {
             final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
@@ -479,7 +497,8 @@ public class Shard extends RaftActor {
             }
             if (cmp > 0) {
                 LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
-                throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+                throw new RetiredGenerationException(clientId.getGeneration(),
+                    existing.getIdentifier().getGeneration());
             }
 
             LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
@@ -502,8 +521,7 @@ public class Shard extends RaftActor {
         throw new OutOfSequenceEnvelopeException(0);
     }
 
-    @Nonnull
-    private static ABIVersion selectVersion(final ConnectClientRequest message) {
+    private static @NonNull ABIVersion selectVersion(final ConnectClientRequest message) {
         final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
         for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
             if (clientRange.contains(v)) {
@@ -535,7 +553,7 @@ public class Shard extends RaftActor {
             final ABIVersion selectedVersion = selectVersion(message);
             final LeaderFrontendState frontend;
             if (existing == null) {
-                frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+                frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store);
                 knownFrontends.put(clientId.getFrontendId(), frontend);
                 LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
             } else {
@@ -551,8 +569,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Nullable
-    private RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
             throws RequestException {
         // We are not the leader, hence we want to fail-fast.
         if (!isLeader() || paused || !isLeaderActive()) {
@@ -594,6 +611,10 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    String getShardName() {
+        return shardName;
+    }
+
     @Override
     protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
             final short leaderPayloadVersion) {
@@ -623,13 +644,14 @@ public class Shard extends RaftActor {
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
+        final TransactionIdentifier txId = commit.getTransactionId();
         if (isLeader()) {
-            commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this);
+            askProtocolEncountered(txId);
+            commitCoordinator.handleCommit(txId, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
-                messageRetrySupport.addMessageToRetry(commit, getSender(),
-                        "Could not commit transaction " + commit.getTransactionId());
+                messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
                 leader.forward(commit, getContext());
@@ -638,15 +660,17 @@ public class Shard extends RaftActor {
     }
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
-        LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId());
+        final TransactionIdentifier txId = canCommit.getTransactionId();
+        LOG.debug("{}: Can committing transaction {}", persistenceId(), txId);
 
         if (isLeader()) {
-            commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this);
+            askProtocolEncountered(txId);
+            commitCoordinator.handleCanCommit(txId, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
                 messageRetrySupport.addMessageToRetry(canCommit, getSender(),
-                        "Could not canCommit transaction " + canCommit.getTransactionId());
+                        "Could not canCommit transaction " + txId);
             } else {
                 LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
                 leader.forward(canCommit, getContext());
@@ -656,6 +680,8 @@ public class Shard extends RaftActor {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
+        askProtocolEncountered(batched.getTransactionId());
+
         try {
             commitCoordinator.handleBatchedModifications(batched, sender, this);
         } catch (Exception e) {
@@ -752,6 +778,7 @@ public class Shard extends RaftActor {
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
+            askProtocolEncountered(forwardedReady.getTransactionId());
             commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
@@ -762,7 +789,8 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
 
                 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
-                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
+                        forwardedReady.getParticipatingShardNames());
                 readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(readyLocal, getContext());
             }
@@ -770,7 +798,9 @@ public class Shard extends RaftActor {
     }
 
     private void handleAbortTransaction(final AbortTransaction abort) {
-        doAbortTransaction(abort.getTransactionId(), getSender());
+        final TransactionIdentifier transactionId = abort.getTransactionId();
+        askProtocolEncountered(transactionId);
+        doAbortTransaction(transactionId, getSender());
     }
 
     void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
@@ -789,13 +819,21 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
-        store.closeTransactionChain(id, null);
-        store.purgeTransactionChain(id, null);
+        if (isLeader()) {
+            final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+            askProtocolEncountered(id.getClientId());
+            store.closeTransactionChain(id);
+        } else if (getLeader() != null) {
+            getLeader().forward(closeTransactionChain, getContext());
+        } else {
+            LOG.warn("{}: Could not close transaction {}", persistenceId(), closeTransactionChain.getIdentifier());
+        }
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void createTransaction(final CreateTransaction createTransaction) {
+        askProtocolEncountered(createTransaction.getTransactionId());
+
         try {
             if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
                     && failIfIsolatedLeader(getSender())) {
@@ -818,6 +856,27 @@ public class Shard extends RaftActor {
             transactionId);
     }
 
+    // Called on leader only
+    private void askProtocolEncountered(final TransactionIdentifier transactionId) {
+        askProtocolEncountered(transactionId.getHistoryId().getClientId());
+    }
+
+    // Called on leader only
+    private void askProtocolEncountered(final ClientIdentifier clientId) {
+        final FrontendIdentifier frontend = clientId.getFrontendId();
+        final LeaderFrontendState state = knownFrontends.get(frontend);
+        if (!(state instanceof LeaderFrontendState.Disabled)) {
+            LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
+            if (knownFrontends.isEmpty()) {
+                knownFrontends = new HashMap<>();
+            }
+            knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore()));
+
+            persistPayload(clientId, DisableTrackingPayload.create(clientId,
+                datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
+        }
+    }
+
     private void updateSchemaContext(final UpdateSchemaContext message) {
         updateSchemaContext(message.getSchemaContext());
     }
@@ -839,7 +898,6 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
         if (restoreFromSnapshot == null) {
             return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
@@ -859,7 +917,7 @@ public class Shard extends RaftActor {
         if (txCommitTimeoutCheckSchedule == null) {
             // Schedule a message to be periodically sent to check if the current in-progress
             // transaction should be expired and aborted.
-            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+            FiniteDuration period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
             txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
                     period, period, getSelf(),
                     TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
@@ -869,6 +927,11 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof Payload) {
+            if (data instanceof DisableTrackingPayload) {
+                disableTracking((DisableTrackingPayload) data);
+                return;
+            }
+
             try {
                 store.applyReplicatedPayload(identifier, (Payload)data);
             } catch (DataValidationFailedException | IOException e) {
@@ -883,7 +946,6 @@ public class Shard extends RaftActor {
     protected void onStateChanged() {
         boolean isLeader = isLeader();
         boolean hasLeader = hasLeader();
-        changeSupport.onLeadershipChange(isLeader, hasLeader);
         treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
 
         // If this actor is no longer the leader close all the transaction chains
@@ -932,6 +994,8 @@ public class Shard extends RaftActor {
                             messagesToForward.size(), leader);
 
                     for (Object message : messagesToForward) {
+                        LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
                         leader.tell(message, self());
                     }
                 }
@@ -966,8 +1030,10 @@ public class Shard extends RaftActor {
         paused = true;
 
         // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
-        knownFrontends.values().forEach(LeaderFrontendState::retire);
-        knownFrontends = ImmutableMap.of();
+        if (datastoreContext.isUseTellBasedProtocol()) {
+            knownFrontends.values().forEach(LeaderFrontendState::retire);
+            knownFrontends = ImmutableMap.of();
+        }
 
         store.setRunOnPendingTransactionsComplete(operation);
     }
@@ -986,7 +1052,6 @@ public class Shard extends RaftActor {
     @Override
     protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
         return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
-                .dataChangeListenerActors(changeSupport.getListenerActors())
                 .commitCohortActors(store.getCohortActors());
     }
 
@@ -1025,7 +1090,7 @@ public class Shard extends RaftActor {
         private DatastoreContext datastoreContext;
         private SchemaContextProvider schemaContextProvider;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
-        private TipProducingDataTree dataTree;
+        private DataTree dataTree;
         private volatile boolean sealed;
 
         protected AbstractBuilder(final Class<S> shardClass) {
@@ -1071,7 +1136,7 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T dataTree(final TipProducingDataTree newDataTree) {
+        public T dataTree(final DataTree newDataTree) {
             checkSealed();
             this.dataTree = newDataTree;
             return self();
@@ -1097,7 +1162,7 @@ public class Shard extends RaftActor {
             return restoreFromSnapshot;
         }
 
-        public TipProducingDataTree getDataTree() {
+        public DataTree getDataTree() {
             return dataTree;
         }