Merge "Fix the build errors due to the class change of InstanceIdentifier to YangInst...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Creator;
18 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
19 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
21 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
22
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26
27 /**
28  * The ShardManager has the following jobs,
29  * <p>
30  * <li> Create all the local shard replicas that belong on this cluster member
31  * <li> Find the primary replica for any given shard
32  * <li> Engage in shard replica elections which decide which replica should be the primary
33  * </p>
34  * <p/>
35  * <h3>>Creation of Shard replicas</h3
36  * <p>
37  * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
38  * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
39  * </p>
40  * <p/>
41  * <h3> Replica Elections </h3>
42  * <p/>
43  * <p>
44  * The Shard Manager uses multiple cues to initiate election.
45  * <li> When a member of the cluster dies
46  * <li> When a local shard replica dies
47  * <li> When a local shard replica comes alive
48  * </p>
49  */
50 public class ShardManager extends AbstractUntypedActor {
51
52     // Stores a mapping between a shard name and the address of the current primary
53     private final Map<String, Address> shardNameToPrimaryAddress =
54         new HashMap<>();
55
56     // Stores a mapping between a member name and the address of the member
57     private final Map<String, Address> memberNameToAddress = new HashMap<>();
58
59     // Stores a mapping between the shard name and all the members on which a replica of that shard are available
60     private final Map<String, List<String>> shardNameToMembers =
61         new HashMap<>();
62
63     private final LoggingAdapter log =
64         Logging.getLogger(getContext().system(), this);
65
66
67     private final Map<String, ActorPath> localShards = new HashMap<>();
68
69
70     private final String type;
71
72     private final ClusterWrapper cluster;
73
74     private final Configuration configuration;
75
76     /**
77      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
78      *             configuration or operational
79      */
80     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
81         this.type = type;
82         this.cluster = cluster;
83         this.configuration = configuration;
84         String memberName = cluster.getCurrentMemberName();
85         List<String> memberShardNames =
86             configuration.getMemberShardNames(memberName);
87
88         for(String shardName : memberShardNames){
89             String shardActorName = getShardActorName(memberName, shardName);
90             ActorRef actor = getContext()
91                 .actorOf(Shard.props(shardActorName), shardActorName);
92             ActorPath path = actor.path();
93             localShards.put(shardName, path);
94         }
95     }
96
97     public static Props props(final String type,
98         final ClusterWrapper cluster,
99         final Configuration configuration) {
100         return Props.create(new Creator<ShardManager>() {
101
102             @Override
103             public ShardManager create() throws Exception {
104                 return new ShardManager(type, cluster, configuration);
105             }
106         });
107     }
108
109     @Override
110     public void handleReceive(Object message) throws Exception {
111         if (message instanceof FindPrimary) {
112             FindPrimary msg = ((FindPrimary) message);
113             String shardName = msg.getShardName();
114
115             List<String> members =
116                 configuration.getMembersFromShardName(shardName);
117
118             for(String memberName : members) {
119                 if (memberName.equals(cluster.getCurrentMemberName())) {
120                     // This is a local shard
121                     ActorPath shardPath = localShards.get(shardName);
122                     // FIXME: This check may be redundant
123                     if (shardPath == null) {
124                         getSender()
125                             .tell(new PrimaryNotFound(shardName), getSelf());
126                         return;
127                     }
128                     getSender().tell(new PrimaryFound(shardPath.toString()),
129                         getSelf());
130                     return;
131                 } else {
132                     Address address = memberNameToAddress.get(shardName);
133                     if(address != null){
134                         String path =
135                             address.toString() + "/user/" + getShardActorName(
136                                 memberName, shardName);
137                         getSender().tell(new PrimaryFound(path), getSelf());
138                     }
139
140
141                 }
142             }
143
144             getSender().tell(new PrimaryNotFound(shardName), getSelf());
145
146         } else if (message instanceof UpdateSchemaContext) {
147             for(ActorPath path : localShards.values()){
148                 getContext().system().actorSelection(path)
149                     .forward(message,
150                         getContext());
151             }
152         }
153     }
154
155     private String getShardActorName(String memberName, String shardName){
156         return memberName + "-shard-" + shardName + "-" + this.type;
157     }
158
159
160 }