2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import java.util.Collection;
12 import java.util.HashMap;
13 import java.util.List;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.TimeUnit;
17 import javax.annotation.Nonnull;
18 import org.opendaylight.controller.cluster.DataPersistenceProvider;
19 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
20 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
21 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
22 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
23 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
27 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
28 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
31 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
38 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
44 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.controller.cluster.datastore.modification.Modification;
47 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
49 import org.opendaylight.controller.cluster.raft.RaftActor;
50 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
51 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
52 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
53 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
54 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
55 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
56 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
57 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
58 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
59 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
60 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
61 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
62 import org.opendaylight.yangtools.concepts.ListenerRegistration;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
64 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
65 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
66 import scala.concurrent.duration.Duration;
67 import scala.concurrent.duration.FiniteDuration;
68 import akka.actor.ActorRef;
69 import akka.actor.ActorSelection;
70 import akka.actor.Cancellable;
71 import akka.actor.PoisonPill;
72 import akka.actor.Props;
73 import akka.event.Logging;
74 import akka.event.LoggingAdapter;
75 import akka.japi.Creator;
76 import akka.persistence.RecoveryFailure;
77 import akka.serialization.Serialization;
78 import com.google.common.annotations.VisibleForTesting;
79 import com.google.common.base.Optional;
80 import com.google.common.base.Preconditions;
81 import com.google.common.collect.Lists;
82 import com.google.common.util.concurrent.FutureCallback;
83 import com.google.common.util.concurrent.Futures;
84 import com.google.common.util.concurrent.ListenableFuture;
85 import com.google.protobuf.ByteString;
86 import com.google.protobuf.InvalidProtocolBufferException;
89 * A Shard represents a portion of the logical data tree <br/>
91 * Our Shard uses InMemoryDataStore as its internal representation and delegates all requests to it
94 public class Shard extends RaftActor {
96 private static final int HELIUM_1_TX_VERSION = 1;
98 private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
100 private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
102 public static final String DEFAULT_NAME = "default";
104 // The state of this Shard
105 private final InMemoryDOMDataStore store;
107 private final LoggingAdapter LOG =
108 Logging.getLogger(getContext().system(), this);
110 /// The name of this shard
111 private final ShardIdentifier name;
113 private final ShardStats shardMBean;
115 private final List<ActorSelection> dataChangeListeners = Lists.newArrayList();
117 private final List<DelayedListenerRegistration> delayedListenerRegistrations =
118 Lists.newArrayList();
120 private final DatastoreContext datastoreContext;
122 private final DataPersistenceProvider dataPersistenceProvider;
124 private SchemaContext schemaContext;
126 private ActorRef createSnapshotTransaction;
128 private int createSnapshotTransactionCounter;
130 private final ShardCommitCoordinator commitCoordinator;
132 private final long transactionCommitTimeout;
134 private Cancellable txCommitTimeoutCheckSchedule;
137 * Coordinates persistence recovery on startup.
139 private ShardRecoveryCoordinator recoveryCoordinator;
140 private List<Object> currentLogRecoveryBatch;
142 private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
/**
 * Builds the shard: selects the persistence provider, creates the in-memory store and the
 * statistics bean, optionally switches to the metering behavior, and sets up commit coordination.
 *
 * @param name identifier of this shard; its string form is used as the Raft actor id
 * @param peerAddresses addresses of the peer shards keyed by shard identifier
 * @param datastoreContext configuration for the store, persistence and commit handling
 * @param schemaContext initial schema context; pushed to the store only when non-null
 */
protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
        DatastoreContext datastoreContext, SchemaContext schemaContext) {
    super(name.toString(), mapPeerAddresses(peerAddresses),
            Optional.of(datastoreContext.getShardRaftConfig()));

    this.name = name;
    this.datastoreContext = datastoreContext;
    this.schemaContext = schemaContext;

    if (datastoreContext.isPersistent()) {
        this.dataPersistenceProvider = new PersistentDataProvider();
    } else {
        this.dataPersistenceProvider = new NonPersistentRaftDataProvider();
    }

    LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());

    this.store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
            datastoreContext.getDataStoreProperties());
    if (schemaContext != null) {
        this.store.onGlobalContextUpdated(schemaContext);
    }

    this.shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
            datastoreContext.getDataStoreMXBeanType());
    this.shardMBean.setDataStoreExecutor(this.store.getDomStoreExecutor());
    this.shardMBean.setNotificationManager(this.store.getDataChangeListenerNotificationManager());

    if (isMetricsCaptureEnabled()) {
        getContext().become(new MeteringBehavior(this));
    }

    // First argument is one minute expressed in seconds.
    this.commitCoordinator = new ShardCommitCoordinator(TimeUnit.MINUTES.toSeconds(1),
            datastoreContext.getShardTransactionCommitQueueCapacity());

    // The timeout is configured in seconds but compared against millisecond timestamps.
    this.transactionCommitTimeout = TimeUnit.SECONDS.toMillis(
            datastoreContext.getShardTransactionCommitTimeoutInSeconds());
}
179 private static Map<String, String> mapPeerAddresses(
180 Map<ShardIdentifier, String> peerAddresses) {
181 Map<String, String> map = new HashMap<>();
183 for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
185 map.put(entry.getKey().toString(), entry.getValue());
191 public static Props props(final ShardIdentifier name,
192 final Map<ShardIdentifier, String> peerAddresses,
193 DatastoreContext datastoreContext, SchemaContext schemaContext) {
194 Preconditions.checkNotNull(name, "name should not be null");
195 Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
196 Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
197 Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
199 return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
203 public void postStop() {
206 if(txCommitTimeoutCheckSchedule != null) {
207 txCommitTimeoutCheckSchedule.cancel();
212 public void onReceiveRecover(Object message) throws Exception {
213 if(LOG.isDebugEnabled()) {
214 LOG.debug("onReceiveRecover: Received message {} from {}",
215 message.getClass().toString(),
219 if (message instanceof RecoveryFailure){
220 LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
222 // Even though recovery failed, we still need to finish our recovery, eg send the
223 // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
224 onRecoveryComplete();
226 super.onReceiveRecover(message);
231 public void onReceiveCommand(Object message) throws Exception {
232 if(LOG.isDebugEnabled()) {
233 LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
236 if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
237 handleReadDataReply(message);
238 } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
239 handleCreateTransaction(message);
240 } else if(message instanceof ForwardedReadyTransaction) {
241 handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
242 } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
243 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
244 } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
245 handleCommitTransaction(CommitTransaction.fromSerializable(message));
246 } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
247 handleAbortTransaction(AbortTransaction.fromSerializable(message));
248 } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
249 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
250 } else if (message instanceof RegisterChangeListener) {
251 registerChangeListener((RegisterChangeListener) message);
252 } else if (message instanceof UpdateSchemaContext) {
253 updateSchemaContext((UpdateSchemaContext) message);
254 } else if (message instanceof PeerAddressResolved) {
255 PeerAddressResolved resolved = (PeerAddressResolved) message;
256 setPeerAddress(resolved.getPeerId().toString(),
257 resolved.getPeerAddress());
258 } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
259 handleTransactionCommitTimeoutCheck();
261 super.onReceiveCommand(message);
265 private void handleTransactionCommitTimeoutCheck() {
266 CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
267 if(cohortEntry != null) {
268 long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
269 if(elapsed > transactionCommitTimeout) {
270 LOG.warning("Current transaction {} has timed out after {} ms - aborting",
271 cohortEntry.getTransactionID(), transactionCommitTimeout);
273 doAbortTransaction(cohortEntry.getTransactionID(), null);
278 private void handleCommitTransaction(CommitTransaction commit) {
279 final String transactionID = commit.getTransactionID();
281 LOG.debug("Committing transaction {}", transactionID);
283 // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
285 final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
286 if(cohortEntry == null) {
287 // We're not the current Tx - the Tx was likely expired b/c it took too long in
288 // between the canCommit and commit messages.
289 IllegalStateException ex = new IllegalStateException(
290 String.format("Cannot commit transaction %s - it is not the current transaction",
292 LOG.error(ex.getMessage());
293 shardMBean.incrementFailedTransactionsCount();
294 getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
298 // We perform the preCommit phase here atomically with the commit phase. This is an
299 // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
300 // coordination of preCommit across shards in case of failure but preCommit should not
301 // normally fail since we ensure only one concurrent 3-phase commit.
304 // We block on the future here so we don't have to worry about possibly accessing our
305 // state on a different thread outside of our dispatcher. Also, the data store
306 // currently uses a same thread executor anyway.
307 cohortEntry.getCohort().preCommit().get();
309 Shard.this.persistData(getSender(), transactionID,
310 new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
311 } catch (InterruptedException | ExecutionException e) {
312 LOG.error(e, "An exception occurred while preCommitting transaction {}",
313 cohortEntry.getTransactionID());
314 shardMBean.incrementFailedTransactionsCount();
315 getSender().tell(new akka.actor.Status.Failure(e), getSelf());
318 cohortEntry.updateLastAccessTime();
321 private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
322 // With persistence enabled, this method is called via applyState by the leader strategy
323 // after the commit has been replicated to a majority of the followers.
325 CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
326 if(cohortEntry == null) {
327 // The transaction is no longer the current commit. This can happen if the transaction
328 // was aborted prior, most likely due to timeout in the front-end. We need to finish
329 // committing the transaction though since it was successfully persisted and replicated
330 // however we can't use the original cohort b/c it was already preCommitted and may
331 // conflict with the current commit or may have been aborted so we commit with a new
333 cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
334 if(cohortEntry != null) {
335 commitWithNewTransaction(cohortEntry.getModification());
336 sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
338 // This really shouldn't happen - it likely means that persistence or replication
339 // took so long to complete such that the cohort entry was expired from the cache.
340 IllegalStateException ex = new IllegalStateException(
341 String.format("Could not finish committing transaction %s - no CohortEntry found",
343 LOG.error(ex.getMessage());
344 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
350 LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
353 // We block on the future here so we don't have to worry about possibly accessing our
354 // state on a different thread outside of our dispatcher. Also, the data store
355 // currently uses a same thread executor anyway.
356 cohortEntry.getCohort().commit().get();
358 sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
360 shardMBean.incrementCommittedTransactionCount();
361 shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
363 } catch (InterruptedException | ExecutionException e) {
364 sender.tell(new akka.actor.Status.Failure(e), getSelf());
366 LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
367 shardMBean.incrementFailedTransactionsCount();
370 commitCoordinator.currentTransactionComplete(transactionID, true);
373 private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
374 LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
375 commitCoordinator.handleCanCommit(canCommit, getSender(), self());
378 private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
379 LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
380 ready.getTxnClientVersion());
382 // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
383 // commitCoordinator in preparation for the subsequent three phase commit initiated by
385 commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
386 ready.getModification());
388 // Return our actor path as we'll handle the three phase commit, except if the Tx client
389 // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
390 // node. In that case, the subsequent 3-phase commit messages won't contain the
391 // transactionId so to maintain backwards compatibility, we create a separate cohort actor
392 // to provide the compatible behavior.
393 ActorRef replyActorPath = self();
394 if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) {
395 LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
396 replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
397 ready.getTransactionID()));
400 ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
401 Serialization.serializedActorPath(replyActorPath));
402 getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
403 readyTransactionReply, getSelf());
406 private void handleAbortTransaction(AbortTransaction abort) {
407 doAbortTransaction(abort.getTransactionID(), getSender());
410 private void doAbortTransaction(String transactionID, final ActorRef sender) {
411 final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
412 if(cohortEntry != null) {
413 LOG.debug("Aborting transaction {}", transactionID);
415 // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
416 // aborted during replication in which case we may still commit locally if replication
418 commitCoordinator.currentTransactionComplete(transactionID, false);
420 final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
421 final ActorRef self = getSelf();
423 Futures.addCallback(future, new FutureCallback<Void>() {
425 public void onSuccess(Void v) {
426 shardMBean.incrementAbortTransactionsCount();
429 sender.tell(new AbortTransactionReply().toSerializable(), self);
434 public void onFailure(Throwable t) {
435 LOG.error(t, "An exception happened during abort");
438 sender.tell(new akka.actor.Status.Failure(t), self);
445 private void handleCreateTransaction(Object message) {
447 createTransaction(CreateTransaction.fromSerializable(message));
448 } else if (getLeader() != null) {
449 getLeader().forward(message, getContext());
451 getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
452 "Could not find shard leader so transaction cannot be created. This typically happens" +
453 " when the system is coming up or recovering and a leader is being elected. Try again" +
454 " later.")), getSelf());
458 private void handleReadDataReply(Object message) {
459 // This must be for install snapshot. Don't want to open this up and trigger
462 self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
465 createSnapshotTransaction = null;
467 // Send a PoisonPill instead of sending close transaction because we do not really need
469 getSender().tell(PoisonPill.getInstance(), self());
472 private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
473 DOMStoreTransactionChain chain =
474 transactionChains.remove(closeTransactionChain.getTransactionChainId());
481 private ActorRef createTypedTransactionActor(int transactionType,
482 ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
484 DOMStoreTransactionFactory factory = store;
486 if(!transactionChainId.isEmpty()) {
487 factory = transactionChains.get(transactionChainId);
489 DOMStoreTransactionChain transactionChain = store.createTransactionChain();
490 transactionChains.put(transactionChainId, transactionChain);
491 factory = transactionChain;
495 if(this.schemaContext == null){
496 throw new NullPointerException("schemaContext should not be null");
499 if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
501 shardMBean.incrementReadOnlyTransactionCount();
503 return getContext().actorOf(
504 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
505 schemaContext,datastoreContext, shardMBean,
506 transactionId.getRemoteTransactionId(), clientVersion),
507 transactionId.toString());
509 } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
511 shardMBean.incrementReadWriteTransactionCount();
513 return getContext().actorOf(
514 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
515 schemaContext, datastoreContext, shardMBean,
516 transactionId.getRemoteTransactionId(), clientVersion),
517 transactionId.toString());
520 } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
522 shardMBean.incrementWriteOnlyTransactionCount();
524 return getContext().actorOf(
525 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
526 schemaContext, datastoreContext, shardMBean,
527 transactionId.getRemoteTransactionId(), clientVersion),
528 transactionId.toString());
530 throw new IllegalArgumentException(
531 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
536 private void createTransaction(CreateTransaction createTransaction) {
537 createTransaction(createTransaction.getTransactionType(),
538 createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
539 createTransaction.getClientVersion());
542 private ActorRef createTransaction(int transactionType, String remoteTransactionId,
543 String transactionChainId, int clientVersion) {
545 ShardTransactionIdentifier transactionId =
546 ShardTransactionIdentifier.builder()
547 .remoteTransactionId(remoteTransactionId)
549 if(LOG.isDebugEnabled()) {
550 LOG.debug("Creating transaction : {} ", transactionId);
552 ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
553 transactionChainId, clientVersion);
556 .tell(new CreateTransactionReply(
557 Serialization.serializedActorPath(transactionActor),
558 remoteTransactionId).toSerializable(),
561 return transactionActor;
564 private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
565 throws ExecutionException, InterruptedException {
566 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
567 commitCohort.preCommit().get();
568 commitCohort.commit().get();
571 private void commitWithNewTransaction(Modification modification) {
572 DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
573 modification.apply(tx);
575 syncCommitTransaction(tx);
576 shardMBean.incrementCommittedTransactionCount();
577 shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
578 } catch (InterruptedException | ExecutionException e) {
579 shardMBean.incrementFailedTransactionsCount();
580 LOG.error(e, "Failed to commit");
584 private void updateSchemaContext(UpdateSchemaContext message) {
585 this.schemaContext = message.getSchemaContext();
586 updateSchemaContext(message.getSchemaContext());
587 store.onGlobalContextUpdated(message.getSchemaContext());
591 void updateSchemaContext(SchemaContext schemaContext) {
592 store.onGlobalContextUpdated(schemaContext);
595 private void registerChangeListener(RegisterChangeListener registerChangeListener) {
597 LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
599 ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
600 NormalizedNode<?, ?>>> registration;
602 registration = doChangeListenerRegistration(registerChangeListener);
604 LOG.debug("Shard is not the leader - delaying registration");
606 DelayedListenerRegistration delayedReg =
607 new DelayedListenerRegistration(registerChangeListener);
608 delayedListenerRegistrations.add(delayedReg);
609 registration = delayedReg;
612 ActorRef listenerRegistration = getContext().actorOf(
613 DataChangeListenerRegistration.props(registration));
615 LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
616 listenerRegistration.path());
618 getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
621 private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
622 NormalizedNode<?, ?>>> doChangeListenerRegistration(
623 RegisterChangeListener registerChangeListener) {
625 ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
626 registerChangeListener.getDataChangeListenerPath());
628 // Notify the listener if notifications should be enabled or not
629 // If this shard is the leader then it will enable notifications else
631 dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
633 // Now store a reference to the data change listener so it can be notified
634 // at a later point if notifications should be enabled or disabled
635 dataChangeListeners.add(dataChangeListenerPath);
637 AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
638 new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
640 LOG.debug("Registering for path {}", registerChangeListener.getPath());
642 return store.registerChangeListener(registerChangeListener.getPath(), listener,
643 registerChangeListener.getScope());
646 private boolean isMetricsCaptureEnabled(){
647 CommonConfig config = new CommonConfig(getContext().system().settings().config());
648 return config.isMetricCaptureEnabled();
653 void startLogRecoveryBatch(int maxBatchSize) {
654 currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
656 if(LOG.isDebugEnabled()) {
657 LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
662 protected void appendRecoveredLogEntry(Payload data) {
663 if (data instanceof CompositeModificationPayload) {
664 currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
666 LOG.error("Unknown state received {} during recovery", data);
671 protected void applyRecoverySnapshot(ByteString snapshot) {
672 if(recoveryCoordinator == null) {
673 recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
676 recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
678 if(LOG.isDebugEnabled()) {
679 LOG.debug("{} : submitted recovery sbapshot", persistenceId());
684 protected void applyCurrentLogRecoveryBatch() {
685 if(recoveryCoordinator == null) {
686 recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
689 recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
691 if(LOG.isDebugEnabled()) {
692 LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
693 currentLogRecoveryBatch.size());
698 protected void onRecoveryComplete() {
699 if(recoveryCoordinator != null) {
700 Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
702 if(LOG.isDebugEnabled()) {
703 LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
706 for(DOMStoreWriteTransaction tx: txList) {
708 syncCommitTransaction(tx);
709 shardMBean.incrementCommittedTransactionCount();
710 } catch (InterruptedException | ExecutionException e) {
711 shardMBean.incrementFailedTransactionsCount();
712 LOG.error(e, "Failed to commit");
717 recoveryCoordinator = null;
718 currentLogRecoveryBatch = null;
719 updateJournalStats();
721 //notify shard manager
722 getContext().parent().tell(new ActorInitialized(), getSelf());
724 // Being paranoid here - this method should only be called once but just in case...
725 if(txCommitTimeoutCheckSchedule == null) {
726 // Schedule a message to be periodically sent to check if the current in-progress
727 // transaction should be expired and aborted.
728 FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
729 txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
730 period, period, getSelf(),
731 TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
736 protected void applyState(ActorRef clientActor, String identifier, Object data) {
738 if (data instanceof CompositeModificationPayload) {
739 Object modification = ((CompositeModificationPayload) data).getModification();
741 if(modification == null) {
743 "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
744 identifier, clientActor != null ? clientActor.path().toString() : null);
745 } else if(clientActor == null) {
746 // There's no clientActor to which to send a commit reply so we must be applying
747 // replicated state from the leader.
748 commitWithNewTransaction(MutableCompositeModification.fromSerializable(
749 modification, schemaContext));
751 // This must be the OK to commit after replication consensus.
752 finishCommit(clientActor, identifier);
755 LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
756 data, data.getClass().getClassLoader(),
757 CompositeModificationPayload.class.getClassLoader());
760 updateJournalStats();
764 private void updateJournalStats() {
765 ReplicatedLogEntry lastLogEntry = getLastLogEntry();
767 if (lastLogEntry != null) {
768 shardMBean.setLastLogIndex(lastLogEntry.getIndex());
769 shardMBean.setLastLogTerm(lastLogEntry.getTerm());
772 shardMBean.setCommitIndex(getCommitIndex());
773 shardMBean.setLastApplied(getLastApplied());
777 protected void createSnapshot() {
778 if (createSnapshotTransaction == null) {
780 // Create a transaction. We are really going to treat the transaction as a worker
781 // so that this actor does not get block building the snapshot
782 createSnapshotTransaction = createTransaction(
783 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
784 "createSnapshot" + ++createSnapshotTransactionCounter, "",
785 CreateTransaction.CURRENT_CLIENT_VERSION);
787 createSnapshotTransaction.tell(
788 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
795 protected void applySnapshot(ByteString snapshot) {
796 // Since this will be done only on Recovery or when this actor is a Follower
797 // we can safely commit everything in here. We not need to worry about event notifications
798 // as they would have already been disabled on the follower
800 LOG.info("Applying snapshot");
802 DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
803 NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
804 NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
805 .decode(serializedNode);
807 // delete everything first
808 transaction.delete(YangInstanceIdentifier.builder().build());
810 // Add everything from the remote node back
811 transaction.write(YangInstanceIdentifier.builder().build(), node);
812 syncCommitTransaction(transaction);
813 } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
814 LOG.error(e, "An exception occurred when applying snapshot");
816 LOG.info("Done applying snapshot");
821 protected void onStateChanged() {
822 boolean isLeader = isLeader();
823 for (ActorSelection dataChangeListener : dataChangeListeners) {
824 dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
828 for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
829 if(!reg.isClosed()) {
830 reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
834 delayedListenerRegistrations.clear();
837 shardMBean.setRaftState(getRaftState().name());
838 shardMBean.setCurrentTerm(getCurrentTerm());
840 // If this actor is no longer the leader close all the transaction chains
842 for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
843 if(LOG.isDebugEnabled()) {
845 "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
846 entry.getKey(), getId());
848 entry.getValue().close();
851 transactionChains.clear();
856 protected DataPersistenceProvider persistence() {
857 return dataPersistenceProvider;
860 @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
861 shardMBean.setLeader(newLeader);
864 @Override public String persistenceId() {
865 return this.name.toString();
869 DataPersistenceProvider getDataPersistenceProvider() {
870 return dataPersistenceProvider;
873 private static class ShardCreator implements Creator<Shard> {
875 private static final long serialVersionUID = 1L;
877 final ShardIdentifier name;
878 final Map<ShardIdentifier, String> peerAddresses;
879 final DatastoreContext datastoreContext;
880 final SchemaContext schemaContext;
882 ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
883 DatastoreContext datastoreContext, SchemaContext schemaContext) {
885 this.peerAddresses = peerAddresses;
886 this.datastoreContext = datastoreContext;
887 this.schemaContext = schemaContext;
891 public Shard create() throws Exception {
892 return new Shard(name, peerAddresses, datastoreContext, schemaContext);
897 InMemoryDOMDataStore getDataStore() {
902 ShardStats getShardMBean() {
906 private static class DelayedListenerRegistration implements
907 ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
909 private volatile boolean closed;
911 private final RegisterChangeListener registerChangeListener;
913 private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
914 NormalizedNode<?, ?>>> delegate;
/**
 * Creates a registration placeholder that keeps the original request.
 *
 * @param registerChangeListener the request to keep
 */
DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
    this.registerChangeListener = registerChangeListener;
}
920 void setDelegate( ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
921 NormalizedNode<?, ?>>> registration) {
922 this.delegate = registration;
929 RegisterChangeListener getRegisterChangeListener() {
930 return registerChangeListener;
934 public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
935 return delegate != null ? delegate.getInstance() : null;
939 public void close() {
941 if(delegate != null) {