}
private void handleCommitTransaction(final CommitTransaction commit) {
+ final TransactionIdentifier txId = commit.getTransactionId();
if (isLeader()) {
- commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this);
+ askProtocolEncountered(txId);
+ commitCoordinator.handleCommit(txId, getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
- messageRetrySupport.addMessageToRetry(commit, getSender(),
- "Could not commit transaction " + commit.getTransactionId());
+ messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId);
} else {
LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
leader.forward(commit, getContext());
}
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
- LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionId());
+ final TransactionIdentifier txId = canCommit.getTransactionId();
+ LOG.debug("{}: Can committing transaction {}", persistenceId(), txId);
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionId(), getSender(), this);
+ askProtocolEncountered(txId);
+ commitCoordinator.handleCanCommit(txId, getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
messageRetrySupport.addMessageToRetry(canCommit, getSender(),
- "Could not canCommit transaction " + canCommit.getTransactionId());
+ "Could not canCommit transaction " + txId);
} else {
LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
leader.forward(canCommit, getContext());
@SuppressWarnings("checkstyle:IllegalCatch")
protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
+ askProtocolEncountered(batched.getTransactionId());
+
try {
commitCoordinator.handleBatchedModifications(batched, sender, this);
} catch (Exception e) {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
+ askProtocolEncountered(forwardedReady.getTransactionId());
commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
} else {
ActorSelection leader = getLeader();
}
private void handleAbortTransaction(final AbortTransaction abort) {
- doAbortTransaction(abort.getTransactionId(), getSender());
+ final TransactionIdentifier transactionId = abort.getTransactionId();
+ askProtocolEncountered(transactionId);
+ doAbortTransaction(transactionId, getSender());
}
void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
if (isLeader()) {
final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+ askProtocolEncountered(id.getClientId());
+
// FIXME: CONTROLLER-1628: stage purge once no transactions are present
store.closeTransactionChain(id, null);
store.purgeTransactionChain(id, null);
@SuppressWarnings("checkstyle:IllegalCatch")
private void createTransaction(final CreateTransaction createTransaction) {
+ askProtocolEncountered(createTransaction.getTransactionId());
+
try {
if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
&& failIfIsolatedLeader(getSender())) {
transactionId);
}
+ // Called on leader only
+ private void askProtocolEncountered(final TransactionIdentifier transactionId) {
+ askProtocolEncountered(transactionId.getHistoryId().getClientId());
+ }
+
+ // Called on leader only
+ private void askProtocolEncountered(final ClientIdentifier clientId) {
+ final LeaderFrontendState state = knownFrontends.get(clientId.getFrontendId());
+ if (state instanceof LeaderFrontendState.Enabled) {
+ LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
+ persistPayload(clientId, DisableTrackingPayload.create(clientId,
+ datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
+ }
+ }
+
private void updateSchemaContext(final UpdateSchemaContext message) {
updateSchemaContext(message.getSchemaContext());
}