/// The name of this shard
private final String name;
+ private final String shardName;
+
private final ShardStats shardMBean;
private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
this.name = builder.getId().toString();
+ this.shardName = builder.getId().getShardName();
this.datastoreContext = builder.getDatastoreContext();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
this.frontendMetadata = new FrontendMetadata(name);
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else if (message instanceof PersistAbortTransactionPayload) {
final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
- persistPayload(txId, AbortTransactionPayload.create(txId), true);
+ persistPayload(txId, AbortTransactionPayload.create(
+ txId, datastoreContext.getInitialPayloadSerializedBufferCapacity()), true);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
.message(envelope.newSuccessEnvelope(success, executionTimeNanos))
.sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
- .onFailureCallback(t -> {
- LOG.warn("Error slicing response {}", success, t);
- }).build()));
+ .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build()));
} else {
envelope.sendSuccess(success, executionTimeNanos);
}
}
if (cmp > 0) {
LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
- throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+ throw new RetiredGenerationException(clientId.getGeneration(),
+ existing.getIdentifier().getGeneration());
}
LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
return roleChangeNotifier;
}
+ String getShardName() {
+ return shardName;
+ }
+
@Override
protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
final short leaderPayloadVersion) {
LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
- forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+ forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
+ forwardedReady.getParticipatingShardNames());
readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(readyLocal, getContext());
}
messagesToForward.size(), leader);
for (Object message : messagesToForward) {
+ LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
leader.tell(message, self());
}
}