/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * A Shard represents a portion of the logical data tree.
 * <p>
 * Our Shard uses an InMemoryDataStore as its internal representation and delegates all requests
 * it receives to that underlying data store.
 */
public class Shard extends RaftActor {

    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";

    static final String DEFAULT_NAME = "default";

    // The state of this Shard
    private final InMemoryDOMDataStore store;

    // The name of this shard
    private final String name;

    private final ShardStats shardMBean;

    private DatastoreContext datastoreContext;

    private final ShardCommitCoordinator commitCoordinator;

    private long transactionCommitTimeout;

    private Cancellable txCommitTimeoutCheckSchedule;

    private final Optional<ActorRef> roleChangeNotifier;

    private final MessageTracker appendEntriesReplyTracker;

    private final DOMTransactionFactory domTransactionFactory;

    private final ShardTransactionActorFactory transactionActorFactory;

    private final ShardSnapshotCohort snapshotCohort;

    private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
    private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
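
    /**
     * Creates the in-memory store, JMX stats bean, commit coordinator and the supporting
     * transaction actor factory for this shard. Instantiated via {@link #props}.
     */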
    protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
            final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));

        this.name = name.toString();
        this.datastoreContext = datastoreContext;

        setPersistence(datastoreContext.isPersistent());

        LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());

        store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                datastoreContext.getDataStoreProperties());

        if (schemaContext != null) {
            store.onGlobalContextUpdated(schemaContext);
        }

        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                datastoreContext.getDataStoreMXBeanType());
        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
        shardMBean.setShardActor(getSelf());

        if (isMetricsCaptureEnabled()) {
            getContext().become(new MeteringBehavior(this));
        }

        domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);

        commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
                TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
                datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);

        setTransactionCommitTimeout();

        // create a notifier actor for each cluster member
        roleChangeNotifier = createRoleChangeNotifier(name.toString());

        appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());

        transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
                new Dispatchers(context().system().dispatchers()).getDispatcherPath(
                        Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);

        snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
    }

    private void setTransactionCommitTimeout() {
        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
    }
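
    /**
     * Returns the Props to create a Shard actor. Illustrative use from a parent actor (the
     * variable names here are examples only):
     *
     * <pre>{@code
     * ActorRef shard = getContext().actorOf(
     *         Shard.props(shardId, peerAddresses, datastoreContext, schemaContext),
     *         shardId.toString());
     * }</pre>
     */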
    public static Props props(final ShardIdentifier name,
            final Map<String, String> peerAddresses,
            final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
        Preconditions.checkNotNull(name, "name should not be null");
        Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
        Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");

        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
    }

    private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
        ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
                RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
        return Optional.of(shardRoleChangeNotifier);
    }

    @Override
    public void postStop() {
        LOG.info("Stopping Shard {}", persistenceId());

        super.postStop();

        if(txCommitTimeoutCheckSchedule != null) {
            txCommitTimeoutCheckSchedule.cancel();
        }

        shardMBean.unregisterMBean();
    }
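
    /**
     * Handles messages replayed from the persistence journal during actor recovery.
     */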
    @Override
    public void onReceiveRecover(final Object message) throws Exception {
        if(LOG.isDebugEnabled()) {
            LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
                message.getClass().toString(), getSender());
        }

        if (message instanceof RecoveryFailure){
            LOG.error("{}: Recovery failed because of this cause",
                    persistenceId(), ((RecoveryFailure) message).cause());

            // Even though recovery failed, we still need to finish our recovery, eg send the
            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
            onRecoveryComplete();
        } else {
            super.onReceiveRecover(message);
            if(LOG.isTraceEnabled()) {
                appendEntriesReplyTracker.begin();
            }
        }
    }
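
    /**
     * Main message dispatch for a running shard: transaction lifecycle messages (create, ready,
     * canCommit, commit, abort), listener registrations and context updates. Anything
     * unrecognized is passed on to {@link RaftActor}.
     */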
    @Override
    public void onReceiveCommand(final Object message) throws Exception {

        MessageTracker.Context context = appendEntriesReplyTracker.received(message);

        if(context.error().isPresent()){
            LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
                    context.error());
        }

        try {
            if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                handleCreateTransaction(message);
            } else if (BatchedModifications.class.isInstance(message)) {
                handleBatchedModifications((BatchedModifications)message);
            } else if (message instanceof ForwardedReadyTransaction) {
                commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
                        getSender(), this);
            } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
            } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                handleCommitTransaction(CommitTransaction.fromSerializable(message));
            } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                handleAbortTransaction(AbortTransaction.fromSerializable(message));
            } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
                closeTransactionChain(CloseTransactionChain.fromSerializable(message));
            } else if (message instanceof RegisterChangeListener) {
                changeSupport.onMessage((RegisterChangeListener) message, isLeader());
            } else if (message instanceof RegisterDataTreeChangeListener) {
                treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
            } else if (message instanceof UpdateSchemaContext) {
                updateSchemaContext((UpdateSchemaContext) message);
            } else if (message instanceof PeerAddressResolved) {
                PeerAddressResolved resolved = (PeerAddressResolved) message;
                setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress());
            } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
                handleTransactionCommitTimeoutCheck();
            } else if(message instanceof DatastoreContext) {
                onDatastoreContext((DatastoreContext)message);
            } else if(message instanceof RegisterRoleChangeListener){
                roleChangeNotifier.get().forward(message, context());
            } else if (message instanceof FollowerInitialSyncUpStatus){
                shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
                context().parent().tell(message, self());
            } else {
                super.onReceiveCommand(message);
            }
        } finally {
            context.done();
        }
    }

    @Override
    protected Optional<ActorRef> getRoleChangeNotifier() {
        return roleChangeNotifier;
    }
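
    /**
     * Applies an updated DatastoreContext at runtime: adjusts the commit queue capacity and
     * commit timeout, toggles persistence and updates the raft config parameters.
     */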
    private void onDatastoreContext(DatastoreContext context) {
        datastoreContext = context;

        commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());

        setTransactionCommitTimeout();

        if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
            setPersistence(true);
        } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
            setPersistence(false);
        }

        updateConfigParams(datastoreContext.getShardRaftConfig());
    }
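
    /**
     * Periodic check, scheduled in {@link #onRecoveryComplete}, that aborts the current
     * in-progress transaction if it has not been accessed within the commit timeout.
     */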
    private void handleTransactionCommitTimeoutCheck() {
        CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
        if(cohortEntry != null) {
            long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
            if(elapsed > transactionCommitTimeout) {
                LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
                        persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);

                doAbortTransaction(cohortEntry.getTransactionID(), null);
            }
        }
    }
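
    /**
     * Continues a commit after a successful canCommit: with no followers and no persistence (or
     * no modifications to replicate) the change is applied to the state immediately; otherwise
     * the modification is persisted and replicated, and {@link #applyState} finishes the commit
     * once consensus is reached.
     */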
    void continueCommit(final CohortEntry cohortEntry) throws Exception {
        // If we do not have any followers and we are not using persistence, or if the cohortEntry
        // has no modifications, we can apply the modification to the state immediately.
        if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
            applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification());
        } else {
            Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
                    new ModificationPayload(cohortEntry.getModification()));
        }
    }

    private void handleCommitTransaction(final CommitTransaction commit) {
        if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
            shardMBean.incrementFailedTransactionsCount();
        }
    }

    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID) {
        // With persistence enabled, this method is called via applyState by the leader strategy
        // after the commit has been replicated to a majority of the followers.

        CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry == null) {
            // The transaction is no longer the current commit. This can happen if the transaction
            // was aborted prior, most likely due to timeout in the front-end. We need to finish
            // committing the transaction though since it was successfully persisted and replicated.
            // However we can't use the original cohort b/c it was already preCommitted and may
            // conflict with the current commit or may have been aborted, so we commit with a new
            // transaction.
            cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
            if(cohortEntry != null) {
                commitWithNewTransaction(cohortEntry.getModification());
                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
            } else {
                // This really shouldn't happen - it likely means that persistence or replication
                // took so long to complete that the cohort entry was expired from the cache.
                IllegalStateException ex = new IllegalStateException(
                        String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
                                persistenceId(), transactionID));
                LOG.error(ex.getMessage());
                sender.tell(new akka.actor.Status.Failure(ex), getSelf());
            }

            return;
        }

        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());

        try {
            // We block on the future here so we don't have to worry about possibly accessing our
            // state on a different thread outside of our dispatcher. Also, the data store
            // currently uses a same thread executor anyway.
            cohortEntry.getCohort().commit().get();

            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());

            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
        } catch (Exception e) {
            sender.tell(new akka.actor.Status.Failure(e), getSelf());

            LOG.error("{}: An exception occurred while committing transaction {}", persistenceId(),
                    transactionID, e);
            shardMBean.incrementFailedTransactionsCount();
        } finally {
            commitCoordinator.currentTransactionComplete(transactionID, true);
        }
    }

    private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
        LOG.debug("{}: Processing canCommit for transaction {}", persistenceId(), canCommit.getTransactionID());
        commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
    }

    private void handleBatchedModifications(BatchedModifications batched) {
        // This message is sent to prepare the modifications transaction directly on the Shard as an
        // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
        // BatchedModifications message, the caller sets the ready flag in the message indicating
        // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
        // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
        // ReadyTransaction message.

        // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
        // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
        // the primary/leader shard. However with timing and caching on the front-end, there's a small
        // window where it could have a stale leader during leadership transitions.
        if(isLeader()) {
            try {
                commitCoordinator.handleBatchedModifications(batched, getSender(), this);
            } catch (Exception e) {
                LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                        batched.getTransactionID(), e);
                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
            }
        } else {
            ActorSelection leader = getLeader();
            if(leader != null) {
                // TODO: what if this is not the first batch and leadership changed in between batched messages?
                // We could check if the commitCoordinator already has a cached entry and forward all the previous
                // batched modifications.
                LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                leader.forward(batched, getContext());
            } else {
                // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
                // it more resilient in case we're in the process of electing a new leader.
                getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
                        "Could not find the leader for shard %s. This typically happens" +
                        " when the system is coming up or recovering and a leader is being elected. Try again" +
                        " later.", persistenceId()))), getSelf());
            }
        }
    }

    private void handleAbortTransaction(final AbortTransaction abort) {
        doAbortTransaction(abort.getTransactionID(), getSender());
    }
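
    /**
     * Aborts the current transaction's cohort asynchronously. If sender is non-null it receives
     * an AbortTransactionReply on success or a Failure on error.
     */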
    void doAbortTransaction(final String transactionID, final ActorRef sender) {
        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry != null) {
            LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);

            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
            // aborted during replication, in which case we may still commit locally if replication
            // succeeds.
            commitCoordinator.currentTransactionComplete(transactionID, false);

            final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
            final ActorRef self = getSelf();

            Futures.addCallback(future, new FutureCallback<Void>() {
                @Override
                public void onSuccess(final Void v) {
                    shardMBean.incrementAbortTransactionsCount();

                    if(sender != null) {
                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                    }
                }

                @Override
                public void onFailure(final Throwable t) {
                    LOG.error("{}: An exception happened during abort", persistenceId(), t);

                    if(sender != null) {
                        sender.tell(new akka.actor.Status.Failure(t), self);
                    }
                }
            });
        }
    }

    private void handleCreateTransaction(final Object message) {
        if(isLeader()) {
            createTransaction(CreateTransaction.fromSerializable(message));
        } else if (getLeader() != null) {
            getLeader().forward(message, getContext());
        } else {
            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
                    "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
                    " when the system is coming up or recovering and a leader is being elected. Try again" +
                    " later.", persistenceId()))), getSelf());
        }
    }

    private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
        domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
    }

    private ActorRef createTypedTransactionActor(int transactionType,
            ShardTransactionIdentifier transactionId, String transactionChainId,
            short clientVersion) {
        return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
                transactionId, transactionChainId, clientVersion);
    }

    private void createTransaction(CreateTransaction createTransaction) {
        try {
            ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
                    createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
                    createTransaction.getVersion());

            getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
                    createTransaction.getTransactionId()).toSerializable(), getSelf());
        } catch (Exception e) {
            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
        }
    }

    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
            String transactionChainId, short clientVersion) {

        ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);

        if(LOG.isDebugEnabled()) {
            LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
        }

        return createTypedTransactionActor(transactionType, transactionId, transactionChainId, clientVersion);
    }
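
    /**
     * Applies a replicated modification by committing it synchronously in a fresh write-only
     * transaction, bypassing the three-phase commit.
     */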
    private void commitWithNewTransaction(final Modification modification) {
        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
        modification.apply(tx);
        try {
            snapshotCohort.syncCommitTransaction(tx);
            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
        } catch (InterruptedException | ExecutionException e) {
            shardMBean.incrementFailedTransactionsCount();
            LOG.error("{}: Failed to commit", persistenceId(), e);
        }
    }

    private void updateSchemaContext(final UpdateSchemaContext message) {
        updateSchemaContext(message.getSchemaContext());
    }
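
    /**
     * Updates the data store's schema context; package-private for unit tests.
     */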
    @VisibleForTesting
    void updateSchemaContext(final SchemaContext schemaContext) {
        store.onGlobalContextUpdated(schemaContext);
    }

    private boolean isMetricsCaptureEnabled() {
        CommonConfig config = new CommonConfig(getContext().system().settings().config());
        return config.isMetricCaptureEnabled();
    }

    @Override
    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
        return snapshotCohort;
    }

    @Override
    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
        return new ShardRecoveryCoordinator(store, persistenceId(), LOG);
    }

    @Override
    protected void onRecoveryComplete() {
        // notify the shard manager
        getContext().parent().tell(new ActorInitialized(), getSelf());

        // Being paranoid here - this method should only be called once but just in case...
        if(txCommitTimeoutCheckSchedule == null) {
            // Schedule a message to be periodically sent to check if the current in-progress
            // transaction should be expired and aborted.
            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
                    period, period, getSelf(),
                    TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
        }
    }
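
    /**
     * Invoked by the RaftActor behavior once a log entry has been committed (replicated to a
     * majority). Extracts the modification from the payload and applies it; when clientActor is
     * non-null this shard is the leader and the originating commit is completed.
     */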
    @Override
    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
        if(data instanceof ModificationPayload) {
            try {
                applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
            } catch (ClassNotFoundException | IOException e) {
                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
            }
        } else if (data instanceof CompositeModificationPayload) {
            Object modification = ((CompositeModificationPayload) data).getModification();
            applyModificationToState(clientActor, identifier, modification);
        } else if(data instanceof CompositeModificationByteStringPayload) {
            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
            applyModificationToState(clientActor, identifier, modification);
        } else {
            LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                    persistenceId(), data, data.getClass().getClassLoader(),
                    CompositeModificationPayload.class.getClassLoader());
        }
    }

    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
        if(modification == null) {
            LOG.error(
                "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
                persistenceId(), clientActor != null ? clientActor.path().toString() : null, identifier);
        } else if(clientActor == null) {
            // There's no clientActor to which to send a commit reply so we must be applying
            // replicated state from the leader.
            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
        } else {
            // This must be the OK to commit after replication consensus.
            finishCommit(clientActor, identifier);
        }
    }

    @Override
    protected void onStateChanged() {
        boolean isLeader = isLeader();
        changeSupport.onLeadershipChange(isLeader);
        treeChangeSupport.onLeadershipChange(isLeader);

        // If this actor is no longer the leader close all the transaction chains
        if(!isLeader) {
            if(LOG.isDebugEnabled()) {
                LOG.debug(
                    "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
                    persistenceId(), getId());
            }

            domTransactionFactory.closeAllTransactionChains();
        }
    }

    @Override
    protected void onLeaderChanged(String oldLeader, String newLeader) {
        shardMBean.incrementLeadershipChangeCount();
    }

    @Override
    public String persistenceId() {
        return this.name;
    }

    @VisibleForTesting
    ShardCommitCoordinator getCommitCoordinator() {
        return commitCoordinator;
    }
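
    /**
     * Serializable Creator used by {@link #props} so Akka can (re)instantiate the Shard actor.
     */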
    private static class ShardCreator implements Creator<Shard> {

        private static final long serialVersionUID = 1L;

        final ShardIdentifier name;
        final Map<String, String> peerAddresses;
        final DatastoreContext datastoreContext;
        final SchemaContext schemaContext;

        ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
                final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
            this.name = name;
            this.peerAddresses = peerAddresses;
            this.datastoreContext = datastoreContext;
            this.schemaContext = schemaContext;
        }

        @Override
        public Shard create() throws Exception {
            return new Shard(name, peerAddresses, datastoreContext, schemaContext);
        }
    }

    @VisibleForTesting
    public InMemoryDOMDataStore getDataStore() {
        return store;
    }

    @VisibleForTesting
    ShardStats getShardMBean() {
        return shardMBean;
    }
}