import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private final FrontendMetadata frontendMetadata;
private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
+ private boolean paused;
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
private void handleConnectClient(final ConnectClientRequest message) {
try {
if (!isLeader() || !isLeaderActive()) {
- LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message);
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
throw new NotLeaderException(getSelf());
}
private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
throws RequestException {
// We are not the leader, hence we want to fail-fast.
- if (!isLeader() || !isLeaderActive()) {
- LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
+ if (!isLeader() || paused || !isLeaderActive()) {
+ LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}, paused: {}",
+ persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
throw new NotLeaderException(getSelf());
}
persistenceId(), getId());
}
+ paused = false;
store.purgeLeaderState();
}
@Override
protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
+ paused = false;
- final boolean hasLeader = hasLeader();
- if (!hasLeader) {
- // No leader implies we are not the leader, lose frontend state if we have any. This also places
- // an explicit guard so the map will not get modified accidentally.
+ if (!isLeader()) {
if (!knownFrontends.isEmpty()) {
LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
knownFrontends = ImmutableMap.of();
}
- return;
- }
- if (!isLeader()) {
+ if (!hasLeader()) {
+ // No leader anywhere, nothing else to do
+ return;
+ }
+
// Another leader was elected. If we were the previous leader and had pending transactions, convert
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
@Override
protected void pauseLeader(final Runnable operation) {
LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ paused = true;
+
+ // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
+
store.setRunOnPendingTransactionsComplete(operation);
}
+ @Override
+ protected void unpauseLeader() {
+ LOG.debug("{}: In unpauseLeader", persistenceId());
+ paused = false;
+
+ store.setRunOnPendingTransactionsComplete(null);
+
+ // Restore tell-based protocol state as if we were becoming the leader
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ }
+
@Override
protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
- private SchemaContext schemaContext;
+ private SchemaContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private TipProducingDataTree dataTree;
private volatile boolean sealed;
return self();
}
- public T schemaContext(final SchemaContext newSchemaContext) {
+ public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
checkSealed();
- this.schemaContext = newSchemaContext;
+ this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
return self();
}
}
public SchemaContext getSchemaContext() {
- return schemaContext;
+ return Verify.verifyNotNull(schemaContextProvider.getSchemaContext());
}
public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
Preconditions.checkNotNull(id, "id should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null");
}
public Props props() {