Merge "Bug 2055: Handle shard not initialized resiliently"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18 import akka.japi.Creator;
19 import akka.persistence.RecoveryFailure;
20 import akka.serialization.Serialization;
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Optional;
23 import com.google.common.base.Preconditions;
24 import com.google.common.collect.Lists;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.protobuf.ByteString;
29 import com.google.protobuf.InvalidProtocolBufferException;
30 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
31 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
32 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
34 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
36 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
37 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
40 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
42 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
43 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
46 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
47 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
49 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
50 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
51 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
53 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
54 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
55 import org.opendaylight.controller.cluster.datastore.modification.Modification;
56 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
57 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
58 import org.opendaylight.controller.cluster.raft.RaftActor;
59 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
60 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
62 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
63 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
64 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
65 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
66 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
67 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
68 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
71 import org.opendaylight.yangtools.concepts.ListenerRegistration;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.duration.Duration;
76 import scala.concurrent.duration.FiniteDuration;
77
78 import javax.annotation.Nonnull;
79 import java.util.ArrayList;
80 import java.util.Collection;
81 import java.util.HashMap;
82 import java.util.List;
83 import java.util.Map;
84 import java.util.concurrent.ExecutionException;
85 import java.util.concurrent.TimeUnit;
86
87 /**
88  * A Shard represents a portion of the logical data tree.
89  * <p>
90  * A Shard uses an InMemoryDOMDataStore as its internal representation and delegates all requests to it.
91  * </p>
92  */
93 public class Shard extends RaftActor {
94
95     private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
96
97     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
98
99     public static final String DEFAULT_NAME = "default";
100
101     // The state of this Shard
102     private final InMemoryDOMDataStore store;
103
104     private final LoggingAdapter LOG =
105         Logging.getLogger(getContext().system(), this);
106
107     // Persistence is enabled by default and can be turned off using the system
108     // property shard.persistent
109     private final boolean persistent;
110
111     // The name of this shard
112     private final ShardIdentifier name;
113
114     private final ShardStats shardMBean;
115
116     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
117
118     private final DatastoreContext datastoreContext;
119
120     private SchemaContext schemaContext;
121
122     private ActorRef createSnapshotTransaction;
123
124     private int createSnapshotTransactionCounter;
125
126     private final ShardCommitCoordinator commitCoordinator;
127
128     private final long transactionCommitTimeout;
129
130     private Cancellable txCommitTimeoutCheckSchedule;
131
132     /**
133      * Coordinates persistence recovery on startup.
134      */
135     private ShardRecoveryCoordinator recoveryCoordinator;
136     private List<Object> currentLogRecoveryBatch;
137
138     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
139
140     protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
141             DatastoreContext datastoreContext, SchemaContext schemaContext) {
142         super(name.toString(), mapPeerAddresses(peerAddresses),
143                 Optional.of(datastoreContext.getShardRaftConfig()));
144
145         this.name = name;
146         this.datastoreContext = datastoreContext;
147         this.schemaContext = schemaContext;
148
149         String setting = System.getProperty("shard.persistent");
150
151         this.persistent = !"false".equals(setting);
152
153         LOG.info("Shard created : {} persistent : {}", name, persistent);
154
155         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
156                 datastoreContext.getDataStoreProperties());
157
158         if(schemaContext != null) {
159             store.onGlobalContextUpdated(schemaContext);
160         }
161
162         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
163                 datastoreContext.getDataStoreMXBeanType());
164         shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
165         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
166
167         if (isMetricsCaptureEnabled()) {
168             getContext().become(new MeteringBehavior(this));
169         }
170
171         commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
172                 datastoreContext.getShardTransactionCommitQueueCapacity());
173
174         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
175                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
176     }
177
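    // Converts the ShardIdentifier-keyed peer address map into the String-keyed form
    // expected by the RaftActor base class.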
178     private static Map<String, String> mapPeerAddresses(
179         Map<ShardIdentifier, String> peerAddresses) {
180         Map<String, String> map = new HashMap<>();
181
182         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
183             .entrySet()) {
184             map.put(entry.getKey().toString(), entry.getValue());
185         }
186
187         return map;
188     }
189
190     public static Props props(final ShardIdentifier name,
191         final Map<ShardIdentifier, String> peerAddresses,
192         DatastoreContext datastoreContext, SchemaContext schemaContext) {
193         Preconditions.checkNotNull(name, "name should not be null");
194         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
195         Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
196         Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
197
198         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
199     }
200
201     @Override
202     public void postStop() {
203         super.postStop();
204
205         if(txCommitTimeoutCheckSchedule != null) {
206             txCommitTimeoutCheckSchedule.cancel();
207         }
208     }
209
210     @Override
211     public void onReceiveRecover(Object message) {
212         if(LOG.isDebugEnabled()) {
213             LOG.debug("onReceiveRecover: Received message {} from {}",
214                 message.getClass().toString(),
215                 getSender());
216         }
217
218         if (message instanceof RecoveryFailure){
219             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
220
221             // Even though recovery failed, we still need to finish our recovery, e.g. send the
222             // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
223             onRecoveryComplete();
224         } else {
225             super.onReceiveRecover(message);
226         }
227     }
228
229     @Override
230     public void onReceiveCommand(Object message) {
231         if(LOG.isDebugEnabled()) {
232             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
233         }
234
235         if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
236             handleReadDataReply(message);
237         } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
238             handleCreateTransaction(message);
239         } else if(message instanceof ForwardedReadyTransaction) {
240             handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
241         } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
242             handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
243         } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
244             handleCommitTransaction(CommitTransaction.fromSerializable(message));
245         } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
246             handleAbortTransaction(AbortTransaction.fromSerializable(message));
247         } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
248             closeTransactionChain(CloseTransactionChain.fromSerializable(message));
249         } else if (message instanceof RegisterChangeListener) {
250             registerChangeListener((RegisterChangeListener) message);
251         } else if (message instanceof UpdateSchemaContext) {
252             updateSchemaContext((UpdateSchemaContext) message);
253         } else if (message instanceof PeerAddressResolved) {
254             PeerAddressResolved resolved = (PeerAddressResolved) message;
255             setPeerAddress(resolved.getPeerId().toString(),
256                 resolved.getPeerAddress());
257         } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
258             handleTransactionCommitTimeoutCheck();
259         } else {
260             super.onReceiveCommand(message);
261         }
262     }
263
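    // Periodic check driven by TX_COMMIT_TIMEOUT_CHECK_MESSAGE: if the current in-progress
    // commit has been idle longer than the configured transaction commit timeout, abort it
    // so a stalled front-end cannot block subsequent transactions.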
264     private void handleTransactionCommitTimeoutCheck() {
265         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
266         if(cohortEntry != null) {
267             long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
268             if(elapsed > transactionCommitTimeout) {
269                 LOG.warning("Current transaction {} has timed out after {} ms - aborting",
270                         cohortEntry.getTransactionID(), transactionCommitTimeout);
271
272                 doAbortTransaction(cohortEntry.getTransactionID(), null);
273             }
274         }
275     }
276
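    // Final phase of the three-phase commit: preCommit the current cohort and then either
    // persist the modification (when persistence is enabled) or finish the commit directly.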
277     private void handleCommitTransaction(CommitTransaction commit) {
278         final String transactionID = commit.getTransactionID();
279
280         LOG.debug("Committing transaction {}", transactionID);
281
282         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
283         // this transaction.
284         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
285         if(cohortEntry == null) {
286             // We're not the current Tx - the Tx likely expired because it took too long
287             // between the canCommit and commit messages.
288             IllegalStateException ex = new IllegalStateException(
289                     String.format("Cannot commit transaction %s - it is not the current transaction",
290                             transactionID));
291             LOG.error(ex.getMessage());
292             shardMBean.incrementFailedTransactionsCount();
293             getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
294             return;
295         }
296
297         // We perform the preCommit phase here atomically with the commit phase. This is an
298         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
299         // coordination of preCommit across shards in case of failure but preCommit should not
300         // normally fail since we ensure only one concurrent 3-phase commit.
301
302         try {
303             // We block on the future here so we don't have to worry about possibly accessing our
304             // state on a different thread outside of our dispatcher. Also, the data store
305             // currently uses a same thread executor anyway.
306             cohortEntry.getCohort().preCommit().get();
307
308             if(persistent) {
309                 Shard.this.persistData(getSender(), transactionID,
310                         new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
311             } else {
312                 Shard.this.finishCommit(getSender(), transactionID);
313             }
314         } catch (InterruptedException | ExecutionException e) {
315             LOG.error(e, "An exception occurred while preCommitting transaction {}",
316                     cohortEntry.getTransactionID());
317             shardMBean.incrementFailedTransactionsCount();
318             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
319         }
320
321         cohortEntry.updateLastAccessTime();
322     }
323
324     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
325         // With persistence enabled, this method is called via applyState by the leader strategy
326         // after the commit has been replicated to a majority of the followers.
327
328         CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
329         if(cohortEntry == null) {
330             // The transaction is no longer the current commit. This can happen if the transaction
331             // was aborted earlier, most likely due to a timeout in the front-end. We still need to
332             // finish committing the transaction since it was successfully persisted and replicated.
333             // However, we can't reuse the original cohort because it was already preCommitted and
334             // may conflict with the current commit or may have been aborted, so we commit with a
335             // new transaction.
336             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
337             if(cohortEntry != null) {
338                 commitWithNewTransaction(cohortEntry.getModification());
339                 sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
340             } else {
341                 // This really shouldn't happen - it likely means that persistence or replication
342                 // took so long to complete that the cohort entry expired from the cache.
343                 IllegalStateException ex = new IllegalStateException(
344                         String.format("Could not finish committing transaction %s - no CohortEntry found",
345                                 transactionID));
346                 LOG.error(ex.getMessage());
347                 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
348             }
349
350             return;
351         }
352
353         LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
354
355         try {
356             // We block on the future here so we don't have to worry about possibly accessing our
357             // state on a different thread outside of our dispatcher. Also, the data store
358             // currently uses a same thread executor anyway.
359             cohortEntry.getCohort().commit().get();
360
361             sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
362
363             shardMBean.incrementCommittedTransactionCount();
364             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
365
366         } catch (InterruptedException | ExecutionException e) {
367             sender.tell(new akka.actor.Status.Failure(e), getSelf());
368
369             LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
370             shardMBean.incrementFailedTransactionsCount();
371         }
372
373         commitCoordinator.currentTransactionComplete(transactionID, true);
374     }
375
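    // First phase of the three-phase commit: delegated to the ShardCommitCoordinator, which
    // uses the cohort cached by handleForwardedReadyTransaction.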
376     private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
377         LOG.debug("Processing canCommit for transaction {}", canCommit.getTransactionID());
378         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
379     }
380
381     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
382         LOG.debug("Readying transaction {}", ready.getTransactionID());
383
384         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
385         // commitCoordinator in preparation for the subsequent three phase commit initiated by
386         // the front-end.
387         commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
388                 ready.getModification());
389
390         // Return our actor path as we'll handle the three phase commit.
391         ReadyTransactionReply readyTransactionReply =
392             new ReadyTransactionReply(Serialization.serializedActorPath(self()));
393         getSender().tell(
394             ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
395             getSelf());
396     }
397
398     private void handleAbortTransaction(AbortTransaction abort) {
399         doAbortTransaction(abort.getTransactionID(), getSender());
400     }
401
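    // Aborts the given transaction if it is the current in-progress commit. The sender may be
    // null when the abort is triggered internally, e.g. by the commit timeout check.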
402     private void doAbortTransaction(String transactionID, final ActorRef sender) {
403         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
404         if(cohortEntry != null) {
405             LOG.debug("Aborting transaction {}", transactionID);
406
407             // We don't remove the cached cohort entry here (i.e. we pass false) in case the Tx
408             // was aborted during replication, in which case we may still commit locally if
409             // replication succeeds.
410             commitCoordinator.currentTransactionComplete(transactionID, false);
411
412             final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
413             final ActorRef self = getSelf();
414
415             Futures.addCallback(future, new FutureCallback<Void>() {
416                 @Override
417                 public void onSuccess(Void v) {
418                     shardMBean.incrementAbortTransactionsCount();
419
420                     if(sender != null) {
421                         sender.tell(new AbortTransactionReply().toSerializable(), self);
422                     }
423                 }
424
425                 @Override
426                 public void onFailure(Throwable t) {
427                     LOG.error(t, "An exception happened during abort");
428
429                     if(sender != null) {
430                         sender.tell(new akka.actor.Status.Failure(t), self);
431                     }
432                 }
433             });
434         }
435     }
436
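    // CreateTransaction is serviced only by the leader; a follower forwards it to the leader,
    // and if no leader is known the caller receives a failure and is expected to retry.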
437     private void handleCreateTransaction(Object message) {
438         if (isLeader()) {
439             createTransaction(CreateTransaction.fromSerializable(message));
440         } else if (getLeader() != null) {
441             getLeader().forward(message, getContext());
442         } else {
443             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
444                 "Could not find shard leader so transaction cannot be created. This typically happens" +
445                 " when system is coming up or recovering and a leader is being elected. Try again" +
446                 " later.")), getSelf());
447         }
448     }
449
450     private void handleReadDataReply(Object message) {
451         // This reply must be for an install snapshot. We don't want to open the message up
452         // here and trigger deserialization.
453
454         self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
455                 self());
456
457         createSnapshotTransaction = null;
458
459         // Send a PoisonPill instead of sending close transaction because we do not really need
460         // a response
461         getSender().tell(PoisonPill.getInstance(), self());
462     }
463
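    // Removes the identified transaction chain from the local cache and closes it, if present.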
464     private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
465         DOMStoreTransactionChain chain =
466             transactionChains.remove(closeTransactionChain.getTransactionChainId());
467
468         if(chain != null) {
469             chain.close();
470         }
471     }
472
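    // Creates a ShardTransaction actor of the requested type (read-only, read-write or
    // write-only). If a non-empty transactionChainId is supplied, the backing transaction is
    // allocated from that chain, creating the chain on first use.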
473     private ActorRef createTypedTransactionActor(
474         int transactionType,
475         ShardTransactionIdentifier transactionId,
476         String transactionChainId ) {
477
478         DOMStoreTransactionFactory factory = store;
479
480         if(!transactionChainId.isEmpty()) {
481             factory = transactionChains.get(transactionChainId);
482             if(factory == null){
483                 DOMStoreTransactionChain transactionChain = store.createTransactionChain();
484                 transactionChains.put(transactionChainId, transactionChain);
485                 factory = transactionChain;
486             }
487         }
488
489         if(this.schemaContext == null){
490             throw new NullPointerException("schemaContext should not be null");
491         }
492
493         if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
494
495             shardMBean.incrementReadOnlyTransactionCount();
496
497             return getContext().actorOf(
498                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
499                         schemaContext,datastoreContext, shardMBean,
500                         transactionId.getRemoteTransactionId()), transactionId.toString());
501
502         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
503
504             shardMBean.incrementReadWriteTransactionCount();
505
506             return getContext().actorOf(
507                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
508                         schemaContext, datastoreContext, shardMBean,
509                         transactionId.getRemoteTransactionId()), transactionId.toString());
510
511
512         } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
513
514             shardMBean.incrementWriteOnlyTransactionCount();
515
516             return getContext().actorOf(
517                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
518                         schemaContext, datastoreContext, shardMBean,
519                         transactionId.getRemoteTransactionId()), transactionId.toString());
520         } else {
521             throw new IllegalArgumentException(
522                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
523                     + transactionType);
524         }
525     }
526
527     private void createTransaction(CreateTransaction createTransaction) {
528         createTransaction(createTransaction.getTransactionType(),
529             createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
530     }
531
532     private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
533
534         ShardTransactionIdentifier transactionId =
535             ShardTransactionIdentifier.builder()
536                 .remoteTransactionId(remoteTransactionId)
537                 .build();
538         if(LOG.isDebugEnabled()) {
539             LOG.debug("Creating transaction : {} ", transactionId);
540         }
541         ActorRef transactionActor =
542             createTypedTransactionActor(transactionType, transactionId, transactionChainId);
543
544         getSender()
545             .tell(new CreateTransactionReply(
546                     Serialization.serializedActorPath(transactionActor),
547                     remoteTransactionId).toSerializable(),
548                 getSelf());
549
550         return transactionActor;
551     }
552
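    // Readies the given transaction and synchronously runs preCommit and commit. Blocking is
    // acceptable here because the data store uses a same-thread executor (see the comments in
    // handleCommitTransaction).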
553     private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
554         throws ExecutionException, InterruptedException {
555         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
556         commitCohort.preCommit().get();
557         commitCohort.commit().get();
558     }
559
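    // Applies a replicated or recovered modification in a fresh write-only transaction and
    // commits it directly, bypassing the three-phase commit coordinator.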
560     private void commitWithNewTransaction(Modification modification) {
561         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
562         modification.apply(tx);
563         try {
564             syncCommitTransaction(tx);
565             shardMBean.incrementCommittedTransactionCount();
566             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
567         } catch (InterruptedException | ExecutionException e) {
568             shardMBean.incrementFailedTransactionsCount();
569             LOG.error(e, "Failed to commit");
570         }
571     }
572
573     private void updateSchemaContext(UpdateSchemaContext message) {
574         this.schemaContext = message.getSchemaContext();
575         updateSchemaContext(message.getSchemaContext());
576         store.onGlobalContextUpdated(message.getSchemaContext());
577     }
578
579     @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
580         store.onGlobalContextUpdated(schemaContext);
581     }
582
583     private void registerChangeListener(
584         RegisterChangeListener registerChangeListener) {
585
586         if(LOG.isDebugEnabled()) {
587             LOG.debug("registerDataChangeListener for {}", registerChangeListener
588                 .getPath());
589         }
590
591
592         ActorSelection dataChangeListenerPath = getContext()
593             .system().actorSelection(
594                 registerChangeListener.getDataChangeListenerPath());
595
596
597         // Notify the listener whether notifications should be enabled: notifications are
598         // enabled only while this shard is the leader (see onStateChanged, which re-sends
599         // EnableNotification on leadership changes).
600         dataChangeListenerPath
601             .tell(new EnableNotification(isLeader()), getSelf());
602
603         // Now store a reference to the data change listener so it can be notified
604         // at a later point if notifications should be enabled or disabled
605         dataChangeListeners.add(dataChangeListenerPath);
606
607         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
608             listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
609
610         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
611             registration = store.registerChangeListener(registerChangeListener.getPath(),
612                 listener, registerChangeListener.getScope());
613         ActorRef listenerRegistration =
614             getContext().actorOf(
615                 DataChangeListenerRegistration.props(registration));
616
617         if(LOG.isDebugEnabled()) {
618             LOG.debug(
619                 "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
620                 , listenerRegistration.path().toString());
621         }
622
623         getSender()
624             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
625                 getSelf());
626     }
627
628     private boolean isMetricsCaptureEnabled(){
629         CommonConfig config = new CommonConfig(getContext().system().settings().config());
630         return config.isMetricCaptureEnabled();
631     }
632
633     @Override
634     protected void startLogRecoveryBatch(int maxBatchSize) {
636         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
637
638         if(LOG.isDebugEnabled()) {
639             LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
640         }
641     }
642
643     @Override
644     protected void appendRecoveredLogEntry(Payload data) {
645         if (data instanceof CompositeModificationPayload) {
646             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
647         } else {
648             LOG.error("Unknown state received {} during recovery", data);
649         }
650     }
651
652     @Override
653     protected void applyRecoverySnapshot(ByteString snapshot) {
654         if(recoveryCoordinator == null) {
655             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
656         }
657
658         recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
659
660         if(LOG.isDebugEnabled()) {
661             LOG.debug("{} : submitted recovery snapshot", persistenceId());
662         }
663     }
664
665     @Override
666     protected void applyCurrentLogRecoveryBatch() {
667         if(recoveryCoordinator == null) {
668             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
669         }
670
671         recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
672
673         if(LOG.isDebugEnabled()) {
674             LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
675                     currentLogRecoveryBatch.size());
676         }
677     }
678
679     @Override
680     protected void onRecoveryComplete() {
681         if(recoveryCoordinator != null) {
682             Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
683
684             if(LOG.isDebugEnabled()) {
685                 LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
686             }
687
688             for(DOMStoreWriteTransaction tx: txList) {
689                 try {
690                     syncCommitTransaction(tx);
691                     shardMBean.incrementCommittedTransactionCount();
692                 } catch (InterruptedException | ExecutionException e) {
693                     shardMBean.incrementFailedTransactionsCount();
694                     LOG.error(e, "Failed to commit");
695                 }
696             }
697         }
698
699         recoveryCoordinator = null;
700         currentLogRecoveryBatch = null;
701         updateJournalStats();
702
703         //notify shard manager
704         getContext().parent().tell(new ActorInitialized(), getSelf());
705
706         // Being paranoid here - this method should only be called once but just in case...
707         if(txCommitTimeoutCheckSchedule == null) {
708             // Schedule a message to be periodically sent to check if the current in-progress
709             // transaction should be expired and aborted.
710             FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
711             txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
712                     period, period, getSelf(),
713                     TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
714         }
715     }
716
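    // Raft callback invoked once a log entry has been replicated to a majority. With a
    // clientActor this is the leader completing a front-end commit; without one we are
    // applying replicated state, so the modification is committed in a new transaction.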
717     @Override
718     protected void applyState(ActorRef clientActor, String identifier, Object data) {
719
720         if (data instanceof CompositeModificationPayload) {
721             Object modification = ((CompositeModificationPayload) data).getModification();
722
723             if(modification == null) {
724                 LOG.error(
725                      "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
726                      identifier, clientActor != null ? clientActor.path().toString() : null);
727             } else if(clientActor == null) {
728                 // There's no clientActor to which to send a commit reply so we must be applying
729                 // replicated state from the leader.
730                 commitWithNewTransaction(MutableCompositeModification.fromSerializable(
731                         modification, schemaContext));
732             } else {
733                 // This must be the OK to commit after replication consensus.
734                 finishCommit(clientActor, identifier);
735             }
736         } else {
737             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
738                     data, data.getClass().getClassLoader(),
739                     CompositeModificationPayload.class.getClassLoader());
740         }
741
742         updateJournalStats();
743
744     }
745
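    // Pushes the latest Raft journal state (last log index/term, commit index, last applied)
    // to the shard's JMX stats bean.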
746     private void updateJournalStats() {
747         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
748
749         if (lastLogEntry != null) {
750             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
751             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
752         }
753
754         shardMBean.setCommitIndex(getCommitIndex());
755         shardMBean.setLastApplied(getLastApplied());
756     }
757
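    // Raft snapshot hook: reads the entire data tree via a dedicated read-only transaction
    // actor; the resulting ReadDataReply is converted to a CaptureSnapshotReply in
    // handleReadDataReply.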
758     @Override
759     protected void createSnapshot() {
760         if (createSnapshotTransaction == null) {
761
762             // Create a transaction. We are really going to treat the transaction as a worker
763             // so that this actor does not get blocked building the snapshot.
764             createSnapshotTransaction = createTransaction(
765                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
766                 "createSnapshot" + ++createSnapshotTransactionCounter, "");
767
768             createSnapshotTransaction.tell(
769                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
770
771         }
772     }
773
774     @VisibleForTesting
775     @Override
776     protected void applySnapshot(ByteString snapshot) {
777         // Since this will be done only on recovery or when this actor is a follower,
778         // we can safely commit everything in here. We don't need to worry about event
779         // notifications as they would have already been disabled on the follower.
780
781         LOG.info("Applying snapshot");
782         try {
783             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
784             NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
785             NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
786                 .decode(serializedNode);
787
788             // delete everything first
789             transaction.delete(YangInstanceIdentifier.builder().build());
790
791             // Add everything from the remote node back
792             transaction.write(YangInstanceIdentifier.builder().build(), node);
793             syncCommitTransaction(transaction);
794         } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
795             LOG.error(e, "An exception occurred when applying snapshot");
796         } finally {
797             LOG.info("Done applying snapshot");
798         }
799     }
800
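    // Invoked on Raft role transitions: propagates the leadership state to registered data
    // change listeners, updates the JMX stats and, if leadership was lost, closes any open
    // transaction chains.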
801     @Override protected void onStateChanged() {
802         for (ActorSelection dataChangeListener : dataChangeListeners) {
803             dataChangeListener
804                 .tell(new EnableNotification(isLeader()), getSelf());
805         }
806
807         shardMBean.setRaftState(getRaftState().name());
808         shardMBean.setCurrentTerm(getCurrentTerm());
809
810         // If this actor is no longer the leader, close all the transaction chains.
811         if(!isLeader()){
812             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
813                 if(LOG.isDebugEnabled()) {
814                     LOG.debug(
815                         "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
816                         entry.getKey(), getId());
817                 }
818                 entry.getValue().close();
819             }
820
821             transactionChains.clear();
822         }
823     }
824
825     @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
826         shardMBean.setLeader(newLeader);
827     }
828
829     @Override public String persistenceId() {
830         return this.name.toString();
831     }
832
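    // Akka Creator used by props() so that Shard instances can be (re)created with the
    // configured name, peer addresses, datastore context and schema context.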
833     private static class ShardCreator implements Creator<Shard> {
834
835         private static final long serialVersionUID = 1L;
836
837         final ShardIdentifier name;
838         final Map<ShardIdentifier, String> peerAddresses;
839         final DatastoreContext datastoreContext;
840         final SchemaContext schemaContext;
841
842         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
843                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
844             this.name = name;
845             this.peerAddresses = peerAddresses;
846             this.datastoreContext = datastoreContext;
847             this.schemaContext = schemaContext;
848         }
849
850         @Override
851         public Shard create() throws Exception {
852             return new Shard(name, peerAddresses, datastoreContext, schemaContext);
853         }
854     }
855
856     @VisibleForTesting
857     InMemoryDOMDataStore getDataStore() {
858         return store;
859     }
860
861     @VisibleForTesting
862     ShardStats getShardMBean() {
863         return shardMBean;
864     }
865 }