import akka.actor.Status.Failure;
import akka.actor.UntypedActor;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
class ShardManagerGetSnapshotReplyActor extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
- private int repliesReceived;
+ private final Set<String> remainingShardNames;
private final Params params;
private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
private ShardManagerGetSnapshotReplyActor(Params params) {
this.params = params;
+ remainingShardNames = new HashSet<>(params.shardNames);
- LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.totalShardCount);
+ LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size());
getContext().setReceiveTimeout(params.receiveTimeout);
}
params.replyToActor.tell(message, getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
} else if (message instanceof ReceiveTimeout) {
- LOG.warn("{}: Got ReceiveTimeout for inactivity - expected {} GetSnapshotReply messages within {} ms, received {}",
- params.id, params.totalShardCount, params.receiveTimeout.toMillis(), repliesReceived);
-
- params.replyToActor.tell(new Failure(new TimeoutException(String.format(
- "Timed out after %s ms while waiting for snapshot replies from %d shards. Actual replies received was %s",
- params.receiveTimeout.toMillis(), params.totalShardCount, repliesReceived))), getSelf());
+ String msg = String.format(
+ "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s did not respond.",
+ params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(),
+ remainingShardNames);
+ LOG.warn("{}: {}", params.id, msg);
+ params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
}
ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build();
shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
- if(++repliesReceived == params.totalShardCount) {
+ remainingShardNames.remove(shardId.getShardName());
+ if(remainingShardNames.isEmpty()) {
LOG.debug("{}: All shard snapshots received", params.id);
DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
}
}
- public static Props props(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot,
+ public static Props props(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot,
ActorRef replyToActor, String id, Duration receiveTimeout) {
- return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(totalShardCount, datastoreType,
+ return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType,
shardManagerSnapshot, replyToActor, id, receiveTimeout));
}
private static final class Params {
- final int totalShardCount;
+ final Collection<String> shardNames;
final String datastoreType;
final byte[] shardManagerSnapshot;
final ActorRef replyToActor;
final String id;
final Duration receiveTimeout;
- Params(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
+ Params(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
String id, Duration receiveTimeout) {
- this.totalShardCount = totalShardCount;
+ this.shardNames = shardNames;
this.datastoreType = datastoreType;
this.shardManagerSnapshot = shardManagerSnapshot;
this.replyToActor = replyToActor;