import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
* The ShardManager has the following jobs,
} else if(message instanceof ShardNotInitializedTimeout) {
onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
} else if(message instanceof ShardLeaderStateChanged) {
- onLeaderStateChanged((ShardLeaderStateChanged)message);
+ onLeaderStateChanged((ShardLeaderStateChanged) message);
+ } else if(message instanceof SwitchShardBehavior){
+ onSwitchShardBehavior((SwitchShardBehavior) message);
} else {
unknownMessage(message);
}
LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+ FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+ if(shardInformation.isShardInitialized()) {
+ // If the shard is already initialized then we'll wait enough time for the shard to
+ // elect a leader, ie 2 times the election timeout.
+ timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+ .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
+ }
+
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
- datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+ timeout, getSelf(),
new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
getContext().dispatcher(), getSelf());
}
private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
- return new NoShardLeaderException(String.format(
- "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
- "recovering and a leader is being elected. Try again later.", shardId));
+ return new NoShardLeaderException(null, shardId.toString());
}
private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
}
}
+ private void onSwitchShardBehavior(SwitchShardBehavior message) {
+ ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+
+ ShardInformation shardInformation = localShards.get(identifier.getShardName());
+
+ if(shardInformation != null && shardInformation.getActor() != null) {
+ shardInformation.getActor().tell(
+ new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
+ } else {
+ LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
+ message.getShardName(), message.getNewState());
+ }
+ }
+
/**
* Notifies all the local shards of a change in the schema context
*
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
}
- mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+ mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+
+ mBean.setShardManager(this);
}
/**
}
boolean isShardReadyWithLeaderId() {
- return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+ return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
+ (isLeader() || peerAddresses.get(leaderId) != null);
}
boolean isShardInitialized() {