2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
21 import com.google.common.base.Preconditions;
23 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
25 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
26 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
27 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
28 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
29 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
30 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
32 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
33 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
34 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
36 import scala.concurrent.duration.Duration;
38 import java.util.ArrayList;
39 import java.util.HashMap;
40 import java.util.List;
44 * The ShardManager has the following jobs,
46 * <li> Create all the local shard replicas that belong on this cluster member
47 * <li> Find the address of the local shard
48 * <li> Find the primary replica for any given shard
49 * <li> Monitor the cluster members and store their addresses
52 public class ShardManager extends AbstractUntypedActor {
54 // Stores a mapping between a member name and the address of the member
55 // Member names look like "member-1", "member-2" etc and are as specified
57 private final Map<String, Address> memberNameToAddress = new HashMap<>();
59 // Stores a mapping between a shard name and it's corresponding information
60 // Shard names look like inventory, topology etc and are as specified in
62 private final Map<String, ShardInformation> localShards = new HashMap<>();
64 // The type of a ShardManager reflects the type of the datastore itself
65 // A data store could be of type config/operational
66 private final String type;
68 private final ClusterWrapper cluster;
70 private final Configuration configuration;
72 private ShardManagerInfoMBean mBean;
74 private final ShardContext shardContext;
77 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
78 * configuration or operational
80 private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
81 ShardContext shardContext) {
83 this.type = Preconditions.checkNotNull(type, "type should not be null");
84 this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
85 this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
86 this.shardContext = shardContext;
88 // Subscribe this actor to cluster member events
89 cluster.subscribeToMemberEvents(getSelf());
91 // Create all the local Shards and make them a child of the ShardManager
92 // TODO: This may need to be initiated when we first get the schema context
96 public static Props props(final String type,
97 final ClusterWrapper cluster,
98 final Configuration configuration,
99 final ShardContext shardContext) {
101 Preconditions.checkNotNull(type, "type should not be null");
102 Preconditions.checkNotNull(cluster, "cluster should not be null");
103 Preconditions.checkNotNull(configuration, "configuration should not be null");
105 return Props.create(new ShardManagerCreator(type, cluster, configuration, shardContext));
109 public void handleReceive(Object message) throws Exception {
110 if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
112 FindPrimary.fromSerializable(message));
113 } else if(message instanceof FindLocalShard){
114 findLocalShard((FindLocalShard) message);
115 } else if (message instanceof UpdateSchemaContext) {
116 updateSchemaContext(message);
117 } else if (message instanceof ClusterEvent.MemberUp){
118 memberUp((ClusterEvent.MemberUp) message);
119 } else if(message instanceof ClusterEvent.MemberRemoved) {
120 memberRemoved((ClusterEvent.MemberRemoved) message);
121 } else if(message instanceof ClusterEvent.UnreachableMember) {
122 ignoreMessage(message);
124 unknownMessage(message);
129 private void findLocalShard(FindLocalShard message) {
130 ShardInformation shardInformation =
131 localShards.get(message.getShardName());
133 if(shardInformation != null){
134 getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
138 getSender().tell(new LocalShardNotFound(message.getShardName()),
142 private void memberRemoved(ClusterEvent.MemberRemoved message) {
143 memberNameToAddress.remove(message.member().roles().head());
146 private void memberUp(ClusterEvent.MemberUp message) {
147 String memberName = message.member().roles().head();
149 memberNameToAddress.put(memberName , message.member().address());
151 for(ShardInformation info : localShards.values()){
152 String shardName = info.getShardName();
153 info.updatePeerAddress(getShardIdentifier(memberName, shardName),
154 getShardActorPath(shardName, memberName));
159 * Notifies all the local shards of a change in the schema context
163 private void updateSchemaContext(Object message) {
164 for(ShardInformation info : localShards.values()){
165 info.getActor().tell(message,getSelf());
169 private void findPrimary(FindPrimary message) {
170 String shardName = message.getShardName();
172 // First see if the there is a local replica for the shard
173 ShardInformation info = localShards.get(shardName);
175 ActorPath shardPath = info.getActorPath();
176 if (shardPath != null) {
179 new PrimaryFound(shardPath.toString()).toSerializable(),
185 List<String> members =
186 configuration.getMembersFromShardName(shardName);
188 if(cluster.getCurrentMemberName() != null) {
189 members.remove(cluster.getCurrentMemberName());
192 // There is no way for us to figure out the primary (for now) so assume
193 // that one of the remote nodes is a primary
194 for(String memberName : members) {
195 Address address = memberNameToAddress.get(memberName);
198 getShardActorPath(shardName, memberName);
199 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
203 getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
206 private String getShardActorPath(String shardName, String memberName) {
207 Address address = memberNameToAddress.get(memberName);
208 if(address != null) {
209 StringBuilder builder = new StringBuilder();
210 builder.append(address.toString())
212 .append(ShardManagerIdentifier.builder().type(type).build().toString())
214 .append(getShardIdentifier(memberName, shardName));
215 return builder.toString();
221 * Construct the name of the shard actor given the name of the member on
222 * which the shard resides and the name of the shard
228 private ShardIdentifier getShardIdentifier(String memberName, String shardName){
229 return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
233 * Create shards that are local to the member on which the ShardManager
237 private void createLocalShards() {
238 String memberName = this.cluster.getCurrentMemberName();
239 List<String> memberShardNames =
240 this.configuration.getMemberShardNames(memberName);
242 List<String> localShardActorNames = new ArrayList<>();
243 for(String shardName : memberShardNames){
244 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
245 Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
246 ActorRef actor = getContext()
247 .actorOf(Shard.props(shardId, peerAddresses, shardContext),
249 localShardActorNames.add(shardId.toString());
250 localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
253 mBean = ShardManagerInfo
254 .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
259 * Given the name of the shard find the addresses of all it's peers
264 private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
266 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
268 List<String> members =
269 this.configuration.getMembersFromShardName(shardName);
271 String currentMemberName = this.cluster.getCurrentMemberName();
273 for(String memberName : members){
274 if(!currentMemberName.equals(memberName)){
275 ShardIdentifier shardId = getShardIdentifier(memberName,
278 getShardActorPath(shardName, currentMemberName);
279 peerAddresses.put(shardId, path);
282 return peerAddresses;
286 public SupervisorStrategy supervisorStrategy() {
288 return new OneForOneStrategy(10, Duration.create("1 minute"),
289 new Function<Throwable, SupervisorStrategy.Directive>() {
291 public SupervisorStrategy.Directive apply(Throwable t) {
292 StringBuilder sb = new StringBuilder();
293 for(StackTraceElement element : t.getStackTrace()) {
295 .append(element.toString());
297 LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
298 return SupervisorStrategy.resume();
305 private class ShardInformation {
306 private final String shardName;
307 private final ActorRef actor;
308 private final ActorPath actorPath;
309 private final Map<ShardIdentifier, String> peerAddresses;
311 private ShardInformation(String shardName, ActorRef actor,
312 Map<ShardIdentifier, String> peerAddresses) {
313 this.shardName = shardName;
315 this.actorPath = actor.path();
316 this.peerAddresses = peerAddresses;
319 public String getShardName() {
323 public ActorRef getActor(){
327 public ActorPath getActorPath() {
331 public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
332 LOG.info("updatePeerAddress for peer {} with address {}", peerId,
334 if(peerAddresses.containsKey(peerId)){
335 peerAddresses.put(peerId, peerAddress);
338 "Sending PeerAddressResolved for peer {} with address {} to {}",
339 peerId, peerAddress, actor.path());
342 .tell(new PeerAddressResolved(peerId, peerAddress),
349 private static class ShardManagerCreator implements Creator<ShardManager> {
350 private static final long serialVersionUID = 1L;
353 final ClusterWrapper cluster;
354 final Configuration configuration;
355 final ShardContext shardContext;
357 ShardManagerCreator(String type, ClusterWrapper cluster,
358 Configuration configuration, ShardContext shardContext) {
360 this.cluster = cluster;
361 this.configuration = configuration;
362 this.shardContext = shardContext;
366 public ShardManager create() throws Exception {
367 return new ShardManager(type, cluster, configuration, shardContext);