X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManagerGetSnapshotReplyActor.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManagerGetSnapshotReplyActor.java;h=a1628bb37c050b0410f96930d8978c7ca5491450;hp=0000000000000000000000000000000000000000;hb=f86f7e8c204fb19615c45e669a764c623576e1a3;hpb=40460ae356add6bd8d28a25cf8b287c9bfa38b38 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerGetSnapshotReplyActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerGetSnapshotReplyActor.java new file mode 100644 index 0000000000..a1628bb37c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerGetSnapshotReplyActor.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.shardmanager; + +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.actor.ReceiveTimeout; +import akka.actor.Status.Failure; +import akka.actor.UntypedActor; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeoutException; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +/** + * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return + * a DatastoreSnapshot instance reply. + * + * @author Thomas Pantelis + */ +class ShardManagerGetSnapshotReplyActor extends UntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class); + + private final Set remainingShardNames; + private final Params params; + private final List shardSnapshots = new ArrayList<>(); + + private ShardManagerGetSnapshotReplyActor(Params params) { + this.params = params; + remainingShardNames = new HashSet<>(params.shardNames); + + LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size()); + + getContext().setReceiveTimeout(params.receiveTimeout); + } + + @Override + public void onReceive(Object message) { + if(message instanceof GetSnapshotReply) { + onGetSnapshotReply((GetSnapshotReply)message); + } else if(message instanceof Failure) { + LOG.debug("{}: Received {}", params.id, message); + + params.replyToActor.tell(message, getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } else if (message instanceof ReceiveTimeout) { + String msg = String.format( + "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s did not respond.", + params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(), + remainingShardNames); + LOG.warn("{}: {}", params.id, msg); + params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } + } + + private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) { + LOG.debug("{}: Received {}", params.id, getSnapshotReply); + + ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build(); + shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot())); + + remainingShardNames.remove(shardId.getShardName()); + if(remainingShardNames.isEmpty()) { + LOG.debug("{}: All shard snapshots received", params.id); + + DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot, + shardSnapshots); + params.replyToActor.tell(datastoreSnapshot, getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } + } + + public static Props props(Collection shardNames, String datastoreType, byte[] shardManagerSnapshot, + ActorRef replyToActor, String id, Duration receiveTimeout) { + return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType, + shardManagerSnapshot, replyToActor, id, receiveTimeout)); + } + + private static final class Params { + final Collection shardNames; + final String datastoreType; + final byte[] shardManagerSnapshot; + final ActorRef replyToActor; + final String id; + final Duration receiveTimeout; + + Params(Collection shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor, + String id, Duration receiveTimeout) { + this.shardNames = shardNames; + this.datastoreType = datastoreType; + this.shardManagerSnapshot = shardManagerSnapshot; + this.replyToActor = replyToActor; + this.id = id; + this.receiveTimeout = receiveTimeout; + } + } +}