Bug 2265: Use new NormalizedNode streaming in messages
controller.git: opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * A Shard represents a portion of the logical data tree.
 * <p>
 * Our Shard uses InMemoryDataStore as its internal representation and delegates all requests it
 * receives to the InMemoryDataStore.
 * </p>
 */
public class Shard extends RaftActor {

    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";

    public static final String DEFAULT_NAME = "default";

    // The state of this Shard
    private final InMemoryDOMDataStore store;

    private final LoggingAdapter LOG =
        Logging.getLogger(getContext().system(), this);

    // The name of this shard
    private final ShardIdentifier name;

    private final ShardStats shardMBean;

    private final List<ActorSelection> dataChangeListeners = Lists.newArrayList();

    private final List<DelayedListenerRegistration> delayedListenerRegistrations =
                                                                       Lists.newArrayList();

    private final DatastoreContext datastoreContext;

    private final DataPersistenceProvider dataPersistenceProvider;

    private SchemaContext schemaContext;

    private ActorRef createSnapshotTransaction;

    private int createSnapshotTransactionCounter;

    private final ShardCommitCoordinator commitCoordinator;

    private final long transactionCommitTimeout;

    private Cancellable txCommitTimeoutCheckSchedule;

    private final Optional<ActorRef> roleChangeNotifier;

    /**
     * Coordinates persistence recovery on startup.
     */
    private ShardRecoveryCoordinator recoveryCoordinator;
    private List<Object> currentLogRecoveryBatch;

    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();

    protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
            final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
        super(name.toString(), mapPeerAddresses(peerAddresses),
                Optional.of(datastoreContext.getShardRaftConfig()));

        this.name = name;
        this.datastoreContext = datastoreContext;
        this.schemaContext = schemaContext;
        this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();

        LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());

        store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                datastoreContext.getDataStoreProperties());

        if(schemaContext != null) {
            store.onGlobalContextUpdated(schemaContext);
        }

        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                datastoreContext.getDataStoreMXBeanType());
        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());

        if (isMetricsCaptureEnabled()) {
            getContext().become(new MeteringBehavior(this));
        }

        commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
                datastoreContext.getShardTransactionCommitQueueCapacity());

        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);

        // create a notifier actor for each cluster member
        roleChangeNotifier = createRoleChangeNotifier(name.toString());
    }

    private static Map<String, String> mapPeerAddresses(
        final Map<ShardIdentifier, String> peerAddresses) {
        Map<String, String> map = new HashMap<>();

        for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
            .entrySet()) {
            map.put(entry.getKey().toString(), entry.getValue());
        }

        return map;
    }

    public static Props props(final ShardIdentifier name,
        final Map<ShardIdentifier, String> peerAddresses,
        final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
        Preconditions.checkNotNull(name, "name should not be null");
        Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
        Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");

        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
    }

    private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
        ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
            RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
        return Optional.<ActorRef>of(shardRoleChangeNotifier);
    }

    @Override
    public void postStop() {
        super.postStop();

        if(txCommitTimeoutCheckSchedule != null) {
            txCommitTimeoutCheckSchedule.cancel();
        }
    }

    @Override
    public void onReceiveRecover(final Object message) throws Exception {
        if(LOG.isDebugEnabled()) {
            LOG.debug("onReceiveRecover: Received message {} from {}",
                message.getClass().toString(),
                getSender());
        }

        if (message instanceof RecoveryFailure){
            LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");

            // Even though recovery failed, we still need to finish our recovery, eg send the
            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
            onRecoveryComplete();
        } else {
            super.onReceiveRecover(message);
        }
    }

    @Override
    public void onReceiveCommand(final Object message) throws Exception {
        if(LOG.isDebugEnabled()) {
            LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
        }

        if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
            handleReadDataReply(message);
        } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
            handleCreateTransaction(message);
        } else if(message instanceof ForwardedReadyTransaction) {
            handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
        } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
            handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
        } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
            handleCommitTransaction(CommitTransaction.fromSerializable(message));
        } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
            handleAbortTransaction(AbortTransaction.fromSerializable(message));
        } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
            closeTransactionChain(CloseTransactionChain.fromSerializable(message));
        } else if (message instanceof RegisterChangeListener) {
            registerChangeListener((RegisterChangeListener) message);
        } else if (message instanceof UpdateSchemaContext) {
            updateSchemaContext((UpdateSchemaContext) message);
        } else if (message instanceof PeerAddressResolved) {
            PeerAddressResolved resolved = (PeerAddressResolved) message;
            setPeerAddress(resolved.getPeerId().toString(),
                resolved.getPeerAddress());
        } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
            handleTransactionCommitTimeoutCheck();
        } else {
            super.onReceiveCommand(message);
        }
    }

    @Override
    protected Optional<ActorRef> getRoleChangeNotifier() {
        return roleChangeNotifier;
    }

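    /**
     * Invoked when the periodic TX_COMMIT_TIMEOUT_CHECK_MESSAGE fires (scheduled in
     * onRecoveryComplete at one third of the commit timeout). If the current in-progress
     * transaction has been idle longer than transactionCommitTimeout, it is aborted so a
     * stalled front-end cannot block the single-transaction commit pipeline indefinitely.
     */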
    private void handleTransactionCommitTimeoutCheck() {
        CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
        if(cohortEntry != null) {
            long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
            if(elapsed > transactionCommitTimeout) {
                LOG.warning("Current transaction {} has timed out after {} ms - aborting",
                        cohortEntry.getTransactionID(), transactionCommitTimeout);

                doAbortTransaction(cohortEntry.getTransactionID(), null);
            }
        }
    }

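    /**
     * Handles the commit phase of the three phase commit. preCommit and commit are performed
     * together here (see the comment below). If there are followers or persistence is enabled,
     * the serialized modification is persisted and replicated via persistData and the commit
     * completes later in finishCommit, invoked through applyState; otherwise the modification
     * is applied to the state immediately.
     */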
    private void handleCommitTransaction(final CommitTransaction commit) {
        final String transactionID = commit.getTransactionID();

        LOG.debug("Committing transaction {}", transactionID);

        // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
        // this transaction.
        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry == null) {
            // We're not the current Tx - the Tx was likely expired b/c it took too long in
            // between the canCommit and commit messages.
            IllegalStateException ex = new IllegalStateException(
                    String.format("Cannot commit transaction %s - it is not the current transaction",
                            transactionID));
            LOG.error(ex.getMessage());
            shardMBean.incrementFailedTransactionsCount();
            getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
            return;
        }

        // We perform the preCommit phase here atomically with the commit phase. This is an
        // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
        // coordination of preCommit across shards in case of failure but preCommit should not
        // normally fail since we ensure only one concurrent 3-phase commit.

        try {
            // We block on the future here so we don't have to worry about possibly accessing our
            // state on a different thread outside of our dispatcher. Also, the data store
            // currently uses a same thread executor anyway.
            cohortEntry.getCohort().preCommit().get();

            // If we do not have any followers and we are not using persistence we can
            // apply modification to the state immediately
            if(!hasFollowers() && !persistence().isRecoveryApplicable()){
                applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
            } else {
                Shard.this.persistData(getSender(), transactionID,
                        new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
            }
        } catch (InterruptedException | ExecutionException e) {
            LOG.error(e, "An exception occurred while preCommitting transaction {}",
                    cohortEntry.getTransactionID());
            shardMBean.incrementFailedTransactionsCount();
            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
        }

        cohortEntry.updateLastAccessTime();
    }

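    /**
     * Completes a commit whose payload has been persisted and replicated. If the cohort entry is
     * no longer current (e.g. it was aborted due to a front-end timeout), the modification is
     * still applied via a new transaction since it already reached consensus.
     */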
    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
        // With persistence enabled, this method is called via applyState by the leader strategy
        // after the commit has been replicated to a majority of the followers.

        CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry == null) {
            // The transaction is no longer the current commit. This can happen if the transaction
            // was aborted prior, most likely due to timeout in the front-end. We need to finish
            // committing the transaction though since it was successfully persisted and replicated.
            // However we can't use the original cohort b/c it was already preCommitted and may
            // conflict with the current commit or may have been aborted, so we commit with a new
            // transaction.
            cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
            if(cohortEntry != null) {
                commitWithNewTransaction(cohortEntry.getModification());
                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
            } else {
                // This really shouldn't happen - it likely means that persistence or replication
                // took so long to complete that the cohort entry was expired from the cache.
                IllegalStateException ex = new IllegalStateException(
                        String.format("Could not finish committing transaction %s - no CohortEntry found",
                                transactionID));
                LOG.error(ex.getMessage());
                sender.tell(new akka.actor.Status.Failure(ex), getSelf());
            }

            return;
        }

        LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());

        try {
            // We block on the future here so we don't have to worry about possibly accessing our
            // state on a different thread outside of our dispatcher. Also, the data store
            // currently uses a same thread executor anyway.
            cohortEntry.getCohort().commit().get();

            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());

            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());

        } catch (InterruptedException | ExecutionException e) {
            sender.tell(new akka.actor.Status.Failure(e), getSelf());

            LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
            shardMBean.incrementFailedTransactionsCount();
        }

        commitCoordinator.currentTransactionComplete(transactionID, true);
    }

    private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
        LOG.debug("Processing canCommit for transaction {}", canCommit.getTransactionID());
        commitCoordinator.handleCanCommit(canCommit, getSender(), self());
    }

    private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
        LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
                ready.getTxnClientVersion());

        // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
        // commitCoordinator in preparation for the subsequent three phase commit initiated by
        // the front-end.
        commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
                ready.getModification());

        // Return our actor path as we'll handle the three phase commit, except if the Tx client
        // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
        // node. In that case, the subsequent 3-phase commit messages won't contain the
        // transactionId so to maintain backwards compatibility, we create a separate cohort actor
        // to provide the compatible behavior.
        ActorRef replyActorPath = self();
        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
            LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
            replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                    ready.getTransactionID()));
        }

        ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
                Serialization.serializedActorPath(replyActorPath));
        getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
                readyTransactionReply, getSelf());
    }

    private void handleAbortTransaction(final AbortTransaction abort) {
        doAbortTransaction(abort.getTransactionID(), getSender());
    }

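    /**
     * Aborts the given transaction via its cohort. The sender may be null when this is invoked
     * from the commit-timeout check, in which case no reply is sent.
     */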
    void doAbortTransaction(final String transactionID, final ActorRef sender) {
        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry != null) {
            LOG.debug("Aborting transaction {}", transactionID);

            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
            // aborted during replication in which case we may still commit locally if replication
            // succeeds.
            commitCoordinator.currentTransactionComplete(transactionID, false);

            final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
            final ActorRef self = getSelf();

            Futures.addCallback(future, new FutureCallback<Void>() {
                @Override
                public void onSuccess(final Void v) {
                    shardMBean.incrementAbortTransactionsCount();

                    if(sender != null) {
                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                    }
                }

                @Override
                public void onFailure(final Throwable t) {
                    LOG.error(t, "An exception happened during abort");

                    if(sender != null) {
                        sender.tell(new akka.actor.Status.Failure(t), self);
                    }
                }
            });
        }
    }

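    /**
     * Transactions can only be created on the leader. If this shard is not the leader the
     * request is forwarded to the leader; if no leader is known a NoShardLeaderException is
     * returned to the sender.
     */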
    private void handleCreateTransaction(final Object message) {
        if (isLeader()) {
            createTransaction(CreateTransaction.fromSerializable(message));
        } else if (getLeader() != null) {
            getLeader().forward(message, getContext());
        } else {
            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
                "Could not find shard leader so transaction cannot be created. This typically happens" +
                " when the system is coming up or recovering and a leader is being elected. Try again" +
                " later.")), getSelf());
        }
    }

    private void handleReadDataReply(final Object message) {
        // This must be for install snapshot. Don't want to open this up and trigger
        // deSerialization

        self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
                self());

        createSnapshotTransaction = null;

        // Send a PoisonPill instead of sending close transaction because we do not really need
        // a response
        getSender().tell(PoisonPill.getInstance(), self());
    }

    private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
        DOMStoreTransactionChain chain =
            transactionChains.remove(closeTransactionChain.getTransactionChainId());

        if(chain != null) {
            chain.close();
        }
    }

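    /**
     * Creates a ShardTransaction actor of the requested type. If a transaction chain id is
     * supplied, transactions are created from the corresponding DOMStoreTransactionChain,
     * creating the chain on first use; otherwise they come directly from the data store.
     */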
    private ActorRef createTypedTransactionActor(int transactionType,
            ShardTransactionIdentifier transactionId, String transactionChainId,
            short clientVersion ) {

        DOMStoreTransactionFactory factory = store;

        if(!transactionChainId.isEmpty()) {
            factory = transactionChains.get(transactionChainId);
            if(factory == null){
                DOMStoreTransactionChain transactionChain = store.createTransactionChain();
                transactionChains.put(transactionChainId, transactionChain);
                factory = transactionChain;
            }
        }

        if(this.schemaContext == null) {
            throw new IllegalStateException("SchemaContext is not set");
        }

        if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {

            shardMBean.incrementReadOnlyTransactionCount();

            return getContext().actorOf(
                ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
                        schemaContext, datastoreContext, shardMBean,
                        transactionId.getRemoteTransactionId(), clientVersion),
                        transactionId.toString());

        } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {

            shardMBean.incrementReadWriteTransactionCount();

            return getContext().actorOf(
                ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
                        schemaContext, datastoreContext, shardMBean,
                        transactionId.getRemoteTransactionId(), clientVersion),
                        transactionId.toString());

        } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {

            shardMBean.incrementWriteOnlyTransactionCount();

            return getContext().actorOf(
                ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
                        schemaContext, datastoreContext, shardMBean,
                        transactionId.getRemoteTransactionId(), clientVersion),
                        transactionId.toString());
        } else {
            throw new IllegalArgumentException(
                "Shard=" + name + ":CreateTransaction message has unidentified transaction type="
                    + transactionType);
        }
    }

    private void createTransaction(CreateTransaction createTransaction) {
        try {
            ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
                createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
                createTransaction.getVersion());

            getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
                    createTransaction.getTransactionId()).toSerializable(), getSelf());
        } catch (Exception e) {
            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
        }
    }

    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
            String transactionChainId, short clientVersion) {

        ShardTransactionIdentifier transactionId =
            ShardTransactionIdentifier.builder()
                .remoteTransactionId(remoteTransactionId)
                .build();

        if(LOG.isDebugEnabled()) {
            LOG.debug("Creating transaction : {} ", transactionId);
        }

        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
                transactionChainId, clientVersion);

        return transactionActor;
    }

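    /**
     * Readies and commits the given transaction synchronously, blocking on preCommit and commit.
     * Blocking is acceptable here because the in-memory data store currently uses a same-thread
     * executor, and it keeps all state access on the actor's dispatcher thread.
     */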
    private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
        throws ExecutionException, InterruptedException {
        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
        commitCohort.preCommit().get();
        commitCohort.commit().get();
    }

    private void commitWithNewTransaction(final Modification modification) {
        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
        modification.apply(tx);
        try {
            syncCommitTransaction(tx);
            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
        } catch (InterruptedException | ExecutionException e) {
            shardMBean.incrementFailedTransactionsCount();
            LOG.error(e, "Failed to commit");
        }
    }

    private void updateSchemaContext(final UpdateSchemaContext message) {
        this.schemaContext = message.getSchemaContext();
        updateSchemaContext(message.getSchemaContext());
    }

    @VisibleForTesting
    void updateSchemaContext(final SchemaContext schemaContext) {
        store.onGlobalContextUpdated(schemaContext);
    }

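    /**
     * Registers a data change listener. If this shard is the leader the registration is done
     * immediately; otherwise it is wrapped in a DelayedListenerRegistration and completed in
     * onStateChanged once leadership is gained. A reply with the registration actor's path is
     * sent in either case.
     */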
    private void registerChangeListener(final RegisterChangeListener registerChangeListener) {

        LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());

        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                     NormalizedNode<?, ?>>> registration;
        if(isLeader()) {
            registration = doChangeListenerRegistration(registerChangeListener);
        } else {
            LOG.debug("Shard is not the leader - delaying registration");

            DelayedListenerRegistration delayedReg =
                    new DelayedListenerRegistration(registerChangeListener);
            delayedListenerRegistrations.add(delayedReg);
            registration = delayedReg;
        }

        ActorRef listenerRegistration = getContext().actorOf(
                DataChangeListenerRegistration.props(registration));

        LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                    listenerRegistration.path());

        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
    }

    private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                               NormalizedNode<?, ?>>> doChangeListenerRegistration(
            final RegisterChangeListener registerChangeListener) {

        ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
                registerChangeListener.getDataChangeListenerPath());

        // Notify the listener whether notifications should be enabled or not:
        // only the leader enables them.
        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());

        // Now store a reference to the data change listener so it can be notified
        // at a later point if notifications should be enabled or disabled.
        dataChangeListeners.add(dataChangeListenerPath);

        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
                new DataChangeListenerProxy(dataChangeListenerPath);

        LOG.debug("Registering for path {}", registerChangeListener.getPath());

        return store.registerChangeListener(registerChangeListener.getPath(), listener,
                registerChangeListener.getScope());
    }

    private boolean isMetricsCaptureEnabled(){
        CommonConfig config = new CommonConfig(getContext().system().settings().config());
        return config.isMetricCaptureEnabled();
    }

    @Override
    protected void startLogRecoveryBatch(final int maxBatchSize) {
        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);

        if(LOG.isDebugEnabled()) {
            LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
        }
    }

    @Override
    protected void appendRecoveredLogEntry(final Payload data) {
        if (data instanceof CompositeModificationPayload) {
            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
        } else if (data instanceof CompositeModificationByteStringPayload) {
            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
        } else {
            LOG.error("Unknown state received {} during recovery", data);
        }
    }

    @Override
    protected void applyRecoverySnapshot(final ByteString snapshot) {
        if(recoveryCoordinator == null) {
            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
        }

        recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());

        if(LOG.isDebugEnabled()) {
            LOG.debug("{} : submitted recovery snapshot", persistenceId());
        }
    }

    @Override
    protected void applyCurrentLogRecoveryBatch() {
        if(recoveryCoordinator == null) {
            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
        }

        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());

        if(LOG.isDebugEnabled()) {
            LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
                    currentLogRecoveryBatch.size());
        }
    }

    @Override
    protected void onRecoveryComplete() {
        if(recoveryCoordinator != null) {
            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();

            if(LOG.isDebugEnabled()) {
                LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
            }

            for(DOMStoreWriteTransaction tx: txList) {
                try {
                    syncCommitTransaction(tx);
                    shardMBean.incrementCommittedTransactionCount();
                } catch (InterruptedException | ExecutionException e) {
                    shardMBean.incrementFailedTransactionsCount();
                    LOG.error(e, "Failed to commit");
                }
            }
        }

        recoveryCoordinator = null;
        currentLogRecoveryBatch = null;
        updateJournalStats();

        //notify shard manager
        getContext().parent().tell(new ActorInitialized(), getSelf());

        // Being paranoid here - this method should only be called once but just in case...
        if(txCommitTimeoutCheckSchedule == null) {
            // Schedule a message to be periodically sent to check if the current in-progress
            // transaction should be expired and aborted.
            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
                    period, period, getSelf(),
                    TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
        }
    }

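    /**
     * Invoked by the RAFT layer once a log entry has been committed (replicated to a majority).
     * Deserializes the payload's modification and applies it to the local state, or, when a
     * clientActor is present, finishes the pending commit started in handleCommitTransaction.
     */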
    @Override
    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {

        if (data instanceof CompositeModificationPayload) {
            Object modification = ((CompositeModificationPayload) data).getModification();

            applyModificationToState(clientActor, identifier, modification);
        } else if(data instanceof CompositeModificationByteStringPayload ){
            Object modification = ((CompositeModificationByteStringPayload) data).getModification();

            applyModificationToState(clientActor, identifier, modification);

        } else {
            LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                    data, data.getClass().getClassLoader(),
                    CompositeModificationPayload.class.getClassLoader());
        }

        updateJournalStats();

    }

    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
        if(modification == null) {
            LOG.error(
                    "modification is null - this is very unexpected, identifier = {}, clientActor = {}",
                    identifier, clientActor != null ? clientActor.path().toString() : null);
        } else if(clientActor == null) {
            // There's no clientActor to which to send a commit reply so we must be applying
            // replicated state from the leader.
            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
                    modification, schemaContext));
        } else {
            // This must be the OK to commit after replication consensus.
            finishCommit(clientActor, identifier);
        }
    }

    private void updateJournalStats() {
        ReplicatedLogEntry lastLogEntry = getLastLogEntry();

        if (lastLogEntry != null) {
            shardMBean.setLastLogIndex(lastLogEntry.getIndex());
            shardMBean.setLastLogTerm(lastLogEntry.getTerm());
        }

        shardMBean.setCommitIndex(getCommitIndex());
        shardMBean.setLastApplied(getLastApplied());
        shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
    }

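    /**
     * Initiates snapshot capture. The whole data tree is read via a one-off read-only
     * transaction actor; the resulting ReadDataReply is intercepted in handleReadDataReply
     * and forwarded to the RAFT layer as a CaptureSnapshotReply.
     */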
    @Override
    protected void createSnapshot() {
        if (createSnapshotTransaction == null) {

            // Create a transaction. We are really going to treat the transaction as a worker
            // so that this actor does not get blocked building the snapshot.
            createSnapshotTransaction = createTransaction(
                TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                "createSnapshot" + ++createSnapshotTransactionCounter, "",
                DataStoreVersions.CURRENT_VERSION);

            createSnapshotTransaction.tell(
                new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());

        }
    }

    @VisibleForTesting
    @Override
    protected void applySnapshot(final ByteString snapshot) {
        // Since this will be done only on recovery or when this actor is a follower,
        // we can safely commit everything in here. We need not worry about event notifications
        // as they would have already been disabled on the follower.

        LOG.info("Applying snapshot");
        try {
            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
            NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
            NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
                .decode(serializedNode);

            // delete everything first
            transaction.delete(YangInstanceIdentifier.builder().build());

            // Add everything from the remote node back
            transaction.write(YangInstanceIdentifier.builder().build(), node);
            syncCommitTransaction(transaction);
        } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
            LOG.error(e, "An exception occurred when applying snapshot");
        } finally {
            LOG.info("Done applying snapshot");
        }
    }

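    /**
     * Invoked on RAFT role changes. Notifications are enabled only while this shard is the
     * leader: listeners are told via EnableNotification, delayed registrations are completed
     * on gaining leadership, and transaction chains are closed on losing it.
     */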
    @Override
    protected void onStateChanged() {
        boolean isLeader = isLeader();
        for (ActorSelection dataChangeListener : dataChangeListeners) {
            dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
        }

        if(isLeader) {
            for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
                if(!reg.isClosed()) {
                    reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
                }
            }

            delayedListenerRegistrations.clear();
        }

        shardMBean.setRaftState(getRaftState().name());
        shardMBean.setCurrentTerm(getCurrentTerm());

        // If this actor is no longer the leader close all the transaction chains
        if(!isLeader){
            for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
                if(LOG.isDebugEnabled()) {
                    LOG.debug(
                        "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
                        entry.getKey(), getId());
                }
                entry.getValue().close();
            }

            transactionChains.clear();
        }
    }

    @Override
    protected DataPersistenceProvider persistence() {
        return dataPersistenceProvider;
    }

    @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
        shardMBean.setLeader(newLeader);
    }

    @Override public String persistenceId() {
        return this.name.toString();
    }

    @VisibleForTesting
    DataPersistenceProvider getDataPersistenceProvider() {
        return dataPersistenceProvider;
    }

    private static class ShardCreator implements Creator<Shard> {

        private static final long serialVersionUID = 1L;

        final ShardIdentifier name;
        final Map<ShardIdentifier, String> peerAddresses;
        final DatastoreContext datastoreContext;
        final SchemaContext schemaContext;

        ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
                final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
            this.name = name;
            this.peerAddresses = peerAddresses;
            this.datastoreContext = datastoreContext;
            this.schemaContext = schemaContext;
        }

        @Override
        public Shard create() throws Exception {
            return new Shard(name, peerAddresses, datastoreContext, schemaContext);
        }
    }

    @VisibleForTesting
    InMemoryDOMDataStore getDataStore() {
        return store;
    }

    @VisibleForTesting
    ShardStats getShardMBean() {
        return shardMBean;
    }

    private static class DelayedListenerRegistration implements
        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {

        private volatile boolean closed;

        private final RegisterChangeListener registerChangeListener;

        private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                             NormalizedNode<?, ?>>> delegate;

        DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
            this.registerChangeListener = registerChangeListener;
        }

        void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                            NormalizedNode<?, ?>>> registration) {
            this.delegate = registration;
        }

        boolean isClosed() {
            return closed;
        }

        RegisterChangeListener getRegisterChangeListener() {
            return registerChangeListener;
        }

        @Override
        public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
            return delegate != null ? delegate.getInstance() : null;
        }

        @Override
        public void close() {
            closed = true;
            if(delegate != null) {
                delegate.close();
            }
        }
    }
}