2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
21 import com.google.common.base.Preconditions;
23 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
25 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
26 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
27 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
28 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
29 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
30 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
32 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
33 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
34 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import scala.concurrent.duration.Duration;
39 import java.util.ArrayList;
40 import java.util.HashMap;
41 import java.util.List;
45 * The ShardManager has the following jobs,
47 * <li> Create all the local shard replicas that belong on this cluster member
48 * <li> Find the address of the local shard
49 * <li> Find the primary replica for any given shard
50 * <li> Monitor the cluster members and store their addresses
53 public class ShardManager extends AbstractUntypedActor {
55 // Stores a mapping between a member name and the address of the member
56 // Member names look like "member-1", "member-2" etc and are as specified
58 private final Map<String, Address> memberNameToAddress = new HashMap<>();
60 // Stores a mapping between a shard name and its corresponding information
61 // Shard names look like inventory, topology etc and are as specified in
63 private final Map<String, ShardInformation> localShards = new HashMap<>();
65 // The type of a ShardManager reflects the type of the datastore itself
66 // A data store could be of type config/operational
67 private final String type;
69 private final ClusterWrapper cluster;
71 private final Configuration configuration;
73 private ShardManagerInfoMBean mBean;
75 private final DatastoreContext datastoreContext;
/**
 * Creates the shard manager for one datastore, subscribes it to cluster
 * member events and creates the shards that are local to this member.
 *
 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
 *             configuration or operational
 * @param cluster used to subscribe to member events and to look up the current member name
 * @param configuration consulted for the shard names of a member and the members of a shard
 * @param datastoreContext handed to every Shard that is created; stored as given, without a null check
 * @throws NullPointerException if type, cluster or configuration is null
 */
private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
        DatastoreContext datastoreContext) {

    // Validate all mandatory arguments up front, in declaration order.
    Preconditions.checkNotNull(type, "type should not be null");
    Preconditions.checkNotNull(cluster, "cluster should not be null");
    Preconditions.checkNotNull(configuration, "configuration should not be null");

    this.type = type;
    this.cluster = cluster;
    this.configuration = configuration;
    this.datastoreContext = datastoreContext;

    // Subscribe this actor to cluster member events
    this.cluster.subscribeToMemberEvents(getSelf());

    // Create all the local Shards and make them a child of the ShardManager
    // TODO: This may need to be initiated when we first get the schema context
    createLocalShards();
}
97 public static Props props(final String type,
98 final ClusterWrapper cluster,
99 final Configuration configuration,
100 final DatastoreContext datastoreContext) {
102 Preconditions.checkNotNull(type, "type should not be null");
103 Preconditions.checkNotNull(cluster, "cluster should not be null");
104 Preconditions.checkNotNull(configuration, "configuration should not be null");
106 return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
110 public void handleReceive(Object message) throws Exception {
111 if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
113 FindPrimary.fromSerializable(message));
114 } else if(message instanceof FindLocalShard){
115 findLocalShard((FindLocalShard) message);
116 } else if (message instanceof UpdateSchemaContext) {
117 updateSchemaContext(message);
118 } else if (message instanceof ClusterEvent.MemberUp){
119 memberUp((ClusterEvent.MemberUp) message);
120 } else if(message instanceof ClusterEvent.MemberRemoved) {
121 memberRemoved((ClusterEvent.MemberRemoved) message);
122 } else if(message instanceof ClusterEvent.UnreachableMember) {
123 ignoreMessage(message);
125 unknownMessage(message);
130 private void findLocalShard(FindLocalShard message) {
131 ShardInformation shardInformation =
132 localShards.get(message.getShardName());
134 if(shardInformation != null){
135 getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
139 getSender().tell(new LocalShardNotFound(message.getShardName()),
143 private void memberRemoved(ClusterEvent.MemberRemoved message) {
144 memberNameToAddress.remove(message.member().roles().head());
147 private void memberUp(ClusterEvent.MemberUp message) {
148 String memberName = message.member().roles().head();
150 memberNameToAddress.put(memberName , message.member().address());
152 for(ShardInformation info : localShards.values()){
153 String shardName = info.getShardName();
154 info.updatePeerAddress(getShardIdentifier(memberName, shardName),
155 getShardActorPath(shardName, memberName));
160 * Notifies all the local shards of a change in the schema context
164 private void updateSchemaContext(Object message) {
165 for(ShardInformation info : localShards.values()){
166 info.getActor().tell(message,getSelf());
170 private void findPrimary(FindPrimary message) {
171 String shardName = message.getShardName();
173 // First see if the there is a local replica for the shard
174 ShardInformation info = localShards.get(shardName);
176 ActorPath shardPath = info.getActorPath();
177 if (shardPath != null) {
180 new PrimaryFound(shardPath.toString()).toSerializable(),
186 List<String> members =
187 configuration.getMembersFromShardName(shardName);
189 if(cluster.getCurrentMemberName() != null) {
190 members.remove(cluster.getCurrentMemberName());
193 // There is no way for us to figure out the primary (for now) so assume
194 // that one of the remote nodes is a primary
195 for(String memberName : members) {
196 Address address = memberNameToAddress.get(memberName);
199 getShardActorPath(shardName, memberName);
200 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
204 getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
207 private String getShardActorPath(String shardName, String memberName) {
208 Address address = memberNameToAddress.get(memberName);
209 if(address != null) {
210 StringBuilder builder = new StringBuilder();
211 builder.append(address.toString())
213 .append(ShardManagerIdentifier.builder().type(type).build().toString())
215 .append(getShardIdentifier(memberName, shardName));
216 return builder.toString();
222 * Construct the name of the shard actor given the name of the member on
223 * which the shard resides and the name of the shard
229 private ShardIdentifier getShardIdentifier(String memberName, String shardName){
230 return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
234 * Create shards that are local to the member on which the ShardManager
238 private void createLocalShards() {
239 String memberName = this.cluster.getCurrentMemberName();
240 List<String> memberShardNames =
241 this.configuration.getMemberShardNames(memberName);
243 List<String> localShardActorNames = new ArrayList<>();
244 for(String shardName : memberShardNames){
245 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
246 Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
247 ActorRef actor = getContext()
248 .actorOf(Shard.props(shardId, peerAddresses, datastoreContext).
249 withMailbox(ActorContext.MAILBOX), shardId.toString());
251 localShardActorNames.add(shardId.toString());
252 localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
255 mBean = ShardManagerInfo
256 .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
261 * Given the name of the shard find the addresses of all it's peers
266 private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
268 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
270 List<String> members =
271 this.configuration.getMembersFromShardName(shardName);
273 String currentMemberName = this.cluster.getCurrentMemberName();
275 for(String memberName : members){
276 if(!currentMemberName.equals(memberName)){
277 ShardIdentifier shardId = getShardIdentifier(memberName,
280 getShardActorPath(shardName, currentMemberName);
281 peerAddresses.put(shardId, path);
284 return peerAddresses;
288 public SupervisorStrategy supervisorStrategy() {
290 return new OneForOneStrategy(10, Duration.create("1 minute"),
291 new Function<Throwable, SupervisorStrategy.Directive>() {
293 public SupervisorStrategy.Directive apply(Throwable t) {
294 StringBuilder sb = new StringBuilder();
295 for(StackTraceElement element : t.getStackTrace()) {
297 .append(element.toString());
299 LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
300 return SupervisorStrategy.resume();
307 private class ShardInformation {
308 private final String shardName;
309 private final ActorRef actor;
310 private final ActorPath actorPath;
311 private final Map<ShardIdentifier, String> peerAddresses;
313 private ShardInformation(String shardName, ActorRef actor,
314 Map<ShardIdentifier, String> peerAddresses) {
315 this.shardName = shardName;
317 this.actorPath = actor.path();
318 this.peerAddresses = peerAddresses;
321 public String getShardName() {
325 public ActorRef getActor(){
329 public ActorPath getActorPath() {
333 public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
334 LOG.info("updatePeerAddress for peer {} with address {}", peerId,
336 if(peerAddresses.containsKey(peerId)){
337 peerAddresses.put(peerId, peerAddress);
340 "Sending PeerAddressResolved for peer {} with address {} to {}",
341 peerId, peerAddress, actor.path());
344 .tell(new PeerAddressResolved(peerId, peerAddress),
351 private static class ShardManagerCreator implements Creator<ShardManager> {
352 private static final long serialVersionUID = 1L;
355 final ClusterWrapper cluster;
356 final Configuration configuration;
357 final DatastoreContext datastoreContext;
359 ShardManagerCreator(String type, ClusterWrapper cluster,
360 Configuration configuration, DatastoreContext datastoreContext) {
362 this.cluster = cluster;
363 this.configuration = configuration;
364 this.datastoreContext = datastoreContext;
368 public ShardManager create() throws Exception {
369 return new ShardManager(type, cluster, configuration, datastoreContext);