*/
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.serialization.JavaSerializer;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
private final ShardTransactionMessageRetrySupport messageRetrySupport;
- private final FrontendMetadata frontendMetadata;
+ @VisibleForTesting
+ final FrontendMetadata frontendMetadata;
+
private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
private boolean paused;
}
@Override
- public void postStop() {
+ public void postStop() throws Exception {
LOG.info("Stopping Shard {}", persistenceId());
super.postStop();
} else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
store.processCohortRegistryCommand(getSender(),
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
- } else if (message instanceof PersistAbortTransactionPayload) {
- final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
- persistPayload(txId, AbortTransactionPayload.create(
- txId, datastoreContext.getInitialPayloadSerializedBufferCapacity()), true);
- store.purgeTransaction(txId, null);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
store.resumeNextPendingTransaction();
+ } else if (GetKnownClients.INSTANCE.equals(message)) {
+ handleGetKnownClients();
} else if (!responseMessageSlicer.handleMessage(message)) {
super.handleNonRaftCommand(message);
}
requestMessageAssembler.checkExpiredAssembledMessageState();
}
- private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
+ private OptionalLong updateAccess(final SimpleShardDataTreeCohort cohort) {
final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
final LeaderFrontendState state = knownFrontends.get(frontend);
if (state == null) {
// Not tell-based protocol, do nothing
- return Optional.absent();
+ return OptionalLong.empty();
}
if (isIsolatedLeader()) {
// We are isolated and no new request can come through until we emerge from it. We are still updating
// liveness of frontend when we see it attempting to communicate. Use the last access timer.
- return Optional.of(state.getLastSeenTicks());
+ return OptionalLong.of(state.getLastSeenTicks());
}
// If this frontend has freshly connected, give it some time to catch up before killing its transactions.
- return Optional.of(state.getLastConnectTicks());
+ return OptionalLong.of(state.getLastConnectTicks());
+ }
+
+ private void disableTracking(final DisableTrackingPayload payload) {
+ final ClientIdentifier clientId = payload.getIdentifier();
+ LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId);
+ frontendMetadata.disableTracking(clientId);
+
+ if (isLeader()) {
+ final FrontendIdentifier frontendId = clientId.getFrontendId();
+ final LeaderFrontendState frontend = knownFrontends.get(frontendId);
+ if (frontend != null) {
+ if (clientId.equals(frontend.getIdentifier())) {
+ if (!(frontend instanceof LeaderFrontendState.Disabled)) {
+ verify(knownFrontends.replace(frontendId, frontend,
+ new LeaderFrontendState.Disabled(persistenceId(), clientId, store)));
+ LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId);
+ } else {
+ LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend);
+ }
+ } else {
+ LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId);
+ }
+ } else {
+ LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+ knownFrontends.put(frontendId, new LeaderFrontendState.Disabled(persistenceId(), clientId,
+ getDataStore()));
+ }
+ }
}
private void onMakeLeaderLocal() {
final ABIVersion selectedVersion = selectVersion(message);
final LeaderFrontendState frontend;
if (existing == null) {
- frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+ frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store);
knownFrontends.put(clientId.getFrontendId(), frontend);
LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
} else {
}
}
+ private void handleGetKnownClients() {
+ final ImmutableSet<ClientIdentifier> clients;
+ if (isLeader()) {
+ clients = knownFrontends.values().stream()
+ .map(LeaderFrontendState::getIdentifier)
+ .collect(ImmutableSet.toImmutableSet());
+ } else {
+ clients = frontendMetadata.getClients();
+ }
+ sender().tell(new GetKnownClientsReply(clients), self());
+ }
+
private boolean hasLeader() {
return getLeaderId() != null;
}
}
private void handleCommitTransaction(final CommitTransaction commit) {
+ final TransactionIdentifier txId = commit.getTransactionId();
if (isLeader()) {
- commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this);
+ askProtocolEncountered(txId);
+ commitCoordinator.handleCommit(txId, getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
- messageRetrySupport.addMessageToRetry(commit, getSender(),
- "Could not commit transaction " + commit.getTransactionId());
+ messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId);
} else {
LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
leader.forward(commit, getContext());
}
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
- LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId());
+ final TransactionIdentifier txId = canCommit.getTransactionId();
+ LOG.debug("{}: Can committing transaction {}", persistenceId(), txId);
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this);
+ askProtocolEncountered(txId);
+ commitCoordinator.handleCanCommit(txId, getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
messageRetrySupport.addMessageToRetry(canCommit, getSender(),
- "Could not canCommit transaction " + canCommit.getTransactionId());
+ "Could not canCommit transaction " + txId);
} else {
LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
leader.forward(canCommit, getContext());
@SuppressWarnings("checkstyle:IllegalCatch")
protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
+ askProtocolEncountered(batched.getTransactionId());
+
try {
commitCoordinator.handleBatchedModifications(batched, sender, this);
} catch (Exception e) {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
+ askProtocolEncountered(forwardedReady.getTransactionId());
commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
} else {
ActorSelection leader = getLeader();
}
private void handleAbortTransaction(final AbortTransaction abort) {
- doAbortTransaction(abort.getTransactionId(), getSender());
+ final TransactionIdentifier transactionId = abort.getTransactionId();
+ askProtocolEncountered(transactionId);
+ doAbortTransaction(transactionId, getSender());
}
void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
if (isLeader()) {
final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
- // FIXME: CONTROLLER-1628: stage purge once no transactions are present
- store.closeTransactionChain(id, null);
- store.purgeTransactionChain(id, null);
+ askProtocolEncountered(id.getClientId());
+ store.closeTransactionChain(id);
} else if (getLeader() != null) {
getLeader().forward(closeTransactionChain, getContext());
} else {
@SuppressWarnings("checkstyle:IllegalCatch")
private void createTransaction(final CreateTransaction createTransaction) {
+ askProtocolEncountered(createTransaction.getTransactionId());
+
try {
if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
&& failIfIsolatedLeader(getSender())) {
transactionId);
}
+ // Called on leader only
+ private void askProtocolEncountered(final TransactionIdentifier transactionId) {
+ askProtocolEncountered(transactionId.getHistoryId().getClientId());
+ }
+
+ // Called on leader only
+ private void askProtocolEncountered(final ClientIdentifier clientId) {
+ final FrontendIdentifier frontend = clientId.getFrontendId();
+ final LeaderFrontendState state = knownFrontends.get(frontend);
+ if (!(state instanceof LeaderFrontendState.Disabled)) {
+ LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
+ if (knownFrontends.isEmpty()) {
+ knownFrontends = new HashMap<>();
+ }
+ knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore()));
+
+ persistPayload(clientId, DisableTrackingPayload.create(clientId,
+ datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
+ }
+ }
+
private void updateSchemaContext(final UpdateSchemaContext message) {
updateSchemaContext(message.getSchemaContext());
}
@Override
protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
if (data instanceof Payload) {
+ if (data instanceof DisableTrackingPayload) {
+ disableTracking((DisableTrackingPayload) data);
+ return;
+ }
+
try {
store.applyReplicatedPayload(identifier, (Payload)data);
} catch (DataValidationFailedException | IOException e) {
paused = true;
// Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
- knownFrontends.values().forEach(LeaderFrontendState::retire);
- knownFrontends = ImmutableMap.of();
+ if (datastoreContext.isUseTellBasedProtocol()) {
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
+ }
store.setRunOnPendingTransactionsComplete(operation);
}
}
public abstract static class AbstractBuilder<T extends AbstractBuilder<T, S>, S extends Shard> {
- private final Class<S> shardClass;
+ private final Class<? extends S> shardClass;
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
private SchemaContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private DataTree dataTree;
+
private volatile boolean sealed;
- protected AbstractBuilder(final Class<S> shardClass) {
+ protected AbstractBuilder(final Class<? extends S> shardClass) {
this.shardClass = shardClass;
}
protected void checkSealed() {
- Preconditions.checkState(!sealed, "Builder isalready sealed - further modifications are not allowed");
+ checkState(!sealed, "Builder isalready sealed - further modifications are not allowed");
}
@SuppressWarnings("unchecked")
public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
checkSealed();
- this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider);
+ this.schemaContextProvider = requireNonNull(newSchemaContextProvider);
return self();
}
}
protected void verify() {
- Preconditions.checkNotNull(id, "id should not be null");
- Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
- Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null");
+ requireNonNull(id, "id should not be null");
+ requireNonNull(peerAddresses, "peerAddresses should not be null");
+ requireNonNull(datastoreContext, "dataStoreContext should not be null");
+ requireNonNull(schemaContextProvider, "schemaContextProvider should not be null");
}
public Props props() {
public static class Builder extends AbstractBuilder<Builder, Shard> {
Builder() {
- super(Shard.class);
+ this(Shard.class);
+ }
+
+ Builder(final Class<? extends Shard> shardClass) {
+ super(shardClass);
}
}