X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=fb6b0142fe440905bbdd55c81c3ff23772ee8c9b;hb=refs%2Fchanges%2F83%2F81983%2F13;hp=897527764c18b2ae26061b014ac22e5e24ab4de5;hpb=bc301a153a07be7c23327e85179974422624c80a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 897527764c..fb6b0142fe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Verify.verify; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; @@ -83,6 +85,8 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload; +import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.controller.cluster.messaging.SliceOptions; @@ -363,9 +367,10 @@ public class Shard extends RaftActor { (DataTreeCohortActorRegistry.CohortRegistryCommand) message); } else if (message instanceof PersistAbortTransactionPayload) { final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId(); - persistPayload(txId, AbortTransactionPayload.create( - txId, datastoreContext.getInitialPayloadSerializedBufferCapacity()), true); - store.purgeTransaction(txId, null); + persistPayload(txId, AbortTransactionPayload.create(txId, + datastoreContext.getInitialPayloadSerializedBufferCapacity()), true); + persistPayload(txId, PurgeTransactionPayload.create(txId, + datastoreContext.getInitialPayloadSerializedBufferCapacity()), false); } else if (message instanceof MakeLeaderLocal) { onMakeLeaderLocal(); } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) { @@ -434,6 +439,32 @@ public class Shard extends RaftActor { return Optional.of(state.getLastConnectTicks()); } + private void disableTracking(final DisableTrackingPayload payload) { + final ClientIdentifier clientId = payload.getIdentifier(); + LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId); + frontendMetadata.disableTracking(clientId); + + if (isLeader()) { + final FrontendIdentifier frontendId = clientId.getFrontendId(); + final LeaderFrontendState frontend = knownFrontends.get(frontendId); + if (frontend != null) { + if (clientId.equals(frontend.getIdentifier())) { + if (!(frontend instanceof LeaderFrontendState.Disabled)) { + verify(knownFrontends.replace(frontendId, frontend, + new LeaderFrontendState.Disabled(persistenceId(), clientId, store))); + LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId); + } else { + LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend); + } + } else { + LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId); + } + } else { + LOG.debug("{}: leader state for {} not found", persistenceId(), clientId); + } + } + } + private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { @@ -527,7 +558,7 @@ public class Shard extends RaftActor { final ABIVersion selectedVersion = selectVersion(message); final LeaderFrontendState frontend; if (existing == null) { - frontend = new LeaderFrontendState(persistenceId(), clientId, store); + frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store); knownFrontends.put(clientId.getFrontendId(), frontend); LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId); } else { @@ -871,6 +902,11 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if (data instanceof Payload) { + if (data instanceof DisableTrackingPayload) { + disableTracking((DisableTrackingPayload) data); + return; + } + try { store.applyReplicatedPayload(identifier, (Payload)data); } catch (DataValidationFailedException | IOException e) {