2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
25 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
26 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
28 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
29 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
30 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
32 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
33 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
35 import scala.concurrent.duration.Duration;
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.List;
43 * The ShardManager has the following jobs,
45 * <li> Create all the local shard replicas that belong on this cluster member
46 * <li> Find the address of the local shard
47 * <li> Find the primary replica for any given shard
48 * <li> Monitor the cluster members and store their addresses
51 public class ShardManager extends AbstractUntypedActor {
53 // Stores a mapping between a member name and the address of the member
54 // Member names look like "member-1", "member-2" etc and are as specified
56 private final Map<String, Address> memberNameToAddress = new HashMap<>();
58 // Stores a mapping between a shard name and its corresponding information
59 // Shard names look like inventory, topology etc and are as specified in
61 private final Map<String, ShardInformation> localShards = new HashMap<>();
63 // The type of a ShardManager reflects the type of the datastore itself
64 // A data store could be of type config/operational
65 private final String type;
67 private final ClusterWrapper cluster;
69 private final Configuration configuration;
71 private ShardManagerInfoMBean mBean;
73 private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
/**
 * Creates the ShardManager, registers it for cluster member events and
 * then creates the shards that are local to this member.
 *
 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
 *             configuration or operational
 * @param cluster used to subscribe to member events and to look up the current member name; must not be null
 * @param configuration queried for shard and member names; must not be null
 * @param dataStoreProperties handed unchanged to every Shard created by this manager
 */
private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
        InMemoryDOMDataStoreConfigProperties dataStoreProperties) {

    // Validate in the same order as the parameters so the first missing
    // argument is the one that gets reported.
    Preconditions.checkNotNull(type, "type should not be null");
    Preconditions.checkNotNull(cluster, "cluster should not be null");
    Preconditions.checkNotNull(configuration, "configuration should not be null");

    this.type = type;
    this.cluster = cluster;
    this.configuration = configuration;
    this.dataStoreProperties = dataStoreProperties;

    // Subscribe this actor to cluster member events
    this.cluster.subscribeToMemberEvents(getSelf());

    // Create all the local Shards and make them a child of the ShardManager
    // TODO: This may need to be initiated when we first get the schema context
    createLocalShards();
}
95 public static Props props(final String type,
96 final ClusterWrapper cluster,
97 final Configuration configuration,
98 final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
100 Preconditions.checkNotNull(type, "type should not be null");
101 Preconditions.checkNotNull(cluster, "cluster should not be null");
102 Preconditions.checkNotNull(configuration, "configuration should not be null");
104 return Props.create(new Creator<ShardManager>() {
107 public ShardManager create() throws Exception {
108 return new ShardManager(type, cluster, configuration, dataStoreProperties);
115 public void handleReceive(Object message) throws Exception {
116 if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
118 FindPrimary.fromSerializable(message));
119 } else if(message instanceof FindLocalShard){
120 findLocalShard((FindLocalShard) message);
121 } else if (message instanceof UpdateSchemaContext) {
122 updateSchemaContext(message);
123 } else if (message instanceof ClusterEvent.MemberUp){
124 memberUp((ClusterEvent.MemberUp) message);
125 } else if(message instanceof ClusterEvent.MemberRemoved) {
126 memberRemoved((ClusterEvent.MemberRemoved) message);
127 } else if(message instanceof ClusterEvent.UnreachableMember) {
128 ignoreMessage(message);
130 unknownMessage(message);
135 private void findLocalShard(FindLocalShard message) {
136 ShardInformation shardInformation =
137 localShards.get(message.getShardName());
139 if(shardInformation != null){
140 getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
144 getSender().tell(new LocalShardNotFound(message.getShardName()),
148 private void memberRemoved(ClusterEvent.MemberRemoved message) {
149 memberNameToAddress.remove(message.member().roles().head());
152 private void memberUp(ClusterEvent.MemberUp message) {
153 String memberName = message.member().roles().head();
155 memberNameToAddress.put(memberName , message.member().address());
157 for(ShardInformation info : localShards.values()){
158 String shardName = info.getShardName();
159 info.updatePeerAddress(getShardIdentifier(memberName, shardName),
160 getShardActorPath(shardName, memberName));
165 * Notifies all the local shards of a change in the schema context
169 private void updateSchemaContext(Object message) {
170 for(ShardInformation info : localShards.values()){
171 info.getActor().tell(message,getSelf());
175 private void findPrimary(FindPrimary message) {
176 String shardName = message.getShardName();
178 // First see if the there is a local replica for the shard
179 ShardInformation info = localShards.get(shardName);
181 ActorPath shardPath = info.getActorPath();
182 if (shardPath != null) {
185 new PrimaryFound(shardPath.toString()).toSerializable(),
191 List<String> members =
192 configuration.getMembersFromShardName(shardName);
194 if(cluster.getCurrentMemberName() != null) {
195 members.remove(cluster.getCurrentMemberName());
198 // There is no way for us to figure out the primary (for now) so assume
199 // that one of the remote nodes is a primary
200 for(String memberName : members) {
201 Address address = memberNameToAddress.get(memberName);
204 getShardActorPath(shardName, memberName);
205 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
209 getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
212 private String getShardActorPath(String shardName, String memberName) {
213 Address address = memberNameToAddress.get(memberName);
214 if(address != null) {
215 StringBuilder builder = new StringBuilder();
216 builder.append(address.toString())
218 .append(ShardManagerIdentifier.builder().type(type).build().toString())
220 .append(getShardIdentifier(memberName, shardName));
221 return builder.toString();
227 * Construct the name of the shard actor given the name of the member on
228 * which the shard resides and the name of the shard
234 private ShardIdentifier getShardIdentifier(String memberName, String shardName){
235 return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
239 * Create shards that are local to the member on which the ShardManager
243 private void createLocalShards() {
244 String memberName = this.cluster.getCurrentMemberName();
245 List<String> memberShardNames =
246 this.configuration.getMemberShardNames(memberName);
248 List<String> localShardActorNames = new ArrayList<>();
249 for(String shardName : memberShardNames){
250 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
251 Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
252 ActorRef actor = getContext()
253 .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties),
255 localShardActorNames.add(shardId.toString());
256 localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
259 mBean = ShardManagerInfo
260 .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
265 * Given the name of the shard find the addresses of all it's peers
270 private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
272 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
274 List<String> members =
275 this.configuration.getMembersFromShardName(shardName);
277 String currentMemberName = this.cluster.getCurrentMemberName();
279 for(String memberName : members){
280 if(!currentMemberName.equals(memberName)){
281 ShardIdentifier shardId = getShardIdentifier(memberName,
284 getShardActorPath(shardName, currentMemberName);
285 peerAddresses.put(shardId, path);
288 return peerAddresses;
292 public SupervisorStrategy supervisorStrategy() {
294 return new OneForOneStrategy(10, Duration.create("1 minute"),
295 new Function<Throwable, SupervisorStrategy.Directive>() {
297 public SupervisorStrategy.Directive apply(Throwable t) {
298 LOG.warning("Supervisor Strategy of resume applied {}",t);
299 return SupervisorStrategy.resume();
306 private class ShardInformation {
307 private final String shardName;
308 private final ActorRef actor;
309 private final ActorPath actorPath;
310 private final Map<ShardIdentifier, String> peerAddresses;
/**
 * Captures what the ShardManager tracks about one local shard.
 *
 * @param shardName name of the shard
 * @param actor reference to the shard actor; its path is read here and kept
 * @param peerAddresses map from peer shard identifier to peer address; the
 *                      same map instance is stored, not a copy
 */
private ShardInformation(String shardName, ActorRef actor,
        Map<ShardIdentifier, String> peerAddresses) {
    this.shardName = shardName;
    this.peerAddresses = peerAddresses;
    this.actor = actor;
    this.actorPath = this.actor.path();
}
320 public String getShardName() {
324 public ActorRef getActor(){
328 public ActorPath getActorPath() {
332 public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
333 LOG.info("updatePeerAddress for peer {} with address {}", peerId,
335 if(peerAddresses.containsKey(peerId)){
336 peerAddresses.put(peerId, peerAddress);
339 "Sending PeerAddressResolved for peer {} with address {} to {}",
340 peerId, peerAddress, actor.path());
343 .tell(new PeerAddressResolved(peerId, peerAddress),