*/
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Verify.verify;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else if (message instanceof PersistAbortTransactionPayload) {
final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
- persistPayload(txId, AbortTransactionPayload.create(
- txId, datastoreContext.getInitialPayloadSerializedBufferCapacity()), true);
+ persistPayload(txId, AbortTransactionPayload.create(txId,
+ datastoreContext.getInitialPayloadSerializedBufferCapacity()), true);
+ persistPayload(txId, PurgeTransactionPayload.create(txId,
+ datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
return Optional.of(state.getLastConnectTicks());
}
+ private void disableTracking(final DisableTrackingPayload payload) {
+ final ClientIdentifier clientId = payload.getIdentifier();
+ LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId);
+ frontendMetadata.disableTracking(clientId);
+
+ if (isLeader()) {
+ final FrontendIdentifier frontendId = clientId.getFrontendId();
+ final LeaderFrontendState frontend = knownFrontends.get(frontendId);
+ if (frontend != null) {
+ if (clientId.equals(frontend.getIdentifier())) {
+ if (!(frontend instanceof LeaderFrontendState.Disabled)) {
+ verify(knownFrontends.replace(frontendId, frontend,
+ new LeaderFrontendState.Disabled(persistenceId(), clientId, store)));
+ LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId);
+ } else {
+ LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend);
+ }
+ } else {
+ LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId);
+ }
+ } else {
+ LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+ }
+ }
+ }
+
private void onMakeLeaderLocal() {
LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
if (isLeader()) {
final ABIVersion selectedVersion = selectVersion(message);
final LeaderFrontendState frontend;
if (existing == null) {
- frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+ frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store);
knownFrontends.put(clientId.getFrontendId(), frontend);
LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
} else {
@Override
protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
if (data instanceof Payload) {
+ if (data instanceof DisableTrackingPayload) {
+ disableTracking((DisableTrackingPayload) data);
+ return;
+ }
+
try {
store.applyReplicatedPayload(identifier, (Payload)data);
} catch (DataValidationFailedException | IOException e) {