import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
+import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
private final ShardTransactionMessageRetrySupport messageRetrySupport;
- private final FrontendMetadata frontendMetadata = new FrontendMetadata();
- private final Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = new HashMap<>();
+ private final FrontendMetadata frontendMetadata;
+ private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
this.name = builder.getId().toString();
this.datastoreContext = builder.getDatastoreContext();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
+ this.frontendMetadata = new FrontendMetadata(name);
setPersistence(datastoreContext.isPersistent());
persistenceId(), getId());
}
- store.closeAllTransactionChains();
+ store.purgeLeaderState();
}
if (hasLeader && !isIsolatedLeader()) {
protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
- boolean hasLeader = hasLeader();
- if (hasLeader && !isLeader()) {
+ final boolean hasLeader = hasLeader();
+ if (!hasLeader) {
+ // No leader implies we are not the leader, lose frontend state if we have any. This also places
+ // an explicit guard so the map will not get modified accidentally.
+ if (!knownFrontends.isEmpty()) {
+ LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+ knownFrontends = ImmutableMap.of();
+ }
+ return;
+ }
+
+ if (!isLeader()) {
// Another leader was elected. If we were the previous leader and had pending transactions, convert
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
+ "change and the leader address isn't available.", this);
}
+ } else {
+ // We have become the leader, we need to reconstruct frontend state
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
}
- if (hasLeader && !isIsolatedLeader()) {
+ if (!isIsolatedLeader()) {
messageRetrySupport.retryMessages();
}
}