/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
21 import com.google.common.base.Preconditions;
23 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
25 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
26 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
27 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
28 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
29 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
30 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
32 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
33 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
34 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import scala.concurrent.duration.Duration;
39 import java.util.ArrayList;
40 import java.util.HashMap;
41 import java.util.List;
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
53 public class ShardManager extends AbstractUntypedActor {
55 // Stores a mapping between a member name and the address of the member
56 // Member names look like "member-1", "member-2" etc and are as specified
58 private final Map<String, Address> memberNameToAddress = new HashMap<>();
60 // Stores a mapping between a shard name and it's corresponding information
61 // Shard names look like inventory, topology etc and are as specified in
63 private final Map<String, ShardInformation> localShards = new HashMap<>();
65 // The type of a ShardManager reflects the type of the datastore itself
66 // A data store could be of type config/operational
67 private final String type;
69 private final ClusterWrapper cluster;
71 private final Configuration configuration;
73 private ShardManagerInfoMBean mBean;
75 private final DatastoreContext datastoreContext;
/**
 * Creates the manager, subscribes it to cluster member events and creates
 * the shard replicas that belong on the current cluster member.
 *
 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
 *             configuration or operational
 * @param cluster used to subscribe to member events and to look up the current member name
 * @param configuration supplies the shard names for a member and the members of a shard
 * @param datastoreContext handed to every shard created here and used when the MBean is registered
 * @throws NullPointerException if any argument is null
 */
private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
        DatastoreContext datastoreContext) {

    this.type = Preconditions.checkNotNull(type, "type should not be null");
    this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
    this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
    // createLocalShards() dereferences the context, so reject null here like the other arguments
    this.datastoreContext = Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");

    // Subscribe this actor to cluster member events
    cluster.subscribeToMemberEvents(getSelf());

    // Create all the local Shards and make them a child of the ShardManager
    // TODO: This may need to be initiated when we first get the schema context
    createLocalShards();
}
97 public static Props props(final String type,
98 final ClusterWrapper cluster,
99 final Configuration configuration,
100 final DatastoreContext datastoreContext) {
102 Preconditions.checkNotNull(type, "type should not be null");
103 Preconditions.checkNotNull(cluster, "cluster should not be null");
104 Preconditions.checkNotNull(configuration, "configuration should not be null");
106 return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
110 public void handleReceive(Object message) throws Exception {
111 if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
113 FindPrimary.fromSerializable(message));
114 } else if(message instanceof FindLocalShard){
115 findLocalShard((FindLocalShard) message);
116 } else if (message instanceof UpdateSchemaContext) {
117 updateSchemaContext(message);
118 } else if (message instanceof ClusterEvent.MemberUp){
119 memberUp((ClusterEvent.MemberUp) message);
120 } else if(message instanceof ClusterEvent.MemberRemoved) {
121 memberRemoved((ClusterEvent.MemberRemoved) message);
122 } else if(message instanceof ClusterEvent.UnreachableMember) {
123 ignoreMessage(message);
125 unknownMessage(message);
130 private void findLocalShard(FindLocalShard message) {
131 ShardInformation shardInformation =
132 localShards.get(message.getShardName());
134 if(shardInformation != null){
135 getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
139 getSender().tell(new LocalShardNotFound(message.getShardName()),
143 private void memberRemoved(ClusterEvent.MemberRemoved message) {
144 memberNameToAddress.remove(message.member().roles().head());
147 private void memberUp(ClusterEvent.MemberUp message) {
148 String memberName = message.member().roles().head();
150 memberNameToAddress.put(memberName , message.member().address());
152 for(ShardInformation info : localShards.values()){
153 String shardName = info.getShardName();
154 info.updatePeerAddress(getShardIdentifier(memberName, shardName),
155 getShardActorPath(shardName, memberName));
160 * Notifies all the local shards of a change in the schema context
164 private void updateSchemaContext(Object message) {
165 for(ShardInformation info : localShards.values()){
166 info.getActor().tell(message,getSelf());
170 private void findPrimary(FindPrimary message) {
171 String shardName = message.getShardName();
173 // First see if the there is a local replica for the shard
174 ShardInformation info = localShards.get(shardName);
176 ActorPath shardPath = info.getActorPath();
177 if (shardPath != null) {
180 new PrimaryFound(shardPath.toString()).toSerializable(),
186 List<String> members =
187 configuration.getMembersFromShardName(shardName);
189 if(cluster.getCurrentMemberName() != null) {
190 members.remove(cluster.getCurrentMemberName());
193 // There is no way for us to figure out the primary (for now) so assume
194 // that one of the remote nodes is a primary
195 for(String memberName : members) {
196 Address address = memberNameToAddress.get(memberName);
199 getShardActorPath(shardName, memberName);
200 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
204 getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
207 private String getShardActorPath(String shardName, String memberName) {
208 Address address = memberNameToAddress.get(memberName);
209 if(address != null) {
210 StringBuilder builder = new StringBuilder();
211 builder.append(address.toString())
213 .append(ShardManagerIdentifier.builder().type(type).build().toString())
215 .append(getShardIdentifier(memberName, shardName));
216 return builder.toString();
222 * Construct the name of the shard actor given the name of the member on
223 * which the shard resides and the name of the shard
229 private ShardIdentifier getShardIdentifier(String memberName, String shardName){
230 return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
234 * Create shards that are local to the member on which the ShardManager
238 private void createLocalShards() {
239 String memberName = this.cluster.getCurrentMemberName();
240 List<String> memberShardNames =
241 this.configuration.getMemberShardNames(memberName);
243 List<String> localShardActorNames = new ArrayList<>();
244 for(String shardName : memberShardNames){
245 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
246 Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
247 ActorRef actor = getContext()
248 .actorOf(Shard.props(shardId, peerAddresses, datastoreContext).
249 withMailbox(ActorContext.MAILBOX), shardId.toString());
251 localShardActorNames.add(shardId.toString());
252 localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
255 mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
256 datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
260 * Given the name of the shard find the addresses of all it's peers
265 private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
267 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
269 List<String> members =
270 this.configuration.getMembersFromShardName(shardName);
272 String currentMemberName = this.cluster.getCurrentMemberName();
274 for(String memberName : members){
275 if(!currentMemberName.equals(memberName)){
276 ShardIdentifier shardId = getShardIdentifier(memberName,
279 getShardActorPath(shardName, currentMemberName);
280 peerAddresses.put(shardId, path);
283 return peerAddresses;
287 public SupervisorStrategy supervisorStrategy() {
289 return new OneForOneStrategy(10, Duration.create("1 minute"),
290 new Function<Throwable, SupervisorStrategy.Directive>() {
292 public SupervisorStrategy.Directive apply(Throwable t) {
293 StringBuilder sb = new StringBuilder();
294 for(StackTraceElement element : t.getStackTrace()) {
296 .append(element.toString());
298 LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
299 return SupervisorStrategy.resume();
306 private class ShardInformation {
307 private final String shardName;
308 private final ActorRef actor;
309 private final ActorPath actorPath;
310 private final Map<ShardIdentifier, String> peerAddresses;
312 private ShardInformation(String shardName, ActorRef actor,
313 Map<ShardIdentifier, String> peerAddresses) {
314 this.shardName = shardName;
316 this.actorPath = actor.path();
317 this.peerAddresses = peerAddresses;
320 public String getShardName() {
324 public ActorRef getActor(){
328 public ActorPath getActorPath() {
332 public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
333 LOG.info("updatePeerAddress for peer {} with address {}", peerId,
335 if(peerAddresses.containsKey(peerId)){
336 peerAddresses.put(peerId, peerAddress);
339 "Sending PeerAddressResolved for peer {} with address {} to {}",
340 peerId, peerAddress, actor.path());
343 .tell(new PeerAddressResolved(peerId, peerAddress),
350 private static class ShardManagerCreator implements Creator<ShardManager> {
351 private static final long serialVersionUID = 1L;
354 final ClusterWrapper cluster;
355 final Configuration configuration;
356 final DatastoreContext datastoreContext;
358 ShardManagerCreator(String type, ClusterWrapper cluster,
359 Configuration configuration, DatastoreContext datastoreContext) {
361 this.cluster = cluster;
362 this.configuration = configuration;
363 this.datastoreContext = datastoreContext;
367 public ShardManager create() throws Exception {
368 return new ShardManager(type, cluster, configuration, datastoreContext);