PeerAddressResolved resolved = (PeerAddressResolved) message;
setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
} else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
- store.checkForExpiredTransactions(transactionCommitTimeout);
- commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ commitTimeoutCheck();
} else if (message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
} else if (message instanceof RegisterRoleChangeListener) {
}
}
+ private void commitTimeoutCheck() {
+ store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
+ commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ }
+
+ private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
+ final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
+ // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
+ final LeaderFrontendState state = knownFrontends.get(frontend);
+ return state == null ? Optional.absent() : Optional.of(state.getLastConnectTicks());
+ }
+
private void onMakeLeaderLocal() {
LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
if (isLeader()) {