2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.Status.Failure;
import akka.actor.UntypedActor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
31 * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return
32 * a DatastoreSnapshot instance reply.
34 * @author Thomas Pantelis
36 class ShardManagerGetSnapshotReplyActor extends UntypedActor {
37 private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
39 private final Set<String> remainingShardNames;
40 private final Params params;
41 private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
43 private ShardManagerGetSnapshotReplyActor(Params params) {
45 remainingShardNames = new HashSet<>(params.shardNames);
47 LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size());
49 getContext().setReceiveTimeout(params.receiveTimeout);
53 public void onReceive(Object message) {
54 if(message instanceof GetSnapshotReply) {
55 onGetSnapshotReply((GetSnapshotReply)message);
56 } else if(message instanceof Failure) {
57 LOG.debug("{}: Received {}", params.id, message);
59 params.replyToActor.tell(message, getSelf());
60 getSelf().tell(PoisonPill.getInstance(), getSelf());
61 } else if (message instanceof ReceiveTimeout) {
62 String msg = String.format(
63 "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s did not respond.",
64 params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(),
66 LOG.warn("{}: {}", params.id, msg);
67 params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
68 getSelf().tell(PoisonPill.getInstance(), getSelf());
72 private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) {
73 LOG.debug("{}: Received {}", params.id, getSnapshotReply);
75 ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build();
76 shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
78 remainingShardNames.remove(shardId.getShardName());
79 if(remainingShardNames.isEmpty()) {
80 LOG.debug("{}: All shard snapshots received", params.id);
82 DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
84 params.replyToActor.tell(datastoreSnapshot, getSelf());
85 getSelf().tell(PoisonPill.getInstance(), getSelf());
89 public static Props props(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot,
90 ActorRef replyToActor, String id, Duration receiveTimeout) {
91 return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType,
92 shardManagerSnapshot, replyToActor, id, receiveTimeout));
95 private static final class Params {
96 final Collection<String> shardNames;
97 final String datastoreType;
98 final byte[] shardManagerSnapshot;
99 final ActorRef replyToActor;
101 final Duration receiveTimeout;
103 Params(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
104 String id, Duration receiveTimeout) {
105 this.shardNames = shardNames;
106 this.datastoreType = datastoreType;
107 this.shardManagerSnapshot = shardManagerSnapshot;
108 this.replyToActor = replyToActor;
110 this.receiveTimeout = receiveTimeout;