import java.util.function.Supplier;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
((DeleteSnapshotsFailure) message).cause());
} else if (message instanceof DeleteSnapshotsSuccess) {
- LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message);
+ LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
} else if (message instanceof RegisterRoleChangeListenerReply) {
LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
} else if (message instanceof ClusterEvent.MemberEvent) {
private void onInitConfigListener() {
LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
- final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType =
org.opendaylight.mdsal.common.api.LogicalDatastoreType
.valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
}
configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
- configUpdateHandler.initListener(dataStore, type);
+ configUpdateHandler.initListener(dataStore, datastoreType);
}
private void onShutDown() {
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
shardReplicaOperationsInProgress.remove(shardName);
- String msg = String.format("RemoveServer request to leader %s for shard %s failed",
- primaryPath, shardName);
- LOG.debug("{}: {}", persistenceId(), msg, failure);
+ LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
+ shardName, failure);
// FAILURE
- sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
+ failure)), self());
} else {
// SUCCESS
self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
shardReplicaOperationsInProgress.remove(shardName);
- String msg = String.format("RemoveServer request to leader %s for shard %s failed",
- primaryPath, shardName);
-
- LOG.debug("{}: {}", persistenceId(), msg, failure);
+ LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
+ shardName, failure);
// FAILURE
- sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
+ failure)), self());
} else {
// SUCCESS
self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
}
self().tell((RunnableMessage) () -> {
+ // At any rate, invalidate primaryShardInfo cache
+ primaryShardInfoCache.remove(shardName);
+
shardActorsStopping.remove(shardName);
notifyOnCompleteTasks(failure, result);
}, ActorRef.noSender());
configuration.addPrefixShardConfiguration(config);
final Builder builder = newShardDatastoreContextBuilder(shardName);
- builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+ builder.logicalStoreType(config.getPrefix().getDatastoreType())
.storeRoot(config.getPrefix().getRootIdentifier());
DatastoreContext shardDatastoreContext = builder.build();
final ActorRef sender = getSender();
if (sender == null) {
- return; //why is a non-actor sending this message? Just ignore.
+ // why is a non-actor sending this message? Just ignore.
+ return;
}
String actorName = sender.path().name();
try {
shardId = ShardIdentifier.fromShardIdString(actorName);
} catch (IllegalArgumentException e) {
- LOG.debug("{}: ignoring actor {}", actorName, e);
+ LOG.debug("{}: ignoring actor {}", persistenceId, actorName, e);
return;
}
}
private void markMemberUnavailable(final MemberName memberName) {
- final String memberStr = memberName.getName();
for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- // XXX: why are we using String#contains() here?
- if (leaderId != null && leaderId.contains(memberStr)) {
+ if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
LOG.debug("Marking Leader {} as unavailable.", leaderId);
info.setLeaderAvailable(false);
}
private void markMemberAvailable(final MemberName memberName) {
- final String memberStr = memberName.getName();
for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- // XXX: why are we using String#contains() here?
- if (leaderId != null && leaderId.contains(memberStr)) {
+ if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
LOG.debug("Marking Leader {} as available.", leaderId);
info.setLeaderAvailable(true);
}
private void updateSchemaContext(final Object message) {
schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
- LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
+ LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
for (ShardInformation info : localShards.values()) {
info.setSchemaContext(schemaContext);
if (info.getActor() == null) {
LOG.debug("Creating Shard {}", info.getShardId());
info.setActor(newShardActor(info));
+ // Update peer address for every existing peer member to avoid missing sending
+ // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
+ String shardName = info.getShardName();
+ for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
+ String peerId = getShardIdentifier(memberName, shardName).toString() ;
+ String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
+ info.updatePeerAddress(peerId, peerAddress, getSelf());
+ info.peerUp(memberName, peerId, getSelf());
+ LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
+ persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
+ }
} else {
info.getActor().tell(message, getSelf());
}
}
}
- restoreFromSnapshot = null; // null out to GC
+ // null out to GC
+ restoreFromSnapshot = null;
for (String shardName : memberShardNames) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
if (shardReplicaOperationsInProgress.contains(shardName)) {
- String msg = String.format("A shard replica operation for %s is already in progress", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName);
+ sender.tell(new Status.Failure(new IllegalStateException(
+ String.format("A shard replica operation for %s is already in progress", shardName))), getSelf());
return true;
}
// Create the localShard
if (schemaContext == null) {
- String msg = String.format(
- "No SchemaContext is available in order to create a local shard instance for %s", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
+ persistenceId(), shardName);
+ getSender().tell(new Status.Failure(new IllegalStateException(
+ "No SchemaContext is available in order to create a local shard instance for " + shardName)),
+ getSelf());
return;
}
// verify the shard with the specified name is present in the cluster configuration
if (!this.configuration.isShardConfigured(shardName)) {
- String msg = String.format("No module configuration exists for shard %s", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName);
+ getSender().tell(new Status.Failure(new IllegalArgumentException(
+ "No module configuration exists for shard " + shardName)), getSelf());
return;
}
// Create the localShard
if (schemaContext == null) {
- String msg = String.format(
- "No SchemaContext is available in order to create a local shard instance for %s", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
+ persistenceId(), shardName);
+ getSender().tell(new Status.Failure(new IllegalStateException(
+ "No SchemaContext is available in order to create a local shard instance for " + shardName)),
+ getSelf());
return;
}
}
private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
- String msg = String.format("Local shard %s already exists", shardName);
- LOG.debug("{}: {}", persistenceId(), msg);
- sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
+ sender.tell(new Status.Failure(new AlreadyExistsException(
+ String.format("Local shard %s already exists", shardName))), getSelf());
}
private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
sender);
} else if (response instanceof LocalShardNotFound) {
- String msg = String.format("Local shard %s does not exist", shardName);
- LOG.debug("{}: {}", persistenceId, msg);
- sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+ LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName);
+ sender.tell(new Status.Failure(new IllegalArgumentException(
+ String.format("Local shard %s does not exist", shardName))), self());
} else {
- String msg = String.format("Failed to find local shard %s: received response: %s",
- shardName, response);
- LOG.debug("{}: {}", persistenceId, msg);
- sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
- new RuntimeException(msg)), self());
+ LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName,
+ response);
+ sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
+ : new RuntimeException(
+ String.format("Failed to find local shard %s: received response: %s", shardName,
+ response))), self());
}
}
}
public void onComplete(final Throwable failure, final Object response) {
shardReplicaOperationsInProgress.remove(shardName);
if (failure != null) {
- String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
- shardActorRef.path());
- LOG.debug("{}: {}", persistenceId(), msg, failure);
- sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(),
+ shardActorRef.path(), failure);
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("ChangeServersVotingStatus request to local shard %s failed",
+ shardActorRef.path()), failure)), self());
} else {
LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
@Override
public void onUnknownResponse(final Object response) {
- String msg = String.format("Failed to find leader for shard %s: received response: %s",
- shardName, response);
- LOG.debug("{}: {}", persistenceId, msg);
- targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
- new RuntimeException(msg)), shardManagerActor);
+ LOG.debug("{}: Failed to find leader for shard {}: received response: {}", persistenceId, shardName,
+ response);
+ targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
+ : new RuntimeException(String.format("Failed to find leader for shard %s: received response: %s",
+ shardName, response))), shardManagerActor);
}
}