Allow transaction tracking to be disabled
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 97b4a48f1023e3959d2056bc2f110cdc25f60e24..fb6b0142fe440905bbdd55c81c3ff23772ee8c9b 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Verify.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
@@ -83,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
@@ -436,6 +439,32 @@ public class Shard extends RaftActor {
         return Optional.of(state.getLastConnectTicks());
     }
 
+    private void disableTracking(final DisableTrackingPayload payload) {
+        final ClientIdentifier clientId = payload.getIdentifier();
+        LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId);
+        frontendMetadata.disableTracking(clientId);
+
+        if (isLeader()) {
+            final FrontendIdentifier frontendId = clientId.getFrontendId();
+            final LeaderFrontendState frontend = knownFrontends.get(frontendId);
+            if (frontend != null) {
+                if (clientId.equals(frontend.getIdentifier())) {
+                    if (!(frontend instanceof LeaderFrontendState.Disabled)) {
+                        verify(knownFrontends.replace(frontendId, frontend,
+                            new LeaderFrontendState.Disabled(persistenceId(), clientId, store)));
+                        LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId);
+                    } else {
+                        LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend);
+                    }
+                } else {
+                    LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId);
+                }
+            } else {
+                LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+            }
+        }
+    }
+
     private void onMakeLeaderLocal() {
         LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
         if (isLeader()) {
@@ -529,7 +558,7 @@ public class Shard extends RaftActor {
             final ABIVersion selectedVersion = selectVersion(message);
             final LeaderFrontendState frontend;
             if (existing == null) {
-                frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+                frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store);
                 knownFrontends.put(clientId.getFrontendId(), frontend);
                 LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
             } else {
@@ -873,6 +902,11 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof Payload) {
+            if (data instanceof DisableTrackingPayload) {
+                disableTracking((DisableTrackingPayload) data);
+                return;
+            }
+
             try {
                 store.applyReplicatedPayload(identifier, (Payload)data);
             } catch (DataValidationFailedException | IOException e) {