Revert "Leader should always apply modifications as local"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerGetSnapshotReplyActor.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import akka.actor.ActorRef;
11 import akka.actor.PoisonPill;
12 import akka.actor.Props;
13 import akka.actor.ReceiveTimeout;
14 import akka.actor.Status.Failure;
15 import akka.actor.UntypedAbstractActor;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.concurrent.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
23 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
24 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
25 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
26 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.duration.FiniteDuration;
30
31 /**
32  * Temporary actor used by the ShardManager to compile GetSnapshot replies from the Shard actors and return
33  * a DatastoreSnapshot instance reply.
34  *
35  * @author Thomas Pantelis
36  */
37 final class ShardManagerGetSnapshotReplyActor extends UntypedAbstractActor {
38     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
39
40     private final Set<String> remainingShardNames;
41     private final Params params;
42     private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
43
44     private ShardManagerGetSnapshotReplyActor(final Params params) {
45         this.params = params;
46         remainingShardNames = new HashSet<>(params.shardNames);
47
48         LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size());
49
50         getContext().setReceiveTimeout(params.receiveTimeout);
51     }
52
53     @Override
54     public void onReceive(final Object message) {
55         if (message instanceof GetSnapshotReply) {
56             onGetSnapshotReply((GetSnapshotReply)message);
57         } else if (message instanceof Failure) {
58             LOG.debug("{}: Received {}", params.id, message);
59
60             params.replyToActor.tell(message, getSelf());
61             getSelf().tell(PoisonPill.getInstance(), getSelf());
62         } else if (message instanceof ReceiveTimeout) {
63             LOG.warn("{}: Timed out after {} ms while waiting for snapshot replies from {} shard(s). "
64                 + "{} shard(s) {} did not respond", params.id, params.receiveTimeout.toMillis(),
65                 params.shardNames.size(), remainingShardNames.size(), remainingShardNames);
66             params.replyToActor.tell(new Failure(new TimeoutException(String.format(
67                 "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s "
68                 + "did not respond.", params.receiveTimeout.toMillis(), params.shardNames.size(),
69                 remainingShardNames.size(), remainingShardNames))), getSelf());
70             getSelf().tell(PoisonPill.getInstance(), getSelf());
71         }
72     }
73
74     private void onGetSnapshotReply(final GetSnapshotReply getSnapshotReply) {
75         LOG.debug("{}: Received {}", params.id, getSnapshotReply);
76
77         ShardIdentifier shardId = ShardIdentifier.fromShardIdString(getSnapshotReply.getId());
78         shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
79
80         remainingShardNames.remove(shardId.getShardName());
81         if (remainingShardNames.isEmpty()) {
82             LOG.debug("{}: All shard snapshots received", params.id);
83
84             DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType,
85                     params.shardManagerSnapshot, shardSnapshots);
86             params.replyToActor.tell(datastoreSnapshot, getSelf());
87             getSelf().tell(PoisonPill.getInstance(), getSelf());
88         }
89     }
90
91     public static Props props(final Collection<String> shardNames, final String datastoreType,
92             final ShardManagerSnapshot shardManagerSnapshot, final ActorRef replyToActor, final String id,
93             final FiniteDuration receiveTimeout) {
94         return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType,
95                 shardManagerSnapshot, replyToActor, id, receiveTimeout));
96     }
97
98     private static final class Params {
99         final Collection<String> shardNames;
100         final String datastoreType;
101         final ShardManagerSnapshot shardManagerSnapshot;
102         final ActorRef replyToActor;
103         final String id;
104         final FiniteDuration receiveTimeout;
105
106         Params(final Collection<String> shardNames, final String datastoreType,
107                 final ShardManagerSnapshot shardManagerSnapshot, final ActorRef replyToActor, final String id,
108                 final FiniteDuration receiveTimeout) {
109             this.shardNames = shardNames;
110             this.datastoreType = datastoreType;
111             this.shardManagerSnapshot = shardManagerSnapshot;
112             this.replyToActor = replyToActor;
113             this.id = id;
114             this.receiveTimeout = receiveTimeout;
115         }
116     }
117 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.