BUG-5626: Eliminate ShardIdentifier.Builder
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerGetSnapshotReplyActor.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import akka.actor.ActorRef;
11 import akka.actor.PoisonPill;
12 import akka.actor.Props;
13 import akka.actor.ReceiveTimeout;
14 import akka.actor.Status.Failure;
15 import akka.actor.UntypedActor;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.concurrent.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
24 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
25 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.duration.Duration;
29
30 /**
31  * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return
32  * a DatastoreSnapshot instance reply.
33  *
34  * @author Thomas Pantelis
35  */
36 class ShardManagerGetSnapshotReplyActor extends UntypedActor {
37     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
38
39     private final Set<String> remainingShardNames;
40     private final Params params;
41     private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
42
43     private ShardManagerGetSnapshotReplyActor(Params params) {
44         this.params = params;
45         remainingShardNames = new HashSet<>(params.shardNames);
46
47         LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size());
48
49         getContext().setReceiveTimeout(params.receiveTimeout);
50     }
51
52     @Override
53     public void onReceive(Object message) {
54         if(message instanceof GetSnapshotReply) {
55             onGetSnapshotReply((GetSnapshotReply)message);
56         } else if(message instanceof Failure) {
57             LOG.debug("{}: Received {}", params.id, message);
58
59             params.replyToActor.tell(message, getSelf());
60             getSelf().tell(PoisonPill.getInstance(), getSelf());
61         } else if (message instanceof ReceiveTimeout) {
62             String msg = String.format(
63                     "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s did not respond.",
64                         params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(),
65                         remainingShardNames);
66             LOG.warn("{}: {}", params.id, msg);
67             params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
68             getSelf().tell(PoisonPill.getInstance(), getSelf());
69         }
70     }
71
72     private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) {
73         LOG.debug("{}: Received {}", params.id, getSnapshotReply);
74
75         ShardIdentifier shardId = ShardIdentifier.fromShardIdString(getSnapshotReply.getId());
76         shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
77
78         remainingShardNames.remove(shardId.getShardName());
79         if(remainingShardNames.isEmpty()) {
80             LOG.debug("{}: All shard snapshots received", params.id);
81
82             DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
83                     shardSnapshots);
84             params.replyToActor.tell(datastoreSnapshot, getSelf());
85             getSelf().tell(PoisonPill.getInstance(), getSelf());
86         }
87     }
88
89     public static Props props(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot,
90             ActorRef replyToActor, String id, Duration receiveTimeout) {
91         return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType,
92                 shardManagerSnapshot, replyToActor, id, receiveTimeout));
93     }
94
95     private static final class Params {
96         final Collection<String> shardNames;
97         final String datastoreType;
98         final byte[] shardManagerSnapshot;
99         final ActorRef replyToActor;
100         final String id;
101         final Duration receiveTimeout;
102
103         Params(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
104                 String id, Duration receiveTimeout) {
105             this.shardNames = shardNames;
106             this.datastoreType = datastoreType;
107             this.shardManagerSnapshot = shardManagerSnapshot;
108             this.replyToActor = replyToActor;
109             this.id = id;
110             this.receiveTimeout = receiveTimeout;
111         }
112     }
113 }