Merge "Make Raft messages serializable"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
23 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
24 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
25 import scala.concurrent.duration.Duration;
26
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30
31 /**
32  * The ShardManager has the following jobs,
33  * <p>
34  * <li> Create all the local shard replicas that belong on this cluster member
35  * <li> Find the primary replica for any given shard
36  * <li> Engage in shard replica elections which decide which replica should be the primary
37  * </p>
38  * <p/>
39  * <h3>>Creation of Shard replicas</h3
40  * <p>
41  * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
42  * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
43  * </p>
44  * <p/>
45  * <h3> Replica Elections </h3>
46  * <p/>
47  * <p>
48  * The Shard Manager uses multiple cues to initiate election.
49  * <li> When a member of the cluster dies
50  * <li> When a local shard replica dies
51  * <li> When a local shard replica comes alive
52  * </p>
53  */
54 public class ShardManager extends AbstractUntypedActor {
55
56     // Stores a mapping between a member name and the address of the member
57     private final Map<String, Address> memberNameToAddress = new HashMap<>();
58
59     private final Map<String, ActorPath> localShards = new HashMap<>();
60
61
62     private final String type;
63
64     private final ClusterWrapper cluster;
65
66     private final Configuration configuration;
67
68     /**
69      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
70      *             configuration or operational
71      */
72     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
73
74         this.type = Preconditions.checkNotNull(type, "type should not be null");
75         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
76         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
77
78         // Subscribe this actor to cluster member events
79         cluster.subscribeToMemberEvents(getSelf());
80
81         // Create all the local Shards and make them a child of the ShardManager
82         // TODO: This may need to be initiated when we first get the schema context
83         createLocalShards();
84     }
85
86     public static Props props(final String type,
87         final ClusterWrapper cluster,
88         final Configuration configuration) {
89         return Props.create(new Creator<ShardManager>() {
90
91             @Override
92             public ShardManager create() throws Exception {
93                 return new ShardManager(type, cluster, configuration);
94             }
95         });
96     }
97
98
99     @Override
100     public void handleReceive(Object message) throws Exception {
101         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
102             findPrimary(
103                 FindPrimary.fromSerializable(message));
104
105         } else if (message instanceof UpdateSchemaContext) {
106             updateSchemaContext(message);
107         } else if (message instanceof ClusterEvent.MemberUp){
108             memberUp((ClusterEvent.MemberUp) message);
109         } else if(message instanceof ClusterEvent.MemberRemoved) {
110             memberRemoved((ClusterEvent.MemberRemoved) message);
111         } else if(message instanceof ClusterEvent.UnreachableMember) {
112             ignoreMessage(message);
113         } else{
114           throw new Exception ("Not recognized message received, message="+message);
115         }
116
117     }
118
119     private void ignoreMessage(Object message){
120         LOG.debug("Unhandled message : " + message);
121     }
122
123     private void memberRemoved(ClusterEvent.MemberRemoved message) {
124         memberNameToAddress.remove(message.member().roles().head());
125     }
126
127     private void memberUp(ClusterEvent.MemberUp message) {
128         memberNameToAddress.put(message.member().roles().head(), message.member().address());
129     }
130
131     private void updateSchemaContext(Object message) {
132         for(ActorPath path : localShards.values()){
133             getContext().system().actorSelection(path)
134                 .forward(message,
135                     getContext());
136         }
137     }
138
139     private void findPrimary(FindPrimary message) {
140         String shardName = message.getShardName();
141
142         List<String> members =
143             configuration.getMembersFromShardName(shardName);
144
145         for(String memberName : members) {
146             if (memberName.equals(cluster.getCurrentMemberName())) {
147                 // This is a local shard
148                 ActorPath shardPath = localShards.get(shardName);
149                 if (shardPath == null) {
150                     getSender()
151                         .tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
152                     return;
153                 }
154                 getSender().tell(new PrimaryFound(shardPath.toString()).toSerializable(),
155                     getSelf());
156                 return;
157             } else {
158                 Address address = memberNameToAddress.get(memberName);
159                 if(address != null){
160                     String path =
161                         address.toString() + "/user/shardmanager-" + this.type + "/" + getShardActorName(
162                             memberName, shardName);
163                     getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
164                     return;
165                 }
166
167
168             }
169         }
170
171         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
172     }
173
174     private String getShardActorName(String memberName, String shardName){
175         return memberName + "-shard-" + shardName + "-" + this.type;
176     }
177
178     // Create the shards that are local to this member
179     private void createLocalShards() {
180         String memberName = this.cluster.getCurrentMemberName();
181         List<String> memberShardNames =
182             this.configuration.getMemberShardNames(memberName);
183
184         for(String shardName : memberShardNames){
185             String shardActorName = getShardActorName(memberName, shardName);
186             ActorRef actor = getContext()
187                 .actorOf(Shard.props(shardActorName), shardActorName);
188             ActorPath path = actor.path();
189             localShards.put(shardName, path);
190         }
191
192     }
193
194
195     @Override
196     public SupervisorStrategy supervisorStrategy() {
197         return new OneForOneStrategy(10, Duration.create("1 minute"),
198             new Function<Throwable, SupervisorStrategy.Directive>() {
199                 @Override
200                 public SupervisorStrategy.Directive apply(Throwable t) {
201                     return SupervisorStrategy.resume();
202                 }
203             }
204         );
205
206     }
207 }