2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.Status.Failure;
import akka.actor.UntypedActor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
32 * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return
33 * a DatastoreSnapshot instance reply.
35 * @author Thomas Pantelis
37 class ShardManagerGetSnapshotReplyActor extends UntypedActor {
38 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
40 private final Set<String> remainingShardNames;
41 private final Params params;
42 private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
44 private ShardManagerGetSnapshotReplyActor(Params params) {
46 remainingShardNames = new HashSet<>(params.shardNames);
48 LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size());
50 getContext().setReceiveTimeout(params.receiveTimeout);
54 public void onReceive(Object message) {
55 if (message instanceof GetSnapshotReply) {
56 onGetSnapshotReply((GetSnapshotReply)message);
57 } else if (message instanceof Failure) {
58 LOG.debug("{}: Received {}", params.id, message);
60 params.replyToActor.tell(message, getSelf());
61 getSelf().tell(PoisonPill.getInstance(), getSelf());
62 } else if (message instanceof ReceiveTimeout) {
63 String msg = String.format(
64 "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s "
65 + "did not respond.", params.receiveTimeout.toMillis(), params.shardNames.size(),
66 remainingShardNames.size(), remainingShardNames);
67 LOG.warn("{}: {}", params.id, msg);
68 params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
69 getSelf().tell(PoisonPill.getInstance(), getSelf());
73 private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) {
74 LOG.debug("{}: Received {}", params.id, getSnapshotReply);
76 ShardIdentifier shardId = ShardIdentifier.fromShardIdString(getSnapshotReply.getId());
77 shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
79 remainingShardNames.remove(shardId.getShardName());
80 if (remainingShardNames.isEmpty()) {
81 LOG.debug("{}: All shard snapshots received", params.id);
83 DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType,
84 params.shardManagerSnapshot, shardSnapshots);
85 params.replyToActor.tell(datastoreSnapshot, getSelf());
86 getSelf().tell(PoisonPill.getInstance(), getSelf());
90 public static Props props(Collection<String> shardNames, String datastoreType,
91 ShardManagerSnapshot shardManagerSnapshot, ActorRef replyToActor, String id, Duration receiveTimeout) {
92 return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType,
93 shardManagerSnapshot, replyToActor, id, receiveTimeout));
96 private static final class Params {
97 final Collection<String> shardNames;
98 final String datastoreType;
99 final ShardManagerSnapshot shardManagerSnapshot;
100 final ActorRef replyToActor;
102 final Duration receiveTimeout;
104 Params(Collection<String> shardNames, String datastoreType, ShardManagerSnapshot shardManagerSnapshot,
105 ActorRef replyToActor, String id, Duration receiveTimeout) {
106 this.shardNames = shardNames;
107 this.datastoreType = datastoreType;
108 this.shardManagerSnapshot = shardManagerSnapshot;
109 this.replyToActor = replyToActor;
111 this.receiveTimeout = receiveTimeout;