final FrontendIdentifier frontendId = clientId.getFrontendId();
final FrontendClientMetadataBuilder client = clients.get(frontendId);
if (client == null) {
final FrontendIdentifier frontendId = clientId.getFrontendId();
final FrontendClientMetadataBuilder client = clients.get(frontendId);
if (client == null) {
- // When we havent seen the client before, we still need to disable tracking for him since this only gets
+ // When we have not seen the client before, we still need to disable tracking for him since this only gets
// triggered once.
LOG.debug("{}: disableTracking {} does not match any client, pre-disabling client.", shardName, clientId);
clients.put(frontendId, new FrontendClientMetadataBuilder.Disabled(shardName, clientId));
// triggered once.
LOG.debug("{}: disableTracking {} does not match any client, pre-disabling client.", shardName, clientId);
clients.put(frontendId, new FrontendClientMetadataBuilder.Disabled(shardName, clientId));
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
super(builder.getId().toString(), builder.getPeerAddresses(),
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
- this.name = builder.getId().toString();
- this.shardName = builder.getId().getShardName();
- this.datastoreContext = builder.getDatastoreContext();
- this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
- this.frontendMetadata = new FrontendMetadata(name);
+ name = builder.getId().toString();
+ shardName = builder.getId().getShardName();
+ datastoreContext = builder.getDatastoreContext();
+ restoreFromSnapshot = builder.getRestoreFromSnapshot();
+ frontendMetadata = new FrontendMetadata(name);
setPersistence(datastoreContext.isPersistent());
setPersistence(datastoreContext.isPersistent());
getContext().become(new MeteringBehavior(this));
}
getContext().become(new MeteringBehavior(this));
}
- commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name);
+ commitCoordinator = new ShardCommitCoordinator(store, LOG, name);
setTransactionCommitTimeout();
setTransactionCommitTimeout();
self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
- this.name, datastoreContext);
+ name, datastoreContext);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
- responseMessageSlicer = MessageSlicer.builder().logContext(this.name)
+ responseMessageSlicer = MessageSlicer.builder().logContext(name)
.messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
.messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
- requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+ requestMessageAssembler = MessageAssembler.builder().logContext(name)
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.assembledMessageCallback((message, sender) -> self().tell(message, sender))
.expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.assembledMessageCallback((message, sender) -> self().tell(message, sender))
.expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
- LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId());
+ final TransactionIdentifier txId = message.getTransactionId();
+ LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), txId);
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
+ askProtocolEncountered(txId);
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
- LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
- message.getTransactionId(), e);
+ LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), txId, e);
getSender().tell(new Failure(e), getSelf());
}
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(message, getSender(),
getSender().tell(new Failure(e), getSelf());
}
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(message, getSender(),
- "Could not process ready local transaction " + message.getTransactionId());
+ "Could not process ready local transaction " + txId);
} else {
LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
} else {
LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
- public String persistenceId() {
- return this.name;
+ public final String persistenceId() {
+ return name;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
}
private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
}
private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
- // FIXME: needs to be enabled
- assumeFalse(true);
// ask based should track no metadata
assertEquals(List.of(), clientMeta.getCurrentHistories());
}
// ask based should track no metadata
assertEquals(List.of(), clientMeta.getCurrentHistories());
}