super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
- this.name = builder.getId().toString();
- this.shardName = builder.getId().getShardName();
- this.datastoreContext = builder.getDatastoreContext();
- this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
- this.frontendMetadata = new FrontendMetadata(name);
+ name = builder.getId().toString();
+ shardName = builder.getId().getShardName();
+ datastoreContext = builder.getDatastoreContext();
+ restoreFromSnapshot = builder.getRestoreFromSnapshot();
+ frontendMetadata = new FrontendMetadata(name);
setPersistence(datastoreContext.isPersistent());
getContext().become(new MeteringBehavior(this));
}
- commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name);
+ commitCoordinator = new ShardCommitCoordinator(store, LOG, name);
setTransactionCommitTimeout();
self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
- this.name, datastoreContext);
+ name, datastoreContext);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
- responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+ responseMessageSlicer = MessageSlicer.builder().logContext(name)
.messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
- requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+ requestMessageAssembler = MessageAssembler.builder().logContext(name)
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.assembledMessageCallback((message, sender) -> self().tell(message, sender))
.expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
- LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId());
+ final TransactionIdentifier txId = message.getTransactionId();
+ LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), txId);
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
+ askProtocolEncountered(txId);
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
- LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
- message.getTransactionId(), e);
+ LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), txId, e);
getSender().tell(new Failure(e), getSelf());
}
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(message, getSender(),
- "Could not process ready local transaction " + message.getTransactionId());
+ "Could not process ready local transaction " + txId);
} else {
LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
}
@Override
- public String persistenceId() {
- return this.name;
+ public final String persistenceId() {
+ return name;
}
@Override