f77b3cbd7b5a418d4013bc3a18cc8bfbdbe9f032
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManagerGetSnapshotReplyActor.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.PoisonPill;
12 import akka.actor.Props;
13 import akka.actor.ReceiveTimeout;
14 import akka.actor.Status.Failure;
15 import akka.actor.UntypedActor;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.TimeoutException;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
21 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
22 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.duration.Duration;
26
27 /**
28  * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return
29  * a DatastoreSnapshot instance reply.
30  *
31  * @author Thomas Pantelis
32  */
33 class ShardManagerGetSnapshotReplyActor extends UntypedActor {
34     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
35
36     private int repliesReceived;
37     private final Params params;
38     private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
39
40     private ShardManagerGetSnapshotReplyActor(Params params) {
41         this.params = params;
42
43         LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.totalShardCount);
44
45         getContext().setReceiveTimeout(params.receiveTimeout);
46     }
47
48     @Override
49     public void onReceive(Object message) {
50         if(message instanceof GetSnapshotReply) {
51             onGetSnapshotReply((GetSnapshotReply)message);
52         } else if(message instanceof Failure) {
53             LOG.debug("{}: Received {}", params.id, message);
54
55             params.replyToActor.tell(message, getSelf());
56             getSelf().tell(PoisonPill.getInstance(), getSelf());
57         } else if (message instanceof ReceiveTimeout) {
58             LOG.warn("{}: Got ReceiveTimeout for inactivity - expected {} GetSnapshotReply messages within {} ms, received {}",
59                     params.id, params.totalShardCount, params.receiveTimeout.toMillis(), repliesReceived);
60
61             params.replyToActor.tell(new Failure(new TimeoutException(String.format(
62                     "Timed out after %s ms while waiting for snapshot replies from %d shards. Actual replies received was %s",
63                         params.receiveTimeout.toMillis(), params.totalShardCount, repliesReceived))), getSelf());
64             getSelf().tell(PoisonPill.getInstance(), getSelf());
65         }
66     }
67
68     private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) {
69         LOG.debug("{}: Received {}", params.id, getSnapshotReply);
70
71         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build();
72         shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
73
74         if(++repliesReceived == params.totalShardCount) {
75             LOG.debug("{}: All shard snapshots received", params.id);
76
77             DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
78                     shardSnapshots);
79             params.replyToActor.tell(datastoreSnapshot, getSelf());
80             getSelf().tell(PoisonPill.getInstance(), getSelf());
81         }
82     }
83
84     public static Props props(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot,
85             ActorRef replyToActor, String id, Duration receiveTimeout) {
86         return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(totalShardCount, datastoreType,
87                 shardManagerSnapshot, replyToActor, id, receiveTimeout));
88     }
89
90     private static final class Params {
91         final int totalShardCount;
92         final String datastoreType;
93         final byte[] shardManagerSnapshot;
94         final ActorRef replyToActor;
95         final String id;
96         final Duration receiveTimeout;
97
98         Params(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
99                 String id, Duration receiveTimeout) {
100             this.totalShardCount = totalShardCount;
101             this.datastoreType = datastoreType;
102             this.shardManagerSnapshot = shardManagerSnapshot;
103             this.replyToActor = replyToActor;
104             this.id = id;
105             this.receiveTimeout = receiveTimeout;
106         }
107     }
108 }