import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.Status.Failure;
-import akka.actor.UntypedActor;
+import akka.actor.UntypedAbstractActor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
* Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return
*
* @author Thomas Pantelis
*/
-class ShardManagerGetSnapshotReplyActor extends UntypedActor {
+final class ShardManagerGetSnapshotReplyActor extends UntypedAbstractActor {
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
private final Set<String> remainingShardNames;
private final Params params;
private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
- private ShardManagerGetSnapshotReplyActor(Params params) {
+ private ShardManagerGetSnapshotReplyActor(final Params params) {
this.params = params;
remainingShardNames = new HashSet<>(params.shardNames);
}
@Override
- public void onReceive(Object message) {
+ public void onReceive(final Object message) {
if (message instanceof GetSnapshotReply) {
onGetSnapshotReply((GetSnapshotReply)message);
} else if (message instanceof Failure) {
params.replyToActor.tell(message, getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
} else if (message instanceof ReceiveTimeout) {
- String msg = String.format(
- "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s "
- + "did not respond.", params.receiveTimeout.toMillis(), params.shardNames.size(),
- remainingShardNames.size(), remainingShardNames);
- LOG.warn("{}: {}", params.id, msg);
- params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
+ LOG.warn("{}: Timed out after {} ms while waiting for snapshot replies from {} shard(s). "
+ + "{} shard(s) {} did not respond", params.id, params.receiveTimeout.toMillis(),
+ params.shardNames.size(), remainingShardNames.size(), remainingShardNames);
+ params.replyToActor.tell(new Failure(new TimeoutException(String.format(
+ "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s "
+ + "did not respond.", params.receiveTimeout.toMillis(), params.shardNames.size(),
+ remainingShardNames.size(), remainingShardNames))), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
}
- private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) {
+ private void onGetSnapshotReply(final GetSnapshotReply getSnapshotReply) {
LOG.debug("{}: Received {}", params.id, getSnapshotReply);
ShardIdentifier shardId = ShardIdentifier.fromShardIdString(getSnapshotReply.getId());
}
}
- public static Props props(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot,
- ActorRef replyToActor, String id, Duration receiveTimeout) {
+ public static Props props(final Collection<String> shardNames, final String datastoreType,
+ final ShardManagerSnapshot shardManagerSnapshot, final ActorRef replyToActor, final String id,
+ final FiniteDuration receiveTimeout) {
return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType,
shardManagerSnapshot, replyToActor, id, receiveTimeout));
}
private static final class Params {
final Collection<String> shardNames;
final String datastoreType;
- final byte[] shardManagerSnapshot;
+ final ShardManagerSnapshot shardManagerSnapshot;
final ActorRef replyToActor;
final String id;
- final Duration receiveTimeout;
+ final FiniteDuration receiveTimeout;
- Params(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
- String id, Duration receiveTimeout) {
+ Params(final Collection<String> shardNames, final String datastoreType,
+ final ShardManagerSnapshot shardManagerSnapshot, final ActorRef replyToActor, final String id,
+ final FiniteDuration receiveTimeout) {
this.shardNames = shardNames;
this.datastoreType = datastoreType;
this.shardManagerSnapshot = shardManagerSnapshot;