/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * A Shard represents a portion of the logical data tree.
 * <p>
 * Our Shard uses the InMemoryDOMDataStore as its internal representation and delegates all
 * requests it receives to that store.
 * </p>
 */
public class Shard extends RaftActor {

    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();

    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";

    public static final String DEFAULT_NAME = "default";

    // The state of this Shard
    private final InMemoryDOMDataStore store;

    private final LoggingAdapter LOG =
        Logging.getLogger(getContext().system(), this);

    // Persistence is enabled by default and can be turned off using the system
    // property shard.persistent
    private final boolean persistent;

    // The name of this shard
    private final ShardIdentifier name;

    private final ShardStats shardMBean;

    private final List<ActorSelection> dataChangeListeners = Lists.newArrayList();

    private final List<DelayedListenerRegistration> delayedListenerRegistrations =
                                                                       Lists.newArrayList();

    private final DatastoreContext datastoreContext;

    private SchemaContext schemaContext;

    private ActorRef createSnapshotTransaction;

    private int createSnapshotTransactionCounter;

    private final ShardCommitCoordinator commitCoordinator;

    private final long transactionCommitTimeout;

    private Cancellable txCommitTimeoutCheckSchedule;

    /**
     * Coordinates persistence recovery on startup.
     */
    private ShardRecoveryCoordinator recoveryCoordinator;
    private List<Object> currentLogRecoveryBatch;

    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();

    protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
            DatastoreContext datastoreContext, SchemaContext schemaContext) {
        super(name.toString(), mapPeerAddresses(peerAddresses),
                Optional.of(datastoreContext.getShardRaftConfig()));

        this.name = name;
        this.datastoreContext = datastoreContext;
        this.schemaContext = schemaContext;

        String setting = System.getProperty("shard.persistent");

        this.persistent = !"false".equals(setting);

        LOG.info("Shard created : {} persistent : {}", name, persistent);

        store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                datastoreContext.getDataStoreProperties());

        if(schemaContext != null) {
            store.onGlobalContextUpdated(schemaContext);
        }

        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                datastoreContext.getDataStoreMXBeanType());
        shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());

        if (isMetricsCaptureEnabled()) {
            getContext().become(new MeteringBehavior(this));
        }

        commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
                datastoreContext.getShardTransactionCommitQueueCapacity());

        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
    }

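    // The RaftActor superclass expects peer addresses keyed by plain String ids, so the
    // ShardIdentifier keys are flattened before being passed to the super constructor above.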
    private static Map<String, String> mapPeerAddresses(
        Map<ShardIdentifier, String> peerAddresses) {
        Map<String, String> map = new HashMap<>();

        for (Map.Entry<ShardIdentifier, String> entry : peerAddresses.entrySet()) {
            map.put(entry.getKey().toString(), entry.getValue());
        }

        return map;
    }

    public static Props props(final ShardIdentifier name,
        final Map<ShardIdentifier, String> peerAddresses,
        DatastoreContext datastoreContext, SchemaContext schemaContext) {
        Preconditions.checkNotNull(name, "name should not be null");
        Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
        Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");

        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
    }

    @Override
    public void postStop() {
        super.postStop();

        if(txCommitTimeoutCheckSchedule != null) {
            txCommitTimeoutCheckSchedule.cancel();
        }
    }

    @Override
    public void onReceiveRecover(Object message) {
        if(LOG.isDebugEnabled()) {
            LOG.debug("onReceiveRecover: Received message {} from {}",
                message.getClass().toString(),
                getSender());
        }

        if (message instanceof RecoveryFailure) {
            LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");

            // Even though recovery failed, we still need to finish our recovery, e.g. send the
            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
            onRecoveryComplete();
        } else {
            super.onReceiveRecover(message);
        }
    }

    @Override
    public void onReceiveCommand(Object message) {
        if(LOG.isDebugEnabled()) {
            LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
        }

        if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
            handleReadDataReply(message);
        } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
            handleCreateTransaction(message);
        } else if(message instanceof ForwardedReadyTransaction) {
            handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
        } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
            handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
        } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
            handleCommitTransaction(CommitTransaction.fromSerializable(message));
        } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
            handleAbortTransaction(AbortTransaction.fromSerializable(message));
        } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
            closeTransactionChain(CloseTransactionChain.fromSerializable(message));
        } else if (message instanceof RegisterChangeListener) {
            registerChangeListener((RegisterChangeListener) message);
        } else if (message instanceof UpdateSchemaContext) {
            updateSchemaContext((UpdateSchemaContext) message);
        } else if (message instanceof PeerAddressResolved) {
            PeerAddressResolved resolved = (PeerAddressResolved) message;
            setPeerAddress(resolved.getPeerId().toString(),
                resolved.getPeerAddress());
        } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
            handleTransactionCommitTimeoutCheck();
        } else {
            super.onReceiveCommand(message);
        }
    }

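    // Invoked when the periodic TX_COMMIT_TIMEOUT_CHECK_MESSAGE fires: aborts the current
    // in-progress transaction if it has been idle longer than the configured commit timeout.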
    private void handleTransactionCommitTimeoutCheck() {
        CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
        if(cohortEntry != null) {
            long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
            if(elapsed > transactionCommitTimeout) {
                LOG.warning("Current transaction {} has timed out after {} ms - aborting",
                        cohortEntry.getTransactionID(), transactionCommitTimeout);

                doAbortTransaction(cohortEntry.getTransactionID(), null);
            }
        }
    }

    private void handleCommitTransaction(CommitTransaction commit) {
        final String transactionID = commit.getTransactionID();

        LOG.debug("Committing transaction {}", transactionID);

        // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
        // this transaction.
        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry == null) {
            // We're not the current Tx - the Tx was likely expired b/c it took too long in
            // between the canCommit and commit messages.
            IllegalStateException ex = new IllegalStateException(
                    String.format("Cannot commit transaction %s - it is not the current transaction",
                            transactionID));
            LOG.error(ex.getMessage());
            shardMBean.incrementFailedTransactionsCount();
            getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
            return;
        }

        // We perform the preCommit phase here atomically with the commit phase. This is an
        // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
        // coordination of preCommit across shards in case of failure but preCommit should not
        // normally fail since we ensure only one concurrent 3-phase commit.

        try {
            // We block on the future here so we don't have to worry about possibly accessing our
            // state on a different thread outside of our dispatcher. Also, the data store
            // currently uses a same thread executor anyway.
            cohortEntry.getCohort().preCommit().get();

            if(persistent) {
                Shard.this.persistData(getSender(), transactionID,
                        new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
            } else {
                Shard.this.finishCommit(getSender(), transactionID);
            }
        } catch (InterruptedException | ExecutionException e) {
            LOG.error(e, "An exception occurred while preCommitting transaction {}",
                    cohortEntry.getTransactionID());
            shardMBean.incrementFailedTransactionsCount();
            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
        }

        cohortEntry.updateLastAccessTime();
    }

    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
        // With persistence enabled, this method is called via applyState by the leader strategy
        // after the commit has been replicated to a majority of the followers.

        CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry == null) {
            // The transaction is no longer the current commit. This can happen if the transaction
            // was aborted earlier, most likely due to a timeout in the front-end. We still need to
            // finish committing the transaction since it was successfully persisted and replicated.
            // However, we can't use the original cohort because it was already preCommitted and may
            // conflict with the current commit or may have been aborted, so we commit with a new
            // transaction.
            cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
            if(cohortEntry != null) {
                commitWithNewTransaction(cohortEntry.getModification());
                sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
            } else {
                // This really shouldn't happen - it likely means that persistence or replication
                // took so long to complete that the cohort entry was expired from the cache.
                IllegalStateException ex = new IllegalStateException(
                        String.format("Could not finish committing transaction %s - no CohortEntry found",
                                transactionID));
                LOG.error(ex.getMessage());
                sender.tell(new akka.actor.Status.Failure(ex), getSelf());
            }

            return;
        }

        LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());

        try {
            // We block on the future here so we don't have to worry about possibly accessing our
            // state on a different thread outside of our dispatcher. Also, the data store
            // currently uses a same thread executor anyway.
            cohortEntry.getCohort().commit().get();

            sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());

            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());

        } catch (InterruptedException | ExecutionException e) {
            sender.tell(new akka.actor.Status.Failure(e), getSelf());

            LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
            shardMBean.incrementFailedTransactionsCount();
        }

        commitCoordinator.currentTransactionComplete(transactionID, true);
    }

    private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
        LOG.debug("Processing canCommit for transaction {}", canCommit.getTransactionID());
        commitCoordinator.handleCanCommit(canCommit, getSender(), self());
    }

    private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
        LOG.debug("Readying transaction {}", ready.getTransactionID());

        // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
        // commitCoordinator in preparation for the subsequent three phase commit initiated by
        // the front-end.
        commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
                ready.getModification());

        // Return our actor path as we'll handle the three phase commit.
        ReadyTransactionReply readyTransactionReply =
            new ReadyTransactionReply(Serialization.serializedActorPath(self()));
        getSender().tell(
            ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
            getSelf());
    }

    private void handleAbortTransaction(AbortTransaction abort) {
        doAbortTransaction(abort.getTransactionID(), getSender());
    }

    private void doAbortTransaction(String transactionID, final ActorRef sender) {
        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry != null) {
            LOG.debug("Aborting transaction {}", transactionID);

            // We don't remove the cached cohort entry here (i.e. we pass false) in case the Tx was
            // aborted during replication, in which case we may still commit locally if replication
            // succeeds.
            commitCoordinator.currentTransactionComplete(transactionID, false);

            final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
            final ActorRef self = getSelf();

            Futures.addCallback(future, new FutureCallback<Void>() {
                @Override
                public void onSuccess(Void v) {
                    shardMBean.incrementAbortTransactionsCount();

                    if(sender != null) {
                        sender.tell(new AbortTransactionReply().toSerializable(), self);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    LOG.error(t, "An exception happened during abort");

                    if(sender != null) {
                        sender.tell(new akka.actor.Status.Failure(t), self);
                    }
                }
            });
        }
    }

    private void handleCreateTransaction(Object message) {
        if (isLeader()) {
            createTransaction(CreateTransaction.fromSerializable(message));
        } else if (getLeader() != null) {
            getLeader().forward(message, getContext());
        } else {
            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
                "Could not find shard leader so transaction cannot be created. This typically happens" +
                " when the system is coming up or recovering and a leader is being elected. Try again" +
                " later.")), getSelf());
        }
    }

    private void handleReadDataReply(Object message) {
        // This must be the reply to the ReadData request sent by createSnapshot(). Pass the
        // serialized data along as-is so we don't trigger deserialization here.

        self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
                self());

        createSnapshotTransaction = null;

        // Send a PoisonPill instead of sending close transaction because we do not really need
        // a response.
        getSender().tell(PoisonPill.getInstance(), self());
    }

    private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
        DOMStoreTransactionChain chain =
            transactionChains.remove(closeTransactionChain.getTransactionChainId());

        if(chain != null) {
            chain.close();
        }
    }

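    // Creates a ShardTransaction actor for the requested transaction type (read-only, read-write
    // or write-only). If a transaction chain id is supplied, the transaction is created from (and
    // tracked in) the corresponding DOMStoreTransactionChain.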
    private ActorRef createTypedTransactionActor(
        int transactionType,
        ShardTransactionIdentifier transactionId,
        String transactionChainId) {

        DOMStoreTransactionFactory factory = store;

        if(!transactionChainId.isEmpty()) {
            factory = transactionChains.get(transactionChainId);
            if(factory == null) {
                DOMStoreTransactionChain transactionChain = store.createTransactionChain();
                transactionChains.put(transactionChainId, transactionChain);
                factory = transactionChain;
            }
        }

        if(this.schemaContext == null) {
            throw new NullPointerException("schemaContext should not be null");
        }

        if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {

            shardMBean.incrementReadOnlyTransactionCount();

            return getContext().actorOf(
                ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
                        schemaContext, datastoreContext, shardMBean,
                        transactionId.getRemoteTransactionId()), transactionId.toString());

        } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {

            shardMBean.incrementReadWriteTransactionCount();

            return getContext().actorOf(
                ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
                        schemaContext, datastoreContext, shardMBean,
                        transactionId.getRemoteTransactionId()), transactionId.toString());

        } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {

            shardMBean.incrementWriteOnlyTransactionCount();

            return getContext().actorOf(
                ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
                        schemaContext, datastoreContext, shardMBean,
                        transactionId.getRemoteTransactionId()), transactionId.toString());
        } else {
            throw new IllegalArgumentException(
                "Shard=" + name + ":CreateTransaction message has unidentified transaction type="
                    + transactionType);
        }
    }

    private void createTransaction(CreateTransaction createTransaction) {
        createTransaction(createTransaction.getTransactionType(),
            createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
    }

    private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {

        ShardTransactionIdentifier transactionId =
            ShardTransactionIdentifier.builder()
                .remoteTransactionId(remoteTransactionId)
                .build();
        if(LOG.isDebugEnabled()) {
            LOG.debug("Creating transaction : {} ", transactionId);
        }
        ActorRef transactionActor =
            createTypedTransactionActor(transactionType, transactionId, transactionChainId);

        getSender()
            .tell(new CreateTransactionReply(
                    Serialization.serializedActorPath(transactionActor),
                    remoteTransactionId).toSerializable(),
                getSelf());

        return transactionActor;
    }

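    // Readies, preCommits and commits the given transaction synchronously. Blocking here is
    // acceptable because the in-memory store currently uses a same-thread executor.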
    private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
        throws ExecutionException, InterruptedException {
        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
        commitCohort.preCommit().get();
        commitCohort.commit().get();
    }

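    // Applies a replicated modification via a fresh write-only transaction and commits it
    // directly, bypassing the three-phase commit coordinator.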
    private void commitWithNewTransaction(Modification modification) {
        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
        modification.apply(tx);
        try {
            syncCommitTransaction(tx);
            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
        } catch (InterruptedException | ExecutionException e) {
            shardMBean.incrementFailedTransactionsCount();
            LOG.error(e, "Failed to commit");
        }
    }

    private void updateSchemaContext(UpdateSchemaContext message) {
        this.schemaContext = message.getSchemaContext();

        // updateSchemaContext(SchemaContext) pushes the new context into the underlying store.
        updateSchemaContext(message.getSchemaContext());
    }

    @VisibleForTesting
    void updateSchemaContext(SchemaContext schemaContext) {
        store.onGlobalContextUpdated(schemaContext);
    }

    private void registerChangeListener(RegisterChangeListener registerChangeListener) {

        LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());

        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                     NormalizedNode<?, ?>>> registration;
        if(isLeader()) {
            registration = doChangeListenerRegistration(registerChangeListener);
        } else {
            LOG.debug("Shard is not the leader - delaying registration");

            DelayedListenerRegistration delayedReg =
                    new DelayedListenerRegistration(registerChangeListener);
            delayedListenerRegistrations.add(delayedReg);
            registration = delayedReg;
        }

        ActorRef listenerRegistration = getContext().actorOf(
                DataChangeListenerRegistration.props(registration));

        LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                    listenerRegistration.path());

        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
    }

    private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                               NormalizedNode<?, ?>>> doChangeListenerRegistration(
            RegisterChangeListener registerChangeListener) {

        ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
                registerChangeListener.getDataChangeListenerPath());

        // Notify the listener actor that notifications are enabled. This method is only invoked
        // when this shard is (or has just become) the leader.
        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());

        // Store a reference to the data change listener so it can be notified at a later point
        // if notifications should be enabled or disabled.
        dataChangeListeners.add(dataChangeListenerPath);

        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);

        LOG.debug("Registering for path {}", registerChangeListener.getPath());

        return store.registerChangeListener(registerChangeListener.getPath(), listener,
                registerChangeListener.getScope());
    }

    private boolean isMetricsCaptureEnabled() {
        CommonConfig config = new CommonConfig(getContext().system().settings().config());
        return config.isMetricCaptureEnabled();
    }

    @Override
    protected void startLogRecoveryBatch(int maxBatchSize) {
        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);

        if(LOG.isDebugEnabled()) {
            LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
        }
    }

    @Override
    protected void appendRecoveredLogEntry(Payload data) {
        if (data instanceof CompositeModificationPayload) {
            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
        } else {
            LOG.error("Unknown state received {} during recovery", data);
        }
    }

    @Override
    protected void applyRecoverySnapshot(ByteString snapshot) {
        if(recoveryCoordinator == null) {
            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
        }

        recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());

        if(LOG.isDebugEnabled()) {
            LOG.debug("{} : submitted recovery snapshot", persistenceId());
        }
    }

    @Override
    protected void applyCurrentLogRecoveryBatch() {
        if(recoveryCoordinator == null) {
            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
        }

        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());

        if(LOG.isDebugEnabled()) {
            LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
                    currentLogRecoveryBatch.size());
        }
    }

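    // RaftActor callback invoked when journal/snapshot recovery has finished: commits any
    // transactions built up by the recovery coordinator, notifies the ShardManager that this
    // actor is initialized, and starts the periodic transaction commit timeout check.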
    @Override
    protected void onRecoveryComplete() {
        if(recoveryCoordinator != null) {
            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();

            if(LOG.isDebugEnabled()) {
                LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
            }

            for(DOMStoreWriteTransaction tx: txList) {
                try {
                    syncCommitTransaction(tx);
                    shardMBean.incrementCommittedTransactionCount();
                } catch (InterruptedException | ExecutionException e) {
                    shardMBean.incrementFailedTransactionsCount();
                    LOG.error(e, "Failed to commit");
                }
            }
        }

        recoveryCoordinator = null;
        currentLogRecoveryBatch = null;
        updateJournalStats();

        // Notify the shard manager.
        getContext().parent().tell(new ActorInitialized(), getSelf());

        // Being paranoid here - this method should only be called once but just in case...
        if(txCommitTimeoutCheckSchedule == null) {
            // Schedule a message to be periodically sent to check if the current in-progress
            // transaction should be expired and aborted.
            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
                    period, period, getSelf(),
                    TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
        }
    }

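    // RaftActor callback: invoked once a log entry has been committed, i.e. replicated to a
    // majority of followers. clientActor is non-null only on the leader that originally
    // persisted the payload; on followers the replicated modification is applied directly.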
    @Override
    protected void applyState(ActorRef clientActor, String identifier, Object data) {

        if (data instanceof CompositeModificationPayload) {
            Object modification = ((CompositeModificationPayload) data).getModification();

            if(modification == null) {
                LOG.error(
                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
                     identifier, clientActor != null ? clientActor.path().toString() : null);
            } else if(clientActor == null) {
                // There's no clientActor to which to send a commit reply so we must be applying
                // replicated state from the leader.
                commitWithNewTransaction(MutableCompositeModification.fromSerializable(
                        modification, schemaContext));
            } else {
                // This must be the OK to commit after replication consensus.
                finishCommit(clientActor, identifier);
            }
        } else {
            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                    data, data.getClass().getClassLoader(),
                    CompositeModificationPayload.class.getClassLoader());
        }

        updateJournalStats();
    }

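    // Pushes the latest raft journal indices (last log index/term, commit index, last applied)
    // into the shard JMX bean.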
    private void updateJournalStats() {
        ReplicatedLogEntry lastLogEntry = getLastLogEntry();

        if (lastLogEntry != null) {
            shardMBean.setLastLogIndex(lastLogEntry.getIndex());
            shardMBean.setLastLogTerm(lastLogEntry.getTerm());
        }

        shardMBean.setCommitIndex(getCommitIndex());
        shardMBean.setLastApplied(getLastApplied());
    }

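    // RaftActor callback asking for a snapshot of the current state. The root of the data tree is
    // read asynchronously via a dedicated read-only ShardTransaction actor; the resulting
    // ReadDataReply is handled in handleReadDataReply, which forwards the serialized data back to
    // self as a CaptureSnapshotReply.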
    @Override
    protected void createSnapshot() {
        if (createSnapshotTransaction == null) {

            // Create a transaction. We are really going to treat the transaction as a worker
            // so that this actor does not get blocked building the snapshot.
            createSnapshotTransaction = createTransaction(
                TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                "createSnapshot" + ++createSnapshotTransactionCounter, "");

            createSnapshotTransaction.tell(
                new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
        }
    }

    @VisibleForTesting
    @Override
    protected void applySnapshot(ByteString snapshot) {
        // Since this will be done only on recovery or when this actor is a follower, we can
        // safely commit everything in here. We need not worry about event notifications as
        // they would have already been disabled on the follower.

        LOG.info("Applying snapshot");
        try {
            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
            NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
            NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
                .decode(serializedNode);

            // Delete everything first
            transaction.delete(YangInstanceIdentifier.builder().build());

            // Add everything from the remote node back
            transaction.write(YangInstanceIdentifier.builder().build(), node);
            syncCommitTransaction(transaction);
        } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
            LOG.error(e, "An exception occurred when applying snapshot");
        } finally {
            LOG.info("Done applying snapshot");
        }
    }

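    // RaftActor callback invoked on role changes: data change notifications are enabled only on
    // the leader, delayed listener registrations are completed when leadership is gained, and
    // transaction chains are closed when leadership is lost.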
    @Override
    protected void onStateChanged() {
        boolean isLeader = isLeader();
        for (ActorSelection dataChangeListener : dataChangeListeners) {
            dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
        }

        if(isLeader) {
            for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
                if(!reg.isClosed()) {
                    reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
                }
            }

            delayedListenerRegistrations.clear();
        }

        shardMBean.setRaftState(getRaftState().name());
        shardMBean.setCurrentTerm(getCurrentTerm());

        // If this actor is no longer the leader close all the transaction chains
        if(!isLeader) {
            for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()) {
                if(LOG.isDebugEnabled()) {
                    LOG.debug(
                        "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
                        entry.getKey(), getId());
                }
                entry.getValue().close();
            }

            transactionChains.clear();
        }
    }

    @Override
    protected void onLeaderChanged(String oldLeader, String newLeader) {
        shardMBean.setLeader(newLeader);
    }

    @Override
    public String persistenceId() {
        return this.name.toString();
    }

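    // Serializable factory passed to Props.create() so Akka can (re)create the Shard actor with
    // its construction parameters.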
    private static class ShardCreator implements Creator<Shard> {

        private static final long serialVersionUID = 1L;

        final ShardIdentifier name;
        final Map<ShardIdentifier, String> peerAddresses;
        final DatastoreContext datastoreContext;
        final SchemaContext schemaContext;

        ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
                DatastoreContext datastoreContext, SchemaContext schemaContext) {
            this.name = name;
            this.peerAddresses = peerAddresses;
            this.datastoreContext = datastoreContext;
            this.schemaContext = schemaContext;
        }

        @Override
        public Shard create() throws Exception {
            return new Shard(name, peerAddresses, datastoreContext, schemaContext);
        }
    }

    @VisibleForTesting
    InMemoryDOMDataStore getDataStore() {
        return store;
    }

    @VisibleForTesting
    ShardStats getShardMBean() {
        return shardMBean;
    }

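    // Placeholder registration returned when a change listener is registered while this shard is
    // not the leader. The real store registration is created and installed as the delegate once
    // leadership is gained (see onStateChanged); closing it beforehand simply marks it closed.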
    private static class DelayedListenerRegistration implements
        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {

        private volatile boolean closed;

        private final RegisterChangeListener registerChangeListener;

        private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                             NormalizedNode<?, ?>>> delegate;

        DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
            this.registerChangeListener = registerChangeListener;
        }

        void setDelegate(ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                            NormalizedNode<?, ?>>> registration) {
            this.delegate = registration;
        }

        boolean isClosed() {
            return closed;
        }

        RegisterChangeListener getRegisterChangeListener() {
            return registerChangeListener;
        }

        @Override
        public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
            return delegate != null ? delegate.getInstance() : null;
        }

        @Override
        public void close() {
            closed = true;
            if(delegate != null) {
                delegate.close();
            }
        }
    }
}