2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
25 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
26 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
28 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
29 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
30 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
32 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
33 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
35 import scala.concurrent.duration.Duration;
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.List;
43 * The ShardManager has the following jobs,
45 * <li> Create all the local shard replicas that belong on this cluster member
46 * <li> Find the address of the local shard
47 * <li> Find the primary replica for any given shard
48 * <li> Monitor the cluster members and store their addresses
51 public class ShardManager extends AbstractUntypedActor {
53 // Stores a mapping between a member name and the address of the member
54 // Member names look like "member-1", "member-2" etc and are as specified
56 private final Map<String, Address> memberNameToAddress = new HashMap<>();
58 // Stores a mapping between a shard name and it's corresponding information
59 // Shard names look like inventory, topology etc and are as specified in
61 private final Map<String, ShardInformation> localShards = new HashMap<>();
63 // The type of a ShardManager reflects the type of the datastore itself
64 // A data store could be of type config/operational
65 private final String type;
67 private final ClusterWrapper cluster;
69 private final Configuration configuration;
71 private ShardManagerInfoMBean mBean;
73 private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
/**
 * Creates the manager, subscribes it to cluster member events and creates
 * the shards that are local to this member.
 *
 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
 *             configuration or operational
 * @param cluster used to look up the current member name and to subscribe to member events; must not be null
 * @param configuration used to look up shard/member assignments; must not be null
 * @param dataStoreProperties passed on to every shard created by this manager
 */
private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
        InMemoryDOMDataStoreConfigProperties dataStoreProperties) {

    Preconditions.checkNotNull(type, "type should not be null");
    Preconditions.checkNotNull(cluster, "cluster should not be null");
    Preconditions.checkNotNull(configuration, "configuration should not be null");

    this.type = type;
    this.cluster = cluster;
    this.configuration = configuration;
    this.dataStoreProperties = dataStoreProperties;

    // Subscribe this actor to cluster member events
    this.cluster.subscribeToMemberEvents(getSelf());

    // Create all the local Shards and make them a child of the ShardManager
    // TODO: This may need to be initiated when we first get the schema context
    createLocalShards();
}
95 public static Props props(final String type,
96 final ClusterWrapper cluster,
97 final Configuration configuration,
98 final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
100 Preconditions.checkNotNull(type, "type should not be null");
101 Preconditions.checkNotNull(cluster, "cluster should not be null");
102 Preconditions.checkNotNull(configuration, "configuration should not be null");
104 return Props.create(new Creator<ShardManager>() {
107 public ShardManager create() throws Exception {
108 return new ShardManager(type, cluster, configuration, dataStoreProperties);
115 public void handleReceive(Object message) throws Exception {
116 if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
118 FindPrimary.fromSerializable(message));
119 } else if(message instanceof FindLocalShard){
120 findLocalShard((FindLocalShard) message);
121 } else if (message instanceof UpdateSchemaContext) {
122 updateSchemaContext(message);
123 } else if (message instanceof ClusterEvent.MemberUp){
124 memberUp((ClusterEvent.MemberUp) message);
125 } else if(message instanceof ClusterEvent.MemberRemoved) {
126 memberRemoved((ClusterEvent.MemberRemoved) message);
127 } else if(message instanceof ClusterEvent.UnreachableMember) {
128 ignoreMessage(message);
130 unknownMessage(message);
135 private void findLocalShard(FindLocalShard message) {
136 ShardInformation shardInformation =
137 localShards.get(message.getShardName());
139 if(shardInformation != null){
140 getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
144 getSender().tell(new LocalShardNotFound(message.getShardName()),
148 private void memberRemoved(ClusterEvent.MemberRemoved message) {
149 memberNameToAddress.remove(message.member().roles().head());
152 private void memberUp(ClusterEvent.MemberUp message) {
153 String memberName = message.member().roles().head();
155 memberNameToAddress.put(memberName , message.member().address());
157 for(ShardInformation info : localShards.values()){
158 String shardName = info.getShardName();
159 info.updatePeerAddress(getShardIdentifier(memberName, shardName),
160 getShardActorPath(shardName, memberName));
165 * Notifies all the local shards of a change in the schema context
169 private void updateSchemaContext(Object message) {
170 for(ShardInformation info : localShards.values()){
171 info.getActor().tell(message,getSelf());
175 private void findPrimary(FindPrimary message) {
176 String shardName = message.getShardName();
178 // First see if the there is a local replica for the shard
179 ShardInformation info = localShards.get(shardName);
181 ActorPath shardPath = info.getActorPath();
182 if (shardPath != null) {
185 new PrimaryFound(shardPath.toString()).toSerializable(),
191 List<String> members =
192 configuration.getMembersFromShardName(shardName);
194 if(cluster.getCurrentMemberName() != null) {
195 members.remove(cluster.getCurrentMemberName());
198 // There is no way for us to figure out the primary (for now) so assume
199 // that one of the remote nodes is a primary
200 for(String memberName : members) {
201 Address address = memberNameToAddress.get(memberName);
204 getShardActorPath(shardName, memberName);
205 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
209 getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
212 private String getShardActorPath(String shardName, String memberName) {
213 Address address = memberNameToAddress.get(memberName);
214 if(address != null) {
215 StringBuilder builder = new StringBuilder();
216 builder.append(address.toString())
218 .append(ShardManagerIdentifier.builder().type(type).build().toString())
220 .append(getShardIdentifier(memberName, shardName));
221 return builder.toString();
227 * Construct the name of the shard actor given the name of the member on
228 * which the shard resides and the name of the shard
234 private ShardIdentifier getShardIdentifier(String memberName, String shardName){
235 return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
239 * Create shards that are local to the member on which the ShardManager
243 private void createLocalShards() {
244 String memberName = this.cluster.getCurrentMemberName();
245 List<String> memberShardNames =
246 this.configuration.getMemberShardNames(memberName);
248 List<String> localShardActorNames = new ArrayList<>();
249 for(String shardName : memberShardNames){
250 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
251 Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
252 ActorRef actor = getContext()
253 .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties),
255 localShardActorNames.add(shardId.toString());
256 localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
259 mBean = ShardManagerInfo
260 .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
265 * Given the name of the shard find the addresses of all it's peers
270 private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
272 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
274 List<String> members =
275 this.configuration.getMembersFromShardName(shardName);
277 String currentMemberName = this.cluster.getCurrentMemberName();
279 for(String memberName : members){
280 if(!currentMemberName.equals(memberName)){
281 ShardIdentifier shardId = getShardIdentifier(memberName,
284 getShardActorPath(shardName, currentMemberName);
285 peerAddresses.put(shardId, path);
288 return peerAddresses;
292 public SupervisorStrategy supervisorStrategy() {
293 return new OneForOneStrategy(10, Duration.create("1 minute"),
294 new Function<Throwable, SupervisorStrategy.Directive>() {
296 public SupervisorStrategy.Directive apply(Throwable t) {
297 return SupervisorStrategy.resume();
304 private class ShardInformation {
305 private final String shardName;
306 private final ActorRef actor;
307 private final ActorPath actorPath;
308 private final Map<ShardIdentifier, String> peerAddresses;
/**
 * Captures the details of a local shard.
 *
 * @param shardName the name of the shard
 * @param actor the shard actor; its path is read and stored as well
 * @param peerAddresses the peer address map, stored by reference (not copied)
 */
private ShardInformation(String shardName, ActorRef actor,
        Map<ShardIdentifier, String> peerAddresses) {
    this.shardName = shardName;
    this.peerAddresses = peerAddresses;
    this.actor = actor;
    this.actorPath = actor.path();
}
318 public String getShardName() {
322 public ActorRef getActor(){
326 public ActorPath getActorPath() {
330 public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
331 LOG.info("updatePeerAddress for peer {} with address {}", peerId,
333 if(peerAddresses.containsKey(peerId)){
334 peerAddresses.put(peerId, peerAddress);
337 "Sending PeerAddressResolved for peer {} with address {} to {}",
338 peerId, peerAddress, actor.path());
341 .tell(new PeerAddressResolved(peerId, peerAddress),