/* * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore.shardmanager; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.ReceiveTimeout; import akka.actor.Status.Failure; import akka.actor.UntypedActor; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; /** * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return * a DatastoreSnapshot instance reply. * * @author Thomas Pantelis */ class ShardManagerGetSnapshotReplyActor extends UntypedActor { private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class); private final Set remainingShardNames; private final Params params; private final List shardSnapshots = new ArrayList<>(); private ShardManagerGetSnapshotReplyActor(Params params) { this.params = params; remainingShardNames = new HashSet<>(params.shardNames); LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size()); getContext().setReceiveTimeout(params.receiveTimeout); } @Override public void onReceive(Object message) { if (message instanceof GetSnapshotReply) { onGetSnapshotReply((GetSnapshotReply)message); } else if (message instanceof Failure) { LOG.debug("{}: Received {}", params.id, message); params.replyToActor.tell(message, getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); } else if (message instanceof ReceiveTimeout) { String msg = String.format( "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s " + "did not respond.", params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(), remainingShardNames); LOG.warn("{}: {}", params.id, msg); params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); } } private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) { LOG.debug("{}: Received {}", params.id, getSnapshotReply); ShardIdentifier shardId = ShardIdentifier.fromShardIdString(getSnapshotReply.getId()); shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot())); remainingShardNames.remove(shardId.getShardName()); if (remainingShardNames.isEmpty()) { LOG.debug("{}: All shard snapshots received", params.id); DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot, shardSnapshots); params.replyToActor.tell(datastoreSnapshot, getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); } } public static Props props(Collection shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor, String id, Duration receiveTimeout) { return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType, shardManagerSnapshot, replyToActor, id, receiveTimeout)); } private static final class Params { final Collection shardNames; final String datastoreType; final byte[] shardManagerSnapshot; final ActorRef replyToActor; final String id; final Duration receiveTimeout; Params(Collection shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor, String id, Duration receiveTimeout) { this.shardNames = shardNames; this.datastoreType = datastoreType; this.shardManagerSnapshot = shardManagerSnapshot; this.replyToActor = replyToActor; this.id = id; this.receiveTimeout = receiveTimeout; } } }