treeChangeListenerPublisher, dataChangeListenerPublisher, name);
} else {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
- treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+ builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
+ dataChangeListenerPublisher, name);
}
shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
}
if (message instanceof RequestEnvelope) {
+ final long now = ticker().read();
final RequestEnvelope envelope = (RequestEnvelope)message;
+
try {
- final RequestSuccess<?, ?> success = handleRequest(envelope);
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
if (success != null) {
- envelope.sendSuccess(success);
+ envelope.sendSuccess(success, ticker().read() - now);
}
} catch (RequestException e) {
LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
- envelope.sendFailure(e);
+ envelope.sendFailure(e, ticker().read() - now);
} catch (Exception e) {
LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
- envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+ ticker().read() - now);
}
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
}
}
- private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+ private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ throws RequestException {
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || !isLeaderActive()) {
LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
if (request instanceof TransactionRequest) {
final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
- return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+ return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
} else if (request instanceof LocalHistoryRequest) {
final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
- return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+ return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
updateConfigParams(datastoreContext.getShardRaftConfig());
}
- boolean canSkipPayload() {
- // If we do not have any followers and we are not using persistence we can apply modification to the state
- // immediately
- return !hasFollowers() && !persistence().isRecoveryApplicable();
- }
-
// applyState() will be invoked once consensus is reached on the payload
- void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
- // We are faking the sender
- persistData(self(), transactionId, payload);
+ void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) {
+ boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+ if (canSkipPayload) {
+ applyState(self(), transactionId, payload);
+ } else {
+ // We are faking the sender
+ persistData(self(), transactionId, payload, batchHint);
+ }
}
private void handleCommitTransaction(final CommitTransaction commit) {