Allow transaction tracking to be disabled
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 402bf4822b01a1f56a12ebfcbf1a2296c24d4fb2..fb6b0142fe440905bbdd55c81c3ff23772ee8c9b 100644 (file)
@@ -5,9 +5,10 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Verify.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
@@ -31,8 +32,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
@@ -78,13 +79,14 @@ import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.controller.cluster.messaging.SliceOptions;
@@ -103,12 +105,11 @@ import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -162,6 +163,8 @@ public class Shard extends RaftActor {
     /// The name of this shard
     private final String name;
 
+    private final String shardName;
+
     private final ShardStats shardMBean;
 
     private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
@@ -183,8 +186,6 @@ public class Shard extends RaftActor {
     private final ShardSnapshotCohort snapshotCohort;
 
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
-    private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
-
 
     private ShardSnapshot restoreFromSnapshot;
 
@@ -204,6 +205,7 @@ public class Shard extends RaftActor {
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
         this.name = builder.getId().toString();
+        this.shardName = builder.getId().getShardName();
         this.datastoreContext = builder.getDatastoreContext();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
         this.frontendMetadata = new FrontendMetadata(name);
@@ -214,15 +216,12 @@ public class Shard extends RaftActor {
 
         ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
                 new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
-        ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
-                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
-                    treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
+                    treeChangeListenerPublisher, name, frontendMetadata);
         } else {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
-                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
-                    dataChangeListenerPublisher, name, frontendMetadata);
+                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata);
         }
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
@@ -339,8 +338,6 @@ public class Shard extends RaftActor {
                 handleAbortTransaction(AbortTransaction.fromSerializable(message));
             } else if (CloseTransactionChain.isSerializedType(message)) {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
-            } else if (message instanceof RegisterChangeListener) {
-                changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof RegisterDataTreeChangeListener) {
                 treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof UpdateSchemaContext) {
@@ -370,7 +367,10 @@ public class Shard extends RaftActor {
                         (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
             } else if (message instanceof PersistAbortTransactionPayload) {
                 final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
-                persistPayload(txId, AbortTransactionPayload.create(txId), true);
+                persistPayload(txId, AbortTransactionPayload.create(txId,
+                    datastoreContext.getInitialPayloadSerializedBufferCapacity()), true);
+                persistPayload(txId, PurgeTransactionPayload.create(txId,
+                    datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
             } else if (message instanceof MakeLeaderLocal) {
                 onMakeLeaderLocal();
             } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
@@ -400,9 +400,7 @@ public class Shard extends RaftActor {
                         responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
                             .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
                             .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
-                            .onFailureCallback(t -> {
-                                LOG.warn("Error slicing response {}", success, t);
-                            }).build()));
+                            .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build()));
                 } else {
                     envelope.sendSuccess(success, executionTimeNanos);
                 }
@@ -441,6 +439,32 @@ public class Shard extends RaftActor {
         return Optional.of(state.getLastConnectTicks());
     }
 
+    private void disableTracking(final DisableTrackingPayload payload) {
+        final ClientIdentifier clientId = payload.getIdentifier();
+        LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId);
+        frontendMetadata.disableTracking(clientId);
+
+        if (isLeader()) {
+            final FrontendIdentifier frontendId = clientId.getFrontendId();
+            final LeaderFrontendState frontend = knownFrontends.get(frontendId);
+            if (frontend != null) {
+                if (clientId.equals(frontend.getIdentifier())) {
+                    if (!(frontend instanceof LeaderFrontendState.Disabled)) {
+                        verify(knownFrontends.replace(frontendId, frontend,
+                            new LeaderFrontendState.Disabled(persistenceId(), clientId, store)));
+                        LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId);
+                    } else {
+                        LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend);
+                    }
+                } else {
+                    LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId);
+                }
+            } else {
+                LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+            }
+        }
+    }
+
     private void onMakeLeaderLocal() {
         LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
         if (isLeader()) {
@@ -468,8 +492,7 @@ public class Shard extends RaftActor {
     }
 
     // Acquire our frontend tracking handle and verify generation matches
-    @Nullable
-    private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
+    private @Nullable LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
         final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
         if (existing != null) {
             final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
@@ -479,7 +502,8 @@ public class Shard extends RaftActor {
             }
             if (cmp > 0) {
                 LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
-                throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+                throw new RetiredGenerationException(clientId.getGeneration(),
+                    existing.getIdentifier().getGeneration());
             }
 
             LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
@@ -502,8 +526,7 @@ public class Shard extends RaftActor {
         throw new OutOfSequenceEnvelopeException(0);
     }
 
-    @Nonnull
-    private static ABIVersion selectVersion(final ConnectClientRequest message) {
+    private static @NonNull ABIVersion selectVersion(final ConnectClientRequest message) {
         final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
         for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
             if (clientRange.contains(v)) {
@@ -535,7 +558,7 @@ public class Shard extends RaftActor {
             final ABIVersion selectedVersion = selectVersion(message);
             final LeaderFrontendState frontend;
             if (existing == null) {
-                frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+                frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store);
                 knownFrontends.put(clientId.getFrontendId(), frontend);
                 LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
             } else {
@@ -551,8 +574,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Nullable
-    private RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
             throws RequestException {
         // We are not the leader, hence we want to fail-fast.
         if (!isLeader() || paused || !isLeaderActive()) {
@@ -594,6 +616,10 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    String getShardName() {
+        return shardName;
+    }
+
     @Override
     protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
             final short leaderPayloadVersion) {
@@ -762,7 +788,8 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
 
                 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
-                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
+                        forwardedReady.getParticipatingShardNames());
                 readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(readyLocal, getContext());
             }
@@ -789,9 +816,16 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
-        store.closeTransactionChain(id, null);
-        store.purgeTransactionChain(id, null);
+        if (isLeader()) {
+            final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+            // FIXME: CONTROLLER-1628: stage purge once no transactions are present
+            store.closeTransactionChain(id, null);
+            store.purgeTransactionChain(id, null);
+        } else if (getLeader() != null) {
+            getLeader().forward(closeTransactionChain, getContext());
+        } else {
+            LOG.warn("{}: Could not close transaction {}", persistenceId(), closeTransactionChain.getIdentifier());
+        }
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
@@ -839,7 +873,6 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
         if (restoreFromSnapshot == null) {
             return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
@@ -859,7 +892,7 @@ public class Shard extends RaftActor {
         if (txCommitTimeoutCheckSchedule == null) {
             // Schedule a message to be periodically sent to check if the current in-progress
             // transaction should be expired and aborted.
-            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+            FiniteDuration period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
             txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
                     period, period, getSelf(),
                     TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
@@ -869,6 +902,11 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof Payload) {
+            if (data instanceof DisableTrackingPayload) {
+                disableTracking((DisableTrackingPayload) data);
+                return;
+            }
+
             try {
                 store.applyReplicatedPayload(identifier, (Payload)data);
             } catch (DataValidationFailedException | IOException e) {
@@ -883,7 +921,6 @@ public class Shard extends RaftActor {
     protected void onStateChanged() {
         boolean isLeader = isLeader();
         boolean hasLeader = hasLeader();
-        changeSupport.onLeadershipChange(isLeader, hasLeader);
         treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
 
         // If this actor is no longer the leader close all the transaction chains
@@ -932,6 +969,8 @@ public class Shard extends RaftActor {
                             messagesToForward.size(), leader);
 
                     for (Object message : messagesToForward) {
+                        LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
                         leader.tell(message, self());
                     }
                 }
@@ -986,7 +1025,6 @@ public class Shard extends RaftActor {
     @Override
     protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
         return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
-                .dataChangeListenerActors(changeSupport.getListenerActors())
                 .commitCohortActors(store.getCohortActors());
     }
 
@@ -1025,7 +1063,7 @@ public class Shard extends RaftActor {
         private DatastoreContext datastoreContext;
         private SchemaContextProvider schemaContextProvider;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
-        private TipProducingDataTree dataTree;
+        private DataTree dataTree;
         private volatile boolean sealed;
 
         protected AbstractBuilder(final Class<S> shardClass) {
@@ -1059,9 +1097,9 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
+        public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
             checkSealed();
-            this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
+            this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider);
             return self();
         }
 
@@ -1071,7 +1109,7 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T dataTree(final TipProducingDataTree newDataTree) {
+        public T dataTree(final DataTree newDataTree) {
             checkSealed();
             this.dataTree = newDataTree;
             return self();
@@ -1097,7 +1135,7 @@ public class Shard extends RaftActor {
             return restoreFromSnapshot;
         }
 
-        public TipProducingDataTree getDataTree() {
+        public DataTree getDataTree() {
             return dataTree;
         }
 
@@ -1128,7 +1166,7 @@ public class Shard extends RaftActor {
     }
 
     public static class Builder extends AbstractBuilder<Builder, Shard> {
-        private Builder() {
+        Builder() {
             super(Shard.class);
         }
     }