2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Function;
23 import akka.persistence.RecoveryCompleted;
24 import akka.persistence.SaveSnapshotFailure;
25 import akka.persistence.SaveSnapshotSuccess;
26 import akka.persistence.SnapshotOffer;
27 import akka.serialization.Serialization;
28 import akka.util.Timeout;
29 import com.google.common.annotations.VisibleForTesting;
30 import com.google.common.base.Objects;
31 import com.google.common.base.Optional;
32 import com.google.common.base.Preconditions;
33 import com.google.common.base.Strings;
34 import com.google.common.base.Supplier;
35 import com.google.common.collect.Sets;
36 import java.io.Serializable;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.Iterator;
43 import java.util.List;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import javax.annotation.Nonnull;
50 import javax.annotation.Nullable;
51 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
52 import org.opendaylight.controller.cluster.datastore.config.Configuration;
53 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
54 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
58 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
59 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
60 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
61 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
62 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
66 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
67 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
68 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
69 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
70 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
71 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
72 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
73 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
74 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
76 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
77 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
78 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
79 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
80 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
83 import org.opendaylight.controller.cluster.raft.RaftState;
84 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
85 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
86 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
87 import org.opendaylight.controller.cluster.raft.messages.AddServer;
88 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
89 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
90 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
91 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
92 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
93 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96 import scala.concurrent.Future;
97 import scala.concurrent.duration.Duration;
98 import scala.concurrent.duration.FiniteDuration;
101 * The ShardManager has the following jobs,
103 * <li> Create all the local shard replicas that belong on this cluster member
104 * <li> Find the address of the local shard
105 * <li> Find the primary replica for any given shard
106 * <li> Monitor the cluster members and store their addresses
109 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
// Logger shared by all ShardManager instances.
111 private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
113 // Stores a mapping between a shard name and its corresponding information
114 // Shard names look like inventory, topology etc. and are as specified in
116 private final Map<String, ShardInformation> localShards = new HashMap<>();
118 // The type of a ShardManager reflects the type of the datastore itself
119 // A data store could be of type config/operational
120 private final String type;
// Cluster access: used for member-event subscription and for the current member name.
122 private final ClusterWrapper cluster;
// Shard/member configuration: queried for member shard names and shard members, and extended by CreateShard.
124 private final Configuration configuration;
// Dispatcher path on which shard actors are created (see newShardActor).
126 private final String shardDispatcherPath;
// JMX bean exposing the local shard names and the overall sync status.
128 private final ShardManagerInfo mBean;
// Not final: replaced when a DatastoreContextFactory message is received.
130 private DatastoreContextFactory datastoreContextFactory;
// Counted down by checkReady() once every local shard is ready with a leader id.
132 private final CountDownLatch waitTillReadyCountdownLatch;
// Cached primary-shard lookups; entries are evicted on leader changes and when a leader member becomes unreachable.
134 private final PrimaryShardInfoFutureCache primaryShardInfoCache;
// Resolves member names to shard / ShardManager actor addresses.
136 private final ShardPeerAddressResolver peerAddressResolver;
// Latest SchemaContext; null until the first UpdateSchemaContext. Shard actors are only created once it is set.
138 private SchemaContext schemaContext;
// Optional snapshot used to seed shards at creation; nulled out by createLocalShards() after use.
140 private DatastoreSnapshot restoreFromSnapshot;
// Names of shards with an in-flight replica operation (see isShardReplicaOperationInProgress).
142 private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
// Akka persistence id: the configured one, or "shard-manager-<type>" by default.
144 private final String persistenceId;
/**
 * Builds the ShardManager from the given builder.
 * <p>
 * This constructor:
 * <ul>
 * <li>copies the builder's collaborators;
 * <li>derives the datastore type and the persistence id from the base datastore context;
 * <li>subscribes this actor to cluster member events;
 * <li>creates the ShardManager JMX bean, which postStop() unregisters.
 * </ul>
 *
 * @param builder supplies the cluster wrapper, configuration, datastore context factory, ready latch,
 *                primary shard info cache and optional restore snapshot
 */
148 protected ShardManager(Builder builder) {
150 this.cluster = builder.cluster;
151 this.configuration = builder.configuration;
152 this.datastoreContextFactory = builder.datastoreContextFactory;
153 this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
154 this.shardDispatcherPath =
155 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
156 this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
157 this.primaryShardInfoCache = builder.primaryShardInfoCache;
158 this.restoreFromSnapshot = builder.restoreFromSnapshot;
// Prefer an explicitly configured persistence id; otherwise fall back to "shard-manager-<type>".
160 String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
161 persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
163 peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
165 // Subscribe this actor to cluster member events
166 cluster.subscribeToMemberEvents(getSelf());
// The bean is created with an empty shard-name list; shards are added later via mBean.addLocalShard().
168 List<String> localShardActorNames = new ArrayList<>();
169 mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
170 "shard-manager-" + this.type,
171 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
172 localShardActorNames);
// NOTE(review): 'this' is handed to the bean before construction has finished.
173 mBean.setShardManager(this);
/**
 * Actor stop hook: logs the shutdown and unregisters the JMX bean created in the constructor.
 */
177 public void postStop() {
178 LOG.info("Stopping ShardManager");
180 mBean.unregisterMBean();
/**
 * Routes each command message to its handler, chosen by the message's runtime type.
 * <p>
 * The following are all dispatched from here:
 * <ul>
 * <li>cluster membership events;
 * <li>shard lifecycle, leader and role notifications;
 * <li>shard creation and replica add/remove requests;
 * <li>snapshot persistence results.
 * </ul>
 * Anything unrecognized is passed to unknownMessage().
 *
 * @param message the received message
 * @throws Exception as declared; propagated from the handlers
 */
184 public void handleCommand(Object message) throws Exception {
185 if (message instanceof FindPrimary) {
186 findPrimary((FindPrimary)message);
187 } else if(message instanceof FindLocalShard){
188 findLocalShard((FindLocalShard) message);
189 } else if (message instanceof UpdateSchemaContext) {
190 updateSchemaContext(message);
191 } else if(message instanceof ActorInitialized) {
192 onActorInitialized(message);
193 } else if (message instanceof ClusterEvent.MemberUp){
194 memberUp((ClusterEvent.MemberUp) message);
195 } else if (message instanceof ClusterEvent.MemberExited){
196 memberExited((ClusterEvent.MemberExited) message);
197 } else if(message instanceof ClusterEvent.MemberRemoved) {
198 memberRemoved((ClusterEvent.MemberRemoved) message);
199 } else if(message instanceof ClusterEvent.UnreachableMember) {
200 memberUnreachable((ClusterEvent.UnreachableMember)message);
201 } else if(message instanceof ClusterEvent.ReachableMember) {
202 memberReachable((ClusterEvent.ReachableMember) message);
203 } else if(message instanceof DatastoreContextFactory) {
204 onDatastoreContextFactory((DatastoreContextFactory)message);
205 } else if(message instanceof RoleChangeNotification) {
206 onRoleChangeNotification((RoleChangeNotification) message);
207 } else if(message instanceof FollowerInitialSyncUpStatus){
208 onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
209 } else if(message instanceof ShardNotInitializedTimeout) {
210 onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
211 } else if(message instanceof ShardLeaderStateChanged) {
212 onLeaderStateChanged((ShardLeaderStateChanged) message);
213 } else if(message instanceof SwitchShardBehavior){
214 onSwitchShardBehavior((SwitchShardBehavior) message);
215 } else if(message instanceof CreateShard) {
216 onCreateShard((CreateShard)message);
217 } else if(message instanceof AddShardReplica){
218 onAddShardReplica((AddShardReplica)message);
219 } else if(message instanceof ForwardedAddServerReply) {
220 ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
221 onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
222 msg.removeShardOnFailure);
223 } else if(message instanceof ForwardedAddServerFailure) {
224 ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
225 onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
226 } else if(message instanceof PrimaryShardFoundForContext) {
227 PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
228 onPrimaryShardFoundContext(primaryShardFoundContext);
229 } else if(message instanceof RemoveShardReplica){
230 onRemoveShardReplica((RemoveShardReplica)message);
231 } else if(message instanceof GetSnapshot) {
// NOTE(review): this listing shows no handler call for GetSnapshot; presumably onGetSnapshot() — confirm.
233 } else if(message instanceof ServerRemoved){
234 onShardReplicaRemoved((ServerRemoved) message);
// Outcome of a ShardManager snapshot save: success is logged at debug, failure at error with its cause.
235 } else if (message instanceof SaveSnapshotSuccess) {
236 LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
237 } else if (message instanceof SaveSnapshotFailure) {
238 LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
239 persistenceId(), ((SaveSnapshotFailure) message).cause());
// Any other message type is handed to unknownMessage().
241 unknownMessage(message);
/**
 * Resumes the operation that requested a primary-shard lookup, once that primary has been found.
 * <p>
 * Only AddShardReplica contexts are handled in the visible code. For those, the replica is added via
 * addShard() using the current sender.
 */
245 private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
246 if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
247 addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
/**
 * Handles a ServerRemoved notification for a local shard replica.
 * <p>
 * The removed server id is parsed as a ShardIdentifier string. The matching shard is removed from
 * localShards, and its actor (if any) is sent a PoisonPill.
 * <p>
 * NOTE(review): the visible code does not drop the shard from the JMX bean's local shard list — confirm
 * whether that is intended.
 */
251 private void onShardReplicaRemoved(ServerRemoved message) {
252 final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
253 final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
254 if(shardInformation == null) {
255 LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
257 } else if(shardInformation.getActor() != null) {
258 LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
259 shardInformation.getActor().tell(PoisonPill.getInstance(), self());
261 LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
/**
 * Handles GetSnapshot by collecting a snapshot from every local shard.
 * <p>
 * If any local shard is not yet initialized, the sender immediately receives a Status.Failure. It wraps
 * an IllegalStateException that lists those shards.
 * <p>
 * Otherwise a ShardManagerGetSnapshotReplyActor is created to gather the replies. It is given the shard
 * names, the original sender and the shard initialization timeout. Every local shard actor is then sent
 * GetSnapshot, with the reply actor as sender.
 */
265 private void onGetSnapshot() {
266 LOG.debug("{}: onGetSnapshot", persistenceId());
268 List<String> notInitialized = null;
269 for(ShardInformation shardInfo: localShards.values()) {
270 if(!shardInfo.isShardInitialized()) {
271 if(notInitialized == null) {
272 notInitialized = new ArrayList<>();
275 notInitialized.add(shardInfo.getShardName());
279 if(notInitialized != null) {
280 getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
281 "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
// No ShardManager-level state is captured: a null ShardManager snapshot is passed to the reply actor.
285 byte[] shardManagerSnapshot = null;
286 ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
287 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
288 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
290 for(ShardInformation shardInfo: localShards.values()) {
291 shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
/**
 * Handles a CreateShard request. The possible replies are:
 * <ul>
 * <li>Status.Success with an explanatory message, if a shard with that name already exists;
 * <li>Status.Success(null), after the shard is created;
 * <li>Status.Failure carrying the exception, if creation threw.
 * </ul>
 * No reply is sent when there is no sender or the sender is deadLetters.
 */
295 private void onCreateShard(CreateShard createShard) {
298 String shardName = createShard.getModuleShardConfig().getShardName();
299 if(localShards.containsKey(shardName)) {
300 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
302 doCreateShard(createShard);
303 reply = new akka.actor.Status.Success(null);
305 } catch (Exception e) {
306 LOG.error("onCreateShard failed", e);
307 reply = new akka.actor.Status.Failure(e);
// Only reply to a real requester; fire-and-forget senders (none / deadLetters) get nothing.
310 if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
311 getSender().tell(reply, getSelf());
/**
 * Creates a local shard from a CreateShard request and registers it.
 * <p>
 * Steps:
 * <ul>
 * <li>The module shard configuration is added to the configuration.
 * <li>The shard's datastore context is the one supplied in the request, or else one derived from the
 * datastore context factory. Either way it ends up wired to this manager's peer address resolver.
 * <li>If the local member is one of the shard's configured members, the shard gets its peers and is
 * marked active.
 * <li>Otherwise the shard is created with no peers and elections disabled, and is marked inactive.
 * <li>The shard is added to localShards and the JMX bean.
 * <li>Its actor is created only if a SchemaContext has already been received.
 * </ul>
 */
315 private void doCreateShard(CreateShard createShard) {
316 ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
317 String shardName = moduleShardConfig.getShardName();
319 configuration.addModuleShardConfiguration(moduleShardConfig);
321 DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
322 if(shardDatastoreContext == null) {
323 shardDatastoreContext = newShardDatastoreContext(shardName);
// Attach this manager's peer address resolver to the shard's datastore context.
325 shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
326 peerAddressResolver).build();
329 ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
331 Map<String, String> peerAddresses;
332 boolean isActiveMember;
333 if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
334 peerAddresses = getPeerAddresses(shardName);
335 isActiveMember = true;
337 // The local member is not in the given shard member configuration. In this case we'll create
338 // the shard with no peers and with elections disabled so it stays as follower. A
339 // subsequent AddServer request will be needed to make it an active member.
340 isActiveMember = false;
341 peerAddresses = Collections.emptyMap();
342 shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
343 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
346 LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
347 moduleShardConfig.getShardMemberNames(), peerAddresses);
349 ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
350 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
351 info.setActiveMember(isActiveMember);
352 localShards.put(info.getShardName(), info);
354 mBean.addLocalShard(shardId.toString());
// Without a SchemaContext the actor is deferred; updateSchemaContext() creates it later.
356 if(schemaContext != null) {
357 info.setActor(newShardActor(schemaContext, info));
/**
 * Returns a DatastoreContext builder seeded from the factory's context for the given shard, with this
 * manager's peer address resolver already set.
 */
361 private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
362 return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
363 shardPeerAddressResolver(peerAddressResolver);
/**
 * Builds the datastore context for the given shard from the current factory, wired to the peer address
 * resolver.
 */
366 private DatastoreContext newShardDatastoreContext(String shardName) {
367 return newShardDatastoreContextBuilder(shardName).build();
/**
 * Counts down the datastore "ready" latch once every local shard is ready with a leader id.
 * <p>
 * The logged count is the value before the count-down. Note that CountDownLatch.countDown() has no effect
 * once the count has reached zero.
 */
370 private void checkReady(){
371 if (isReadyWithLeaderId()) {
372 LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
373 persistenceId(), type, waitTillReadyCountdownLatch.getCount());
375 waitTillReadyCountdownLatch.countDown();
/**
 * Records a shard's leader state change: its local data tree, the leader's payload version and the leader id.
 * <p>
 * When setLeaderId() returns true (presumably meaning the leader id changed — confirm), the cached primary
 * shard info for that shard is evicted. A member id that matches no local shard is only logged at debug.
 */
379 private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
380 LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
382 ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
383 if(shardInformation != null) {
384 shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
385 shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
386 if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
387 primaryShardInfoCache.remove(shardInformation.getShardName());
392 LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
/**
 * Handles expiry of a wait that sendResponse() scheduled.
 * <p>
 * The pending callback is removed from the shard. The original requester, carried in the message, then
 * receives one of:
 * <ul>
 * <li>a NotInitializedException, if the shard is still not initialized;
 * <li>a NoShardLeaderException, otherwise.
 * </ul>
 */
396 private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
397 ShardInformation shardInfo = message.getShardInfo();
399 LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
400 shardInfo.getShardName());
402 shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
404 if(!shardInfo.isShardInitialized()) {
405 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
406 message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
408 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
409 message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
/**
 * Records a follower's initial-sync status on the matching local shard. It then refreshes the JMX bean's
 * overall sync status from isInSync().
 */
413 private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
414 LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
415 status.getName(), status.isInitialSyncDone());
417 ShardInformation shardInformation = findShardInformation(status.getName());
419 if(shardInformation != null) {
420 shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
422 mBean.setSyncStatus(isInSync());
/**
 * Records a shard's new raft role on the matching local shard. It then refreshes the JMX bean's overall
 * sync status from isInSync().
 */
427 private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
428 LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
429 roleChanged.getOldRole(), roleChanged.getNewRole());
431 ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
432 if(shardInformation != null) {
433 shardInformation.setRole(roleChanged.getNewRole());
435 mBean.setSyncStatus(isInSync());
/**
 * Finds, by linear scan, the local shard whose ShardIdentifier string equals the given member id.
 * Callers treat a null result as "not found".
 */
440 private ShardInformation findShardInformation(String memberId) {
441 for(ShardInformation info : localShards.values()){
442 if(info.getShardId().toString().equals(memberId)){
/**
 * Returns whether every local shard reports isShardReadyWithLeaderId(). The flag starts as true, so the
 * result is presumably true when there are no local shards.
 */
450 private boolean isReadyWithLeaderId() {
451 boolean isReady = true;
452 for (ShardInformation info : localShards.values()) {
453 if(!info.isShardReadyWithLeaderId()){
/**
 * Returns whether every local shard reports isInSync(). Used to feed the JMX bean's sync status.
 */
461 private boolean isInSync(){
462 for (ShardInformation info : localShards.values()) {
463 if(!info.isInSync()){
/**
 * Handles ActorInitialized sent by a shard actor.
 * <p>
 * The shard name is recovered from the sender's actor name, which is the stringified ShardIdentifier the
 * actor was created with (see newShardActor). That shard is then marked initialized. Messages without a
 * sender, or whose actor name yields no shard name, are not processed further.
 */
470 private void onActorInitialized(Object message) {
471 final ActorRef sender = getSender();
473 if (sender == null) {
474 return; //why is a non-actor sending this message? Just ignore.
477 String actorName = sender.path().name();
478 //find shard name from actor name; actor name is stringified shardId
479 ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
481 if (shardId.getShardName() == null) {
485 markShardAsInitialized(shardId.getShardName());
/**
 * Marks the named local shard's actor as initialized.
 * <p>
 * It then sends the shard actor a RegisterRoleChangeListener, with this actor as sender. Presumably this is
 * so the ShardManager receives the RoleChangeNotifications handled in handleCommand — confirm.
 */
488 private void markShardAsInitialized(String shardName) {
489 LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
491 ShardInformation shardInformation = localShards.get(shardName);
492 if (shardInformation != null) {
493 shardInformation.setActorInitialized();
495 shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
/**
 * Handles Akka persistence recovery messages.
 * <ul>
 * <li>RecoveryCompleted: all journal messages up to lastSequenceNr() are deleted.
 * <li>SnapshotOffer: passed to handleShardRecovery().
 * </ul>
 */
500 protected void handleRecover(Object message) throws Exception {
501 if (message instanceof RecoveryCompleted) {
502 LOG.info("Recovery complete : {}", persistenceId());
504 // We no longer persist SchemaContext modules so delete all the prior messages from the akka
505 // journal on upgrade from Helium.
506 deleteMessages(lastSequenceNr());
508 } else if (message instanceof SnapshotOffer) {
509 handleShardRecovery((SnapshotOffer) message);
/**
 * Handles FindLocalShard.
 * <p>
 * An unknown shard gets a LocalShardNotFound reply. Otherwise the reply is a LocalShardFound carrying the
 * shard's actor, sent via sendResponse(). That call optionally waits for the shard to be initialized; a
 * leader is not required (wantShardReady = false).
 */
513 private void findLocalShard(FindLocalShard message) {
514 final ShardInformation shardInformation = localShards.get(message.getShardName());
516 if(shardInformation == null){
517 getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
521 sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
523 public Object get() {
524 return new LocalShardFound(shardInformation.getActor());
/**
 * Replies to the current sender with the supplied message once the shard is in the required state.
 * <p>
 * The required state is "initialized", plus, when {@code wantShardReady} is set,
 * isShardReadyWithLeaderId(). If the shard is already in that state, the reply is sent immediately.
 * <p>
 * If it is not, and waiting is requested (presumably guarded by {@code doWait} — confirm):
 * <ul>
 * <li>a callback is registered on the shard to send the reply later;
 * <li>a ShardNotInitializedTimeout is scheduled back to this actor;
 * <li>the timeout is the shard initialization timeout, or twice the election timeout when the shard is
 * already initialized and only a leader is awaited.
 * </ul>
 * Otherwise the sender immediately receives a NotInitializedException or a NoShardLeaderException.
 *
 * @param shardInformation the target shard
 * @param doWait whether to wait for the shard rather than failing immediately
 * @param wantShardReady whether a leader is required in addition to initialization
 * @param messageSupplier produces the success reply; it is invoked at the moment the reply is sent
 */
529 private void sendResponse(ShardInformation shardInformation, boolean doWait,
530 boolean wantShardReady, final Supplier<Object> messageSupplier) {
531 if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
// Capture the requester and self now, because the reply is sent later from the callback.
533 final ActorRef sender = getSender();
534 final ActorRef self = self();
536 Runnable replyRunnable = new Runnable() {
539 sender.tell(messageSupplier.get(), self);
543 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
544 new OnShardInitialized(replyRunnable);
546 shardInformation.addOnShardInitialized(onShardInitialized);
548 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
549 if(shardInformation.isShardInitialized()) {
550 // If the shard is already initialized then we'll wait enough time for the shard to
551 // elect a leader, ie 2 times the election timeout.
552 timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
553 .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
556 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
557 shardInformation.getShardName());
559 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
561 new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
562 getContext().dispatcher(), getSelf());
// Attach the scheduled timeout to the pending callback.
564 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
566 } else if (!shardInformation.isShardInitialized()) {
567 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
568 shardInformation.getShardName());
569 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
571 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
572 shardInformation.getShardName());
573 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
// The shard is already in the required state: reply immediately.
579 getSender().tell(messageSupplier.get(), getSelf());
/**
 * Creates the NoShardLeaderException returned when a shard has no leader. It is built with a null message
 * and the shard id string.
 */
582 private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
583 return new NoShardLeaderException(null, shardId.toString());
/**
 * Creates the NotInitializedException returned when a shard has not finished initializing.
 */
586 private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
587 return new NotInitializedException(String.format(
588 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
/**
 * Handles MemberRemoved.
 * <p>
 * The member's first role is used as its member name. Its peer address is removed, and every local shard
 * is told that the member's replica is down.
 * <p>
 * NOTE(review): the body is identical to memberExited apart from the log text; it could share a helper.
 */
591 private void memberRemoved(ClusterEvent.MemberRemoved message) {
// NOTE(review): assumes every member has at least one role and that the first role is the member name.
592 String memberName = message.member().roles().head();
594 LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
595 message.member().address());
597 peerAddressResolver.removePeerAddress(memberName);
599 for(ShardInformation info : localShards.values()){
600 info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
/**
 * Handles MemberExited.
 * <p>
 * The member's first role is used as its member name. Its peer address is removed, and every local shard
 * is told that the member's replica is down.
 */
604 private void memberExited(ClusterEvent.MemberExited message) {
605 String memberName = message.member().roles().head();
607 LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
608 message.member().address());
610 peerAddressResolver.removePeerAddress(memberName);
612 for(ShardInformation info : localShards.values()){
613 info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
/**
 * Handles MemberUp. The member is named by its first role, and its address is recorded for all local
 * shards via addPeerAddress().
 */
617 private void memberUp(ClusterEvent.MemberUp message) {
618 String memberName = message.member().roles().head();
620 LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
621 message.member().address());
623 addPeerAddress(memberName, message.member().address());
/**
 * Registers a member's cluster address with the peer address resolver.
 * <p>
 * Then, for every local shard, it updates the address of that member's replica (as resolved by the
 * resolver) and marks that peer as up.
 */
628 private void addPeerAddress(String memberName, Address address) {
629 peerAddressResolver.addPeerAddress(memberName, address);
631 for(ShardInformation info : localShards.values()){
632 String shardName = info.getShardName();
633 String peerId = getShardIdentifier(memberName, shardName).toString();
634 info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
636 info.peerUp(memberName, peerId, getSelf());
/**
 * Handles ReachableMember. The member's address is re-registered via addPeerAddress(), and the member is
 * then marked available on all local shards.
 */
640 private void memberReachable(ClusterEvent.ReachableMember message) {
641 String memberName = message.member().roles().head();
642 LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
644 addPeerAddress(memberName, message.member().address());
646 markMemberAvailable(memberName);
/**
 * Handles UnreachableMember by marking the member unavailable on all local shards. Unlike MemberRemoved,
 * the peer address is kept.
 */
649 private void memberUnreachable(ClusterEvent.UnreachableMember message) {
650 String memberName = message.member().roles().head();
651 LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
653 markMemberUnavailable(memberName);
/**
 * Marks a member as unavailable across all local shards.
 * <ul>
 * <li>Shards whose leader id mentions the member have their leader flagged unavailable, and their cached
 * primary shard info is evicted.
 * <li>Every shard is told that the member's replica is down.
 * </ul>
 * NOTE(review): the leader match uses String.contains. A member name that is a prefix of another
 * ("member-1" vs "member-10-shard-...") can therefore match the wrong leader. An exact member-name
 * comparison would be safer.
 */
656 private void markMemberUnavailable(final String memberName) {
657 for(ShardInformation info : localShards.values()){
658 String leaderId = info.getLeaderId();
659 if(leaderId != null && leaderId.contains(memberName)) {
660 LOG.debug("Marking Leader {} as unavailable.", leaderId);
661 info.setLeaderAvailable(false);
663 primaryShardInfoCache.remove(info.getShardName());
666 info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
/**
 * Marks a member as available across all local shards.
 * <ul>
 * <li>Shards whose leader id mentions the member have their leader flagged available again.
 * <li>Every shard is told that the member's replica is up.
 * </ul>
 * NOTE(review): same String.contains prefix-match hazard as markMemberUnavailable.
 */
670 private void markMemberAvailable(final String memberName) {
671 for(ShardInformation info : localShards.values()){
672 String leaderId = info.getLeaderId();
673 if(leaderId != null && leaderId.contains(memberName)) {
674 LOG.debug("Marking Leader {} as available.", leaderId);
675 info.setLeaderAvailable(true);
678 info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
/**
 * Replaces the datastore context factory. Each local shard is then pushed a freshly built datastore
 * context derived from the new factory.
 */
682 private void onDatastoreContextFactory(DatastoreContextFactory factory) {
683 datastoreContextFactory = factory;
684 for (ShardInformation info : localShards.values()) {
685 info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
/**
 * Handles SwitchShardBehavior.
 * <p>
 * Despite its name, message.getShardName() is parsed as a full ShardIdentifier string. If that shard exists
 * locally and has an actor, the actor is sent a SwitchBehavior with the requested raft state and term.
 * Otherwise a warning is logged.
 */
689 private void onSwitchShardBehavior(SwitchShardBehavior message) {
690 ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
692 ShardInformation shardInformation = localShards.get(identifier.getShardName());
694 if(shardInformation != null && shardInformation.getActor() != null) {
695 shardInformation.getActor().tell(
696 new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
698 LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
699 message.getShardName(), message.getNewState());
704 * Notifies all the local shards of a change in the schema context
708 private void updateSchemaContext(final Object message) {
// The message is an UpdateSchemaContext; its context is kept for shards created later.
709 schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
711 LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
// Shards without an actor get one created with the new context. Shards that already have an actor are
// presumably (on a branch line not shown here) forwarded the UpdateSchemaContext message — confirm.
713 for (ShardInformation info : localShards.values()) {
714 if (info.getActor() == null) {
715 LOG.debug("Creating Shard {}", info.getShardId());
716 info.setActor(newShardActor(schemaContext, info));
718 info.getActor().tell(message, getSelf());
724 protected ClusterWrapper getCluster() {
// Accessor for the ClusterWrapper. It is protected, so subclasses may override it (presumably for tests — confirm).
/**
 * Creates a shard actor as a child of this actor.
 * <p>
 * The actor is built from the shard's props for the given SchemaContext and runs on the shard dispatcher.
 * It is named with the shard's ShardIdentifier string, which onActorInitialized() relies on.
 */
729 protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
730 return getContext().actorOf(info.newProps(schemaContext)
731 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
/**
 * Handles FindPrimary, including RemoteFindPrimary requests from other ShardManagers.
 * <p>
 * With an active local replica, the reply is produced via sendResponse() once the shard is ready with a
 * leader, waiting if requested. The reply is:
 * <ul>
 * <li>LocalPrimaryShardFound with the local data tree, when this replica is the leader and the request is
 * not a RemoteFindPrimary;
 * <li>RemotePrimaryShardFound with the leader path and payload version, otherwise.
 * </ul>
 * The other visible paths either forward a RemoteFindPrimary to every peer ShardManager address, or reply
 * with a PrimaryNotFoundException (sent as a plain message, not a Status.Failure). The conditions that
 * select between these paths are on lines not shown here — confirm.
 */
734 private void findPrimary(FindPrimary message) {
735 LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
737 final String shardName = message.getShardName();
// Requests forwarded from another ShardManager must not receive local-only state (the local data tree).
738 final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
740 // First see if the there is a local replica for the shard
741 final ShardInformation info = localShards.get(shardName);
742 if (info != null && info.isActiveMember()) {
743 sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
745 public Object get() {
746 String primaryPath = info.getSerializedLeaderActor();
747 Object found = canReturnLocalShardState && info.isLeader() ?
748 new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
749 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
751 if(LOG.isDebugEnabled()) {
752 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
762 for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
763 LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
766 getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
767 message.isWaitUntilReady()), getContext());
771 LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
773 getSender().tell(new PrimaryNotFoundException(
774 String.format("No primary shard found for %s.", shardName)), getSelf());
778 * Construct the name of the shard actor given the name of the member on
779 * which the shard resides and the name of the shard
785 private ShardIdentifier getShardIdentifier(String memberName, String shardName){
// Delegates to the peer address resolver to build the ShardIdentifier for (memberName, shardName).
786 return peerAddressResolver.getShardIdentifier(memberName, shardName);
790 * Create shards that are local to the member on which the ShardManager
794 private void createLocalShards() {
// Registers a ShardInformation for every shard configured for this member; no actors are created here.
// Each shard's builder is seeded with that shard's snapshot from restoreFromSnapshot, if one exists.
795 String memberName = this.cluster.getCurrentMemberName();
796 Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
// Index the restore snapshots by shard name.
798 Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
799 if(restoreFromSnapshot != null)
801 for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
802 shardSnapshots.put(snapshot.getName(), snapshot);
// The restore snapshot is consumed only once.
806 restoreFromSnapshot = null; // null out to GC
808 for(String shardName : memberShardNames){
809 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
810 Map<String, String> peerAddresses = getPeerAddresses(shardName);
811 localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
812 newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
813 shardSnapshots.get(shardName)), peerAddressResolver));
814 mBean.addLocalShard(shardId.toString());
819 * Given the name of the shard, find the addresses of all its peers
// Returns a new map from shard-id string to shard actor address. It has one entry for every configured replica
// member of shardName except the current member. Each address is whatever
// peerAddressResolver.getShardActorAddress returns for that member.
// NOTE(review): confirm whether that can be null for a peer that has not been resolved yet.
823 private Map<String, String> getPeerAddresses(String shardName) {
824 Collection<String> members = configuration.getMembersFromShardName(shardName);
825 Map<String, String> peerAddresses = new HashMap<>();
827 String currentMemberName = this.cluster.getCurrentMemberName();
829 for(String memberName : members) {
// Skip ourselves; only remote replicas are peers.
830 if(!currentMemberName.equals(memberName)) {
831 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
832 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
833 peerAddresses.put(shardId.toString(), address);
836 return peerAddresses;
// Supervision policy for this actor's children. OneForOneStrategy applies the directive only to the
// failing child. It is configured with 10 retries within a "1 minute" window (see the Akka
// OneForOneStrategy documentation for how these bounds apply). Every Throwable is logged at WARN and
// the child is resumed, so its state is kept rather than restarted.
840 public SupervisorStrategy supervisorStrategy() {
842 return new OneForOneStrategy(10, Duration.create("1 minute"),
843 new Function<Throwable, SupervisorStrategy.Directive>() {
845 public SupervisorStrategy.Directive apply(Throwable t) {
846 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
847 return SupervisorStrategy.resume();
// Persistence identifier of this persistent actor (returns the persistenceId field); also used as the
// prefix of this class's log messages.
855 public String persistenceId() {
856 return persistenceId;
// Accessor for this manager's ShardManagerInfoMBean.
// NOTE(review): body not visible here; presumably returns the mBean field used elsewhere in this class.
860 ShardManagerInfoMBean getMBean(){
// Checks whether an add/remove-replica operation for shardName is already tracked in
// shardReplicaOperationsInProgress. If so, it replies to sender with a Status.Failure wrapping an
// IllegalStateException.
// NOTE(review): the return statements are not visible here. The caller addShard treats a true result as
// "stop processing".
864 private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
865 if (shardReplicaOperationsInProgress.contains(shardName)) {
866 String msg = String.format("A shard replica operation for %s is already in progress", shardName);
867 LOG.debug ("{}: {}", persistenceId(), msg);
868 sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
// Handles an AddShardReplica request.
// - Rejects the request with IllegalArgumentException when the shard is not in the cluster configuration.
// - Rejects it with IllegalStateException when no SchemaContext is available yet.
// - Otherwise locates the shard's primary asynchronously via findPrimary:
//   - remote primary: a PrimaryShardFoundForContext message is sent back to this actor, with the original
//     requester as sender, for processing on the actor's own thread;
//   - local primary: the requester is told that the local replica already exists;
//   - find-primary failures: replied to by AutoFindPrimaryFailureResponseHandler.
// NOTE(review): the early-return lines after the two failure replies are not visible here.
875 private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
876 final String shardName = shardReplicaMsg.getShardName();
878 LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
880 // verify the shard with the specified name is present in the cluster configuration
881 if (!(this.configuration.isShardConfigured(shardName))) {
882 String msg = String.format("No module configuration exists for shard %s", shardName);
883 LOG.debug ("{}: {}", persistenceId(), msg);
884 getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
888 // Create the localShard
// A SchemaContext is required before a local shard actor can be created for the new replica.
889 if (schemaContext == null) {
890 String msg = String.format(
891 "No SchemaContext is available in order to create a local shard instance for %s", shardName);
892 LOG.debug ("{}: {}", persistenceId(), msg);
893 getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
897 findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
899 public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
900 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
905 public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
906 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
// Replies to sender with a Status.Failure wrapping AlreadyExistsException, indicating that a local replica
// of shardName already exists.
912 private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
913 String msg = String.format("Local shard %s already exists", shardName);
914 LOG.debug ("{}: {}", persistenceId(), msg);
915 sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
// Starts adding a local replica of shardName by asking the remote primary (response) to accept it via AddServer.
// The outcome is reported back to this actor as a ForwardedAddServerFailure or ForwardedAddServerReply message,
// with the original requester (sender) preserved as the message sender.
918 private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
// Only one replica operation per shard at a time; the helper has already replied to sender on conflict.
919 if(isShardReplicaOperationInProgress(shardName, sender)) {
923 shardReplicaOperationsInProgress.add(shardName);
925 final ShardInformation shardInfo;
926 final boolean removeShardOnFailure;
927 ShardInformation existingShardInfo = localShards.get(shardName);
928 if(existingShardInfo == null) {
// No local entry yet: create one. It is marked non-active and its DatastoreContext uses
// DisableElectionsRaftPolicy. Because it was created for this attempt, it is removed again if
// the attempt fails.
929 removeShardOnFailure = true;
930 ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
932 DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
933 DisableElectionsRaftPolicy.class.getName()).build();
935 shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
936 Shard.builder(), peerAddressResolver);
937 shardInfo.setActiveMember(false);
938 localShards.put(shardName, shardInfo);
939 shardInfo.setActor(newShardActor(schemaContext, shardInfo));
// An entry already existed: reuse it and keep it even if the AddServer request fails.
941 removeShardOnFailure = false;
942 shardInfo = existingShardInfo;
945 String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
947 //inform ShardLeader to add this shard as a replica by sending an AddServer message
948 LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
949 response.getPrimaryPath(), shardInfo.getShardId());
// The ask timeout is derived from the shard's leader election timeout.
// NOTE(review): the multiplier is on a line not visible here.
951 Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
953 Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
954 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
// The completion callback runs on the Client dispatcher passed below, not inside this actor's message
// handling. It therefore only sends messages to self; actor state is updated when those messages are processed.
956 futureObj.onComplete(new OnComplete<Object>() {
958 public void onComplete(Throwable failure, Object addServerResponse) {
959 if (failure != null) {
960 LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
961 response.getPrimaryPath(), shardName, failure);
963 String msg = String.format("AddServer request to leader %s for shard %s failed",
964 response.getPrimaryPath(), shardName);
965 self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
967 self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
968 response.getPrimaryPath(), removeShardOnFailure), sender);
971 }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
// Rolls back a failed add-replica attempt:
// - clears the in-progress marker for shardName;
// - when removeShardOnFailure is set, removes the local ShardInformation created for the attempt and stops
//   its actor with a PoisonPill;
// - replies to sender with a Status.Failure. If message is null, the original failure is sent as-is;
//   otherwise it is wrapped in a RuntimeException carrying message.
974 private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
975 boolean removeShardOnFailure) {
976 shardReplicaOperationsInProgress.remove(shardName);
978 if(removeShardOnFailure) {
979 ShardInformation shardInfo = localShards.remove(shardName);
// Map.remove returns null when the entry is already gone; don't let the rollback itself throw an NPE.
980 if (shardInfo != null && shardInfo.getActor() != null) {
981 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
985 sender.tell(new akka.actor.Status.Failure(message == null ? failure :
986 new RuntimeException(message, failure)), getSelf());
// Processes the leader's AddServerReply for a pending add-replica request; the in-progress marker is always cleared.
// - OK: the local shard gets a fresh DatastoreContext, is marked as an active member, is registered with
//   the MBean, and the requester receives Status.Success(null).
// - ALREADY_EXISTS: the requester receives an AlreadyExistsException failure.
// - Any other status: mapped to an exception and handed to onAddServerFailure, which rolls back.
989 private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
990 String leaderPath, boolean removeShardOnFailure) {
991 String shardName = shardInfo.getShardName();
992 shardReplicaOperationsInProgress.remove(shardName);
994 LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
996 if (replyMsg.getStatus() == ServerChangeStatus.OK) {
997 LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
999 // Make the local shard voting capable
1000 shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1001 shardInfo.setActiveMember(true);
1004 mBean.addLocalShard(shardInfo.getShardId().toString());
1005 sender.tell(new akka.actor.Status.Success(null), getSelf());
1006 } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1007 sendLocalReplicaAlreadyExistsReply(shardName, sender);
1009 LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1010 persistenceId(), shardName, replyMsg.getStatus());
// Map the failure status to an exception: TimeoutException for a replication timeout, the
// no-shard-leader exception when there is no leader, and a generic RuntimeException otherwise.
1013 switch (replyMsg.getStatus()) {
1015 failure = new TimeoutException(String.format(
1016 "The shard leader %s timed out trying to replicate the initial data to the new shard %s. " +
1017 "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1018 leaderPath, shardName));
1021 failure = createNoShardLeaderException(shardInfo.getShardId());
1024 failure = new RuntimeException(String.format(
1025 "AddServer request to leader %s for shard %s failed with status %s",
1026 leaderPath, shardName, replyMsg.getStatus()));
1029 onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
// Handles a RemoveShardReplica request. If no local replica of the shard exists, the requester receives a
// Status.Failure wrapping IllegalArgumentException; otherwise it receives Status.Success(true).
// NOTE(review): the visible code does not yet remove anything (see the placeholder comment below).
1033 private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
1034 String shardName = shardReplicaMsg.getShardName();
1036 // verify the local shard replica is available in the controller node
1037 if (!localShards.containsKey(shardName)) {
1038 String msg = String.format("Local shard %s does not exist", shardName);
1039 LOG.debug ("{}: {}", persistenceId(), msg);
1040 getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1043 // call RemoveShard for the shardName
1044 getSender().tell(new akka.actor.Status.Success(true), getSelf());
// Saves a ShardManagerSnapshot containing the names of all local shards that are active members.
// Shards still being added, marked via setActiveMember(false), are excluded.
1048 private void persistShardList() {
1049 List<String> shardList = new ArrayList<>(localShards.keySet());
1050 for (ShardInformation shardInfo : localShards.values()) {
1051 if (!shardInfo.isActiveMember()) {
1052 shardList.remove(shardInfo.getShardName());
1055 LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1056 saveSnapshot(new ShardManagerSnapshot(shardList));
// Reconciles the configuration with a recovered ShardManagerSnapshot for the current member:
// - shards listed in the snapshot but not configured for this member are added as replicas;
// - configured shards still left in configuredShardList after the first loop have this member removed as a replica.
// NOTE(review): the line between the add branch and configuredShardList.remove(shard) is not visible.
// Presumably it is an else, so each snapshot shard already configured is dropped from the removal set.
1059 private void handleShardRecovery(SnapshotOffer offer) {
1060 LOG.debug ("{}: in handleShardRecovery", persistenceId());
1061 ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
1062 String currentMember = cluster.getCurrentMemberName();
// Mutable copy: entries are removed below as they are matched against the snapshot.
1063 Set<String> configuredShardList =
1064 new HashSet<>(configuration.getMemberShardNames(currentMember));
1065 for (String shard : snapshot.getShardList()) {
1066 if (!configuredShardList.contains(shard)) {
1067 // add the current member as a replica for the shard
1068 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1069 configuration.addMemberReplicaForShard(shard, currentMember);
1071 configuredShardList.remove(shard);
1074 for (String shard : configuredShardList) {
1075 // remove the member as a replica for the shard
1076 LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1077 configuration.removeMemberReplicaForShard(shard, currentMember);
// Message that addShard's AddServer completion callback sends to self on success. It carries the leader's
// AddServerReply plus the context needed by onAddServerReply: the shard info, the leader path, and whether
// a failure should remove the shard.
1081 private static class ForwardedAddServerReply {
1082 ShardInformation shardInfo;
1083 AddServerReply addServerReply;
1085 boolean removeShardOnFailure;
1087 ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1088 boolean removeShardOnFailure) {
1089 this.shardInfo = shardInfo;
1090 this.addServerReply = addServerReply;
1091 this.leaderPath = leaderPath;
1092 this.removeShardOnFailure = removeShardOnFailure;
// Message that addShard's AddServer completion callback sends to self when the ask fails. It carries the
// shard name, a descriptive message, the cause, and whether the locally created shard should be removed.
1096 private static class ForwardedAddServerFailure {
1098 String failureMessage;
1100 boolean removeShardOnFailure;
1102 ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1103 boolean removeShardOnFailure) {
1104 this.shardName = shardName;
1105 this.failureMessage = failureMessage;
1106 this.failure = failure;
1107 this.removeShardOnFailure = removeShardOnFailure;
// Per-shard bookkeeping stored in the ShardManager's localShards map. It holds:
// - the shard's identity and its actor (once created);
// - the initial peer addresses, DatastoreContext and builder used to create that actor;
// - the cached RAFT role and leader info;
// - OnShardInitialized callbacks waiting for the shard to become initialized or ready.
1112 protected static class ShardInformation {
1113 private final ShardIdentifier shardId;
1114 private final String shardName;
1115 private ActorRef actor;
1116 private ActorPath actorPath;
1117 private final Map<String, String> initialPeerAddresses;
1118 private Optional<DataTree> localShardDataTree;
1119 private boolean leaderAvailable = false;
1121 // flag that determines if the actor is ready for business
1122 private boolean actorInitialized = false;
1124 private boolean followerSyncStatus = false;
1126 private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1127 private String role ;
1128 private String leaderId;
1129 private short leaderVersion;
1131 private DatastoreContext datastoreContext;
1132 private Shard.AbstractBuilder<?, ?> builder;
1133 private final ShardPeerAddressResolver addressResolver;
// New entries start as active members; addShard flips this to false while a replica is being added.
1134 private boolean isActiveMember = true;
1136 private ShardInformation(String shardName, ShardIdentifier shardId,
1137 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1138 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1139 this.shardName = shardName;
1140 this.shardId = shardId;
1141 this.initialPeerAddresses = initialPeerAddresses;
1142 this.datastoreContext = datastoreContext;
1143 this.builder = builder;
1144 this.addressResolver = addressResolver;
// Builds the Props for this shard's actor from the stored builder. The builder is configured with the shard id,
// the initial peer addresses, the current DatastoreContext and the given SchemaContext. Throws NPE if builder is null.
// NOTE(review): confirm whether builder is cleared after use (the null check suggests it may be).
1147 Props newProps(SchemaContext schemaContext) {
1148 Preconditions.checkNotNull(builder);
1149 Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1150 schemaContext(schemaContext).props();
1155 String getShardName() {
1160 ActorRef getActor(){
1164 ActorPath getActorPath() {
// Records the shard actor; its path is cached in actorPath.
1168 void setActor(ActorRef actor) {
1170 this.actorPath = actor.path();
1173 ShardIdentifier getShardId() {
1177 void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1178 this.localShardDataTree = localShardDataTree;
1181 Optional<DataTree> getLocalShardDataTree() {
1182 return localShardDataTree;
1185 DatastoreContext getDatastoreContext() {
1186 return datastoreContext;
// Replaces the DatastoreContext and, if the actor already exists, sends the new context to it as a message.
1189 void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1190 this.datastoreContext = datastoreContext;
1191 if (actor != null) {
1192 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1193 actor.tell(this.datastoreContext, sender);
// Forwards a resolved peer address to the shard actor as PeerAddressResolved. Pending callbacks are then
// re-evaluated, because a newly resolvable leader address can make isShardReadyWithLeaderId() true.
1197 void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1198 LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1201 if(LOG.isDebugEnabled()) {
1202 LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1203 peerId, peerAddress, actor.path());
1206 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1209 notifyOnShardInitializedCallbacks();
// Notifies the shard actor that a peer member went down.
1212 void peerDown(String memberName, String peerId, ActorRef sender) {
1214 actor.tell(new PeerDown(memberName, peerId), sender);
// Notifies the shard actor that a peer member came up.
1218 void peerUp(String memberName, String peerId, ActorRef sender) {
1220 actor.tell(new PeerUp(memberName, peerId), sender);
// Ready means the shard has reported a non-empty RAFT role that is not Candidate.
1224 boolean isShardReady() {
1225 return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
// Stricter readiness. In addition to isShardReady(), a leader must be available, this shard must not be an
// IsolatedLeader, and either this shard is the leader or the leader id resolves to an address.
1228 boolean isShardReadyWithLeaderId() {
1229 return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1230 (isLeader() || addressResolver.resolve(leaderId) != null);
// True once an actor exists and setActorInitialized() has been called.
1233 boolean isShardInitialized() {
1234 return getActor() != null && actorInitialized;
// True when the cached leader id equals this shard's own id.
1237 boolean isLeader() {
1238 return Objects.equal(leaderId, shardId.toString());
// Leader location as a string. The first branch returns this shard's own serialized actor path
// (NOTE(review): the guarding condition is not visible; presumably isLeader()). Otherwise it returns the address
// resolved for leaderId, which may be null when unresolved (isShardReadyWithLeaderId checks for that case).
1241 String getSerializedLeaderActor() {
1243 return Serialization.serializedActorPath(getActor());
1245 return addressResolver.resolve(leaderId);
// Marks the shard actor as initialized and re-evaluates pending callbacks.
1249 void setActorInitialized() {
1250 LOG.debug("Shard {} is initialized", shardId);
1252 this.actorInitialized = true;
1254 notifyOnShardInitializedCallbacks();
// Runs pending callbacks whose condition is now met; each one fired has its timeout cancelled first.
// - Plain OnShardInitialized callbacks qualify on every evaluation.
// - OnShardReady callbacks qualify only when isShardReadyWithLeaderId() is true.
// NOTE(review): the lines after the empty-set check and inside the loop (presumably an early return, any
// initialization guard, and iter.remove()) are not visible here.
1257 private void notifyOnShardInitializedCallbacks() {
1258 if(onShardInitializedSet.isEmpty()) {
1262 boolean ready = isShardReadyWithLeaderId();
1264 if(LOG.isDebugEnabled()) {
1265 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1266 ready ? "ready" : "initialized", onShardInitializedSet.size());
1269 Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1270 while(iter.hasNext()) {
1271 OnShardInitialized onShardInitialized = iter.next();
1272 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1274 onShardInitialized.getTimeoutSchedule().cancel();
1275 onShardInitialized.getReplyRunnable().run();
1280 void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1281 onShardInitializedSet.add(onShardInitialized);
1284 void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1285 onShardInitializedSet.remove(onShardInitialized);
// Caches the shard's RAFT role and re-evaluates pending callbacks.
1288 void setRole(String newRole) {
1289 this.role = newRole;
1291 notifyOnShardInitializedCallbacks();
1294 void setFollowerSyncStatus(boolean syncStatus){
1295 this.followerSyncStatus = syncStatus;
// Sync check: a Follower reports followerSyncStatus.
// NOTE(review): the enclosing method declaration, and the values returned for Leader and other roles,
// are not visible here.
1299 if(RaftState.Follower.name().equals(this.role)){
1300 return followerSyncStatus;
1301 } else if(RaftState.Leader.name().equals(this.role)){
// Caches the leader id and computes whether it changed (presumably the returned value). A non-null id marks the
// leader as available. Pending callbacks are re-evaluated.
1308 boolean setLeaderId(String leaderId) {
1309 boolean changed = !Objects.equal(this.leaderId, leaderId);
1310 this.leaderId = leaderId;
1311 if(leaderId != null) {
1312 this.leaderAvailable = true;
1314 notifyOnShardInitializedCallbacks();
1319 String getLeaderId() {
1323 void setLeaderAvailable(boolean leaderAvailable) {
1324 this.leaderAvailable = leaderAvailable;
1327 short getLeaderVersion() {
1328 return leaderVersion;
1331 void setLeaderVersion(short leaderVersion) {
1332 this.leaderVersion = leaderVersion;
1335 boolean isActiveMember() {
1336 return isActiveMember;
1339 void setActiveMember(boolean isActiveMember) {
1340 this.isActiveMember = isActiveMember;
// Pending callback registered on a ShardInformation. replyRunnable is run when the shard becomes initialized;
// timeoutSchedule is the scheduled timeout that is cancelled at that point.
// NOTE(review): notifyOnShardInitializedCallbacks calls getTimeoutSchedule().cancel() unconditionally, so
// setTimeoutSchedule must be called before the callback can fire.
1344 private static class OnShardInitialized {
1345 private final Runnable replyRunnable;
1346 private Cancellable timeoutSchedule;
1348 OnShardInitialized(Runnable replyRunnable) {
1349 this.replyRunnable = replyRunnable;
1352 Runnable getReplyRunnable() {
1353 return replyRunnable;
1356 Cancellable getTimeoutSchedule() {
1357 return timeoutSchedule;
1360 void setTimeoutSchedule(Cancellable timeoutSchedule) {
1361 this.timeoutSchedule = timeoutSchedule;
// Marker subtype: notifyOnShardInitializedCallbacks fires these callbacks only once the shard is
// isShardReadyWithLeaderId(), not merely initialized.
1365 private static class OnShardReady extends OnShardInitialized {
1366 OnShardReady(Runnable replyRunnable) {
1367 super(replyRunnable);
// Immutable holder for a pending OnShardInitialized callback's shard, the callback itself and the original
// requester. Judging by the name, it is the message delivered when waiting for shard initialization times out
// (NOTE(review): the scheduling code is not visible here).
1371 private static class ShardNotInitializedTimeout {
1372 private final ActorRef sender;
1373 private final ShardInformation shardInfo;
1374 private final OnShardInitialized onShardInitialized;
1376 ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1377 this.sender = sender;
1378 this.shardInfo = shardInfo;
1379 this.onShardInitialized = onShardInitialized;
1382 ActorRef getSender() {
1386 ShardInformation getShardInfo() {
1390 OnShardInitialized getOnShardInitialized() {
1391 return onShardInitialized;
1396 * We no longer persist SchemaContextModules but keep this class around for now for backwards
1397 * compatibility so we don't get de-serialization failures on upgrade from Helium.
// Legacy Serializable holder of module names. The serialVersionUID must stay unchanged so that instances
// persisted by older releases can still be deserialized.
1400 static class SchemaContextModules implements Serializable {
1401 private static final long serialVersionUID = -8884620101025936590L;
1403 private final Set<String> modules;
1405 SchemaContextModules(Set<String> modules){
1406 this.modules = modules;
1409 public Set<String> getModules() {
// Returns a new, unsealed Builder for creating ShardManager Props.
1414 public static Builder builder() {
1415 return new Builder();
// Fluent builder for ShardManager Props. Each setter stores one collaborator.
// NOTE(review): the setter lines not visible here presumably call checkSealed() and return this.
// verify() requires every collaborator except restoreFromSnapshot to be non-null.
1418 public static class Builder {
1419 private ClusterWrapper cluster;
1420 private Configuration configuration;
1421 private DatastoreContextFactory datastoreContextFactory;
1422 private CountDownLatch waitTillReadyCountdownLatch;
1423 private PrimaryShardInfoFutureCache primaryShardInfoCache;
1424 private DatastoreSnapshot restoreFromSnapshot;
1425 private volatile boolean sealed;
// Throws IllegalStateException once the builder has been sealed.
1427 protected void checkSealed() {
1428 Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1431 public Builder cluster(ClusterWrapper cluster) {
1433 this.cluster = cluster;
1437 public Builder configuration(Configuration configuration) {
1439 this.configuration = configuration;
1443 public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1445 this.datastoreContextFactory = datastoreContextFactory;
1449 public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1451 this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1455 public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1457 this.primaryShardInfoCache = primaryShardInfoCache;
// Optional: verify() does not require a restore snapshot.
1461 public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1463 this.restoreFromSnapshot = restoreFromSnapshot;
// Throws NullPointerException naming the first missing required collaborator.
1467 protected void verify() {
1469 Preconditions.checkNotNull(cluster, "cluster should not be null");
1470 Preconditions.checkNotNull(configuration, "configuration should not be null");
1471 Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1472 Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1473 Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
// Creates ShardManager Props, passing this builder as the constructor argument.
1476 public Props props() {
1478 return Props.create(ShardManager.class, this);
// Asynchronously asks this ShardManager itself for the primary of shardName, sending FindPrimary with a second
// argument of true (presumably waitUntilReady). Exactly one handler method is invoked per attempt:
// - onFailure for an ask failure or timeout;
// - onRemotePrimaryShardFound or onLocalPrimaryFound for the matching reply type;
// - onUnknownResponse for anything else.
// The ask timeout is twice the base DatastoreContext's shard initialization timeout. The callback runs on the
// Client dispatcher, outside this actor's message processing, so handlers should reply via messages rather
// than mutate actor state.
1482 private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1483 Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1484 getShardInitializationTimeout().duration().$times(2));
1487 Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1488 futureObj.onComplete(new OnComplete<Object>() {
1490 public void onComplete(Throwable failure, Object response) {
1491 if (failure != null) {
1492 handler.onFailure(failure);
1494 if(response instanceof RemotePrimaryShardFound) {
1495 handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1496 } else if(response instanceof LocalPrimaryShardFound) {
1497 handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1499 handler.onUnknownResponse(response);
1503 }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1507 * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to
1508 * a remote or local find primary message is processed
// Callback contract used by findPrimary. Exactly one of these methods is called per FindPrimary attempt, on the
// Client dispatcher rather than within the ShardManager's own message handling.
1510 private static interface FindPrimaryResponseHandler {
1512 * Invoked when a Failure message is received as a response
1516 void onFailure(Throwable failure);
1519 * Invoked when a RemotePrimaryShardFound response is received
1523 void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1526 * Invoked when a LocalPrimaryShardFound response is received
1529 void onLocalPrimaryFound(LocalPrimaryShardFound response);
1532 * Invoked when an unknown response is received. This is another type of failure.
1536 void onUnknownResponse(Object response);
1540 * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1541 * replica and sends a wrapped Failure response to some targetActor
// Base FindPrimaryResponseHandler that implements both failure paths (onFailure, onUnknownResponse). Each sends
// a Status.Failure to targetActor with shardManagerActor as the sender. Subclasses only implement the success
// callbacks. All constructor arguments are required (checked with Preconditions.checkNotNull).
1543 private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1544 private final ActorRef targetActor;
1545 private final String shardName;
1546 private final String persistenceId;
1547 private final ActorRef shardManagerActor;
1550 * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1551 * @param shardName The name of the shard for which the primary replica had to be found
1552 * @param persistenceId The persistenceId for the ShardManager
1553 * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1555 protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1556 this.targetActor = Preconditions.checkNotNull(targetActor);
1557 this.shardName = Preconditions.checkNotNull(shardName);
1558 this.persistenceId = Preconditions.checkNotNull(persistenceId);
1559 this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1562 public ActorRef getTargetActor() {
1566 public String getShardName() {
1570 public ActorRef getShardManagerActor() {
1571 return shardManagerActor;
// Wraps the ask failure in a RuntimeException that names the shard.
1575 public void onFailure(Throwable failure) {
1576 LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1577 targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1578 String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
// A Throwable response is forwarded as the failure itself; any other response is described in a RuntimeException.
1582 public void onUnknownResponse(Object response) {
1583 String msg = String.format("Failed to find leader for shard %s: received response: %s",
1584 shardName, response);
1585 LOG.debug ("{}: {}", persistenceId, msg);
1586 targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1587 new RuntimeException(msg)), shardManagerActor);
1593 * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1594 * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1595 * as a successful response to find primary.
1597 private static class PrimaryShardFoundForContext {
1598 private final String shardName;
1599 private final Object contextMessage;
1600 private final RemotePrimaryShardFound remotePrimaryShardFound;
1601 private final LocalPrimaryShardFound localPrimaryShardFound;
// Pairs a context message with the find-primary result. primaryFoundMessage must be either a
// RemotePrimaryShardFound or a LocalPrimaryShardFound; exactly one of the two fields is set accordingly.
// Any other type is rejected with IllegalArgumentException. Without that check, both fields would be null,
// getPrimaryPath() would throw an NPE, and isPrimaryLocal() would wrongly report true.
1603 public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
1604 this.shardName = Preconditions.checkNotNull(shardName);
1605 this.contextMessage = Preconditions.checkNotNull(contextMessage);
1606 Preconditions.checkNotNull(primaryFoundMessage);
1607 this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
1608 this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
Preconditions.checkArgument(remotePrimaryShardFound != null || localPrimaryShardFound != null,
        "Unexpected primary found message type: %s", primaryFoundMessage.getClass());
// Returns the primary's path from whichever find-primary result this context holds; the remote result is
// checked first.
1612 public String getPrimaryPath(){
1613 if(remotePrimaryShardFound != null){
1614 return remotePrimaryShardFound.getPrimaryPath();
1616 return localPrimaryShardFound.getPrimaryPath();
// The original request (the 'Context' message) that is to be processed against the found primary.
1620 public Object getContextMessage() {
1621 return contextMessage;
// Non-null only when the primary was found on a remote member.
1625 public RemotePrimaryShardFound getRemotePrimaryShardFound(){
1626 return remotePrimaryShardFound;
// Non-null only when the primary was found locally.
1630 public LocalPrimaryShardFound getLocalPrimaryShardFound(){
1631 return localPrimaryShardFound;
// True when no remote result is held, i.e. the primary is local.
1634 boolean isPrimaryLocal(){
1635 return (remotePrimaryShardFound == null);
1639 public String getShardName() {