Merge "BUG 2718 : Create a diagnostic utility to track append entries replies"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import akka.actor.Props;
15 import akka.japi.Creator;
16 import akka.persistence.RecoveryFailure;
17 import akka.serialization.Serialization;
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Preconditions;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.FutureCallback;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import java.io.IOException;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import javax.annotation.Nonnull;
33 import org.opendaylight.controller.cluster.DataPersistenceProvider;
34 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
35 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
36 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
37 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
40 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
41 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
42 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
43 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
46 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
48 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
51 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
59 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
60 import org.opendaylight.controller.cluster.datastore.modification.Modification;
61 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
62 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
63 import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
64 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
65 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
66 import org.opendaylight.controller.cluster.raft.RaftActor;
67 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
69 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
70 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
71 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
72 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
73 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
74 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
77 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
78 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
79 import org.opendaylight.yangtools.concepts.ListenerRegistration;
80 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
81 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
82 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
83 import scala.concurrent.duration.Duration;
84 import scala.concurrent.duration.FiniteDuration;
85
86 /**
87  * A Shard represents a portion of the logical data tree <br/>
88  * <p>
89  * Our Shard uses an InMemoryDOMDataStore as its internal representation and delegates all requests to it.
90  * </p>
91  */
92 public class Shard extends RaftActor {
93
/** Root of the data tree: a YangInstanceIdentifier built with no path arguments. */
private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();

/**
 * Message that makes onReceiveCommand run handleTransactionCommitTimeoutCheck().
 * NOTE(review): presumably delivered periodically via txCommitTimeoutCheckSchedule; the
 * scheduling code is not visible in this section - confirm.
 */
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";

// NOTE(review): not referenced in the visible part of this class - confirm what it names.
@VisibleForTesting
static final String DEFAULT_NAME = "default";

// The state of this Shard: the in-memory store that transactions and chains are created from
private final InMemoryDOMDataStore store;

// The name of this shard
private final ShardIdentifier name;

// Statistics MBean; transaction/commit counters are updated by the message handlers
private final ShardStats shardMBean;

// NOTE(review): not read or written in the visible part of this class; presumably filled when
// data change listeners are registered - confirm.
private final List<ActorSelection> dataChangeListeners =  Lists.newArrayList();

// Registrations received while this shard was not the leader (see registerChangeListener)
private final List<DelayedListenerRegistration> delayedListenerRegistrations =
                                                                   Lists.newArrayList();

private final DatastoreContext datastoreContext;

// PersistentDataProvider or NonPersistentRaftDataProvider, chosen by DatastoreContext.isPersistent()
private final DataPersistenceProvider dataPersistenceProvider;

// Current schema; transaction creation is rejected while this is null
private SchemaContext schemaContext;

// NOTE(review): not used in the visible part of this class.
private int createSnapshotTransactionCounter;

// Caches readied cohorts and tracks the transaction currently in its three-phase commit
private final ShardCommitCoordinator commitCoordinator;

// Commit timeout in milliseconds (configured in seconds in DatastoreContext)
private final long transactionCommitTimeout;

// Handle for the commit-timeout check; cancelled in postStop() when set
private Cancellable txCommitTimeoutCheckSchedule;

// Child notifier actor created in the constructor and exposed via getRoleChangeNotifier()
private final Optional<ActorRef> roleChangeNotifier;

// Watches AppendEntriesReply arrival; begin() is only called when TRACE logging is enabled
private final MessageTracker appendEntriesReplyTracker;

/**
 * Coordinates persistence recovery on startup.
 */
private ShardRecoveryCoordinator recoveryCoordinator;
private List<Object> currentLogRecoveryBatch;

// Open transaction chains keyed by transaction chain id
private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
139
140     protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
141             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
142         super(name.toString(), mapPeerAddresses(peerAddresses),
143                 Optional.of(datastoreContext.getShardRaftConfig()));
144
145         this.name = name;
146         this.datastoreContext = datastoreContext;
147         this.schemaContext = schemaContext;
148         this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
149
150         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
151
152         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
153                 datastoreContext.getDataStoreProperties());
154
155         if(schemaContext != null) {
156             store.onGlobalContextUpdated(schemaContext);
157         }
158
159         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
160                 datastoreContext.getDataStoreMXBeanType());
161         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
162
163         if (isMetricsCaptureEnabled()) {
164             getContext().become(new MeteringBehavior(this));
165         }
166
167         commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
168                 datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
169
170         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
171                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
172
173         // create a notifier actor for each cluster member
174         roleChangeNotifier = createRoleChangeNotifier(name.toString());
175
176         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
177                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
178     }
179
180     private static Map<String, String> mapPeerAddresses(
181         final Map<ShardIdentifier, String> peerAddresses) {
182         Map<String, String> map = new HashMap<>();
183
184         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
185             .entrySet()) {
186             map.put(entry.getKey().toString(), entry.getValue());
187         }
188
189         return map;
190     }
191
192     public static Props props(final ShardIdentifier name,
193         final Map<ShardIdentifier, String> peerAddresses,
194         final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
195         Preconditions.checkNotNull(name, "name should not be null");
196         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
197         Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
198         Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
199
200         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
201     }
202
203     private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
204         ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
205             RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
206         return Optional.<ActorRef>of(shardRoleChangeNotifier);
207     }
208
209     @Override
210     public void postStop() {
211         super.postStop();
212
213         if(txCommitTimeoutCheckSchedule != null) {
214             txCommitTimeoutCheckSchedule.cancel();
215         }
216     }
217
218     @Override
219     public void onReceiveRecover(final Object message) throws Exception {
220         if(LOG.isDebugEnabled()) {
221             LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
222                 message.getClass().toString(), getSender());
223         }
224
225         if (message instanceof RecoveryFailure){
226             LOG.error("{}: Recovery failed because of this cause",
227                     persistenceId(), ((RecoveryFailure) message).cause());
228
229             // Even though recovery failed, we still need to finish our recovery, eg send the
230             // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
231             onRecoveryComplete();
232         } else {
233             super.onReceiveRecover(message);
234             if(LOG.isTraceEnabled()) {
235                 appendEntriesReplyTracker.begin();
236             }
237         }
238     }
239
240     @Override
241     public void onReceiveCommand(final Object message) throws Exception {
242
243         MessageTracker.Context context = appendEntriesReplyTracker.received(message);
244
245         if(context.error().isPresent()){
246             LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
247                     context.error());
248         }
249
250         try {
251             if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
252                 handleCreateTransaction(message);
253             } else if (message instanceof ForwardedReadyTransaction) {
254                 handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
255             } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
256                 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
257             } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
258                 handleCommitTransaction(CommitTransaction.fromSerializable(message));
259             } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
260                 handleAbortTransaction(AbortTransaction.fromSerializable(message));
261             } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
262                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
263             } else if (message instanceof RegisterChangeListener) {
264                 registerChangeListener((RegisterChangeListener) message);
265             } else if (message instanceof UpdateSchemaContext) {
266                 updateSchemaContext((UpdateSchemaContext) message);
267             } else if (message instanceof PeerAddressResolved) {
268                 PeerAddressResolved resolved = (PeerAddressResolved) message;
269                 setPeerAddress(resolved.getPeerId().toString(),
270                         resolved.getPeerAddress());
271             } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
272                 handleTransactionCommitTimeoutCheck();
273             } else {
274                 super.onReceiveCommand(message);
275             }
276         } finally {
277             context.done();
278         }
279     }
280
281     @Override
282     protected Optional<ActorRef> getRoleChangeNotifier() {
283         return roleChangeNotifier;
284     }
285
286     private void handleTransactionCommitTimeoutCheck() {
287         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
288         if(cohortEntry != null) {
289             long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
290             if(elapsed > transactionCommitTimeout) {
291                 LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
292                         persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
293
294                 doAbortTransaction(cohortEntry.getTransactionID(), null);
295             }
296         }
297     }
298
299     private void handleCommitTransaction(final CommitTransaction commit) {
300         final String transactionID = commit.getTransactionID();
301
302         LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
303
304         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
305         // this transaction.
306         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
307         if(cohortEntry == null) {
308             // We're not the current Tx - the Tx was likely expired b/c it took too long in
309             // between the canCommit and commit messages.
310             IllegalStateException ex = new IllegalStateException(
311                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
312                             persistenceId(), transactionID));
313             LOG.error(ex.getMessage());
314             shardMBean.incrementFailedTransactionsCount();
315             getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
316             return;
317         }
318
319         // We perform the preCommit phase here atomically with the commit phase. This is an
320         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
321         // coordination of preCommit across shards in case of failure but preCommit should not
322         // normally fail since we ensure only one concurrent 3-phase commit.
323
324         try {
325             // We block on the future here so we don't have to worry about possibly accessing our
326             // state on a different thread outside of our dispatcher. Also, the data store
327             // currently uses a same thread executor anyway.
328             cohortEntry.getCohort().preCommit().get();
329
330             // If we do not have any followers and we are not using persistence we can
331             // apply modification to the state immediately
332             if(!hasFollowers() && !persistence().isRecoveryApplicable()){
333                 applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
334             } else {
335                 Shard.this.persistData(getSender(), transactionID,
336                         new ModificationPayload(cohortEntry.getModification()));
337             }
338         } catch (Exception e) {
339             LOG.error("{} An exception occurred while preCommitting transaction {}",
340                     persistenceId(), cohortEntry.getTransactionID(), e);
341             shardMBean.incrementFailedTransactionsCount();
342             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
343         }
344
345         cohortEntry.updateLastAccessTime();
346     }
347
348     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
349         // With persistence enabled, this method is called via applyState by the leader strategy
350         // after the commit has been replicated to a majority of the followers.
351
352         CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
353         if(cohortEntry == null) {
354             // The transaction is no longer the current commit. This can happen if the transaction
355             // was aborted prior, most likely due to timeout in the front-end. We need to finish
356             // committing the transaction though since it was successfully persisted and replicated
357             // however we can't use the original cohort b/c it was already preCommitted and may
358             // conflict with the current commit or may have been aborted so we commit with a new
359             // transaction.
360             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
361             if(cohortEntry != null) {
362                 commitWithNewTransaction(cohortEntry.getModification());
363                 sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
364             } else {
365                 // This really shouldn't happen - it likely means that persistence or replication
366                 // took so long to complete such that the cohort entry was expired from the cache.
367                 IllegalStateException ex = new IllegalStateException(
368                         String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
369                                 persistenceId(), transactionID));
370                 LOG.error(ex.getMessage());
371                 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
372             }
373
374             return;
375         }
376
377         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
378
379         try {
380             // We block on the future here so we don't have to worry about possibly accessing our
381             // state on a different thread outside of our dispatcher. Also, the data store
382             // currently uses a same thread executor anyway.
383             cohortEntry.getCohort().commit().get();
384
385             sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
386
387             shardMBean.incrementCommittedTransactionCount();
388             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
389
390         } catch (Exception e) {
391             sender.tell(new akka.actor.Status.Failure(e), getSelf());
392
393             LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
394                     transactionID, e);
395             shardMBean.incrementFailedTransactionsCount();
396         } finally {
397             commitCoordinator.currentTransactionComplete(transactionID, true);
398         }
399     }
400
401     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
402         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
403         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
404     }
405
406     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
407         LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
408                 ready.getTransactionID(), ready.getTxnClientVersion());
409
410         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
411         // commitCoordinator in preparation for the subsequent three phase commit initiated by
412         // the front-end.
413         commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
414                 ready.getModification());
415
416         // Return our actor path as we'll handle the three phase commit, except if the Tx client
417         // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
418         // node. In that case, the subsequent 3-phase commit messages won't contain the
419         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
420         // to provide the compatible behavior.
421         ActorRef replyActorPath = self();
422         if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
423             LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
424             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
425                     ready.getTransactionID()));
426         }
427
428         ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
429                 Serialization.serializedActorPath(replyActorPath));
430         getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
431                 readyTransactionReply, getSelf());
432     }
433
434     private void handleAbortTransaction(final AbortTransaction abort) {
435         doAbortTransaction(abort.getTransactionID(), getSender());
436     }
437
438     void doAbortTransaction(final String transactionID, final ActorRef sender) {
439         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
440         if(cohortEntry != null) {
441             LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
442
443             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
444             // aborted during replication in which case we may still commit locally if replication
445             // succeeds.
446             commitCoordinator.currentTransactionComplete(transactionID, false);
447
448             final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
449             final ActorRef self = getSelf();
450
451             Futures.addCallback(future, new FutureCallback<Void>() {
452                 @Override
453                 public void onSuccess(final Void v) {
454                     shardMBean.incrementAbortTransactionsCount();
455
456                     if(sender != null) {
457                         sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
458                     }
459                 }
460
461                 @Override
462                 public void onFailure(final Throwable t) {
463                     LOG.error("{}: An exception happened during abort", persistenceId(), t);
464
465                     if(sender != null) {
466                         sender.tell(new akka.actor.Status.Failure(t), self);
467                     }
468                 }
469             });
470         }
471     }
472
473     private void handleCreateTransaction(final Object message) {
474         if (isLeader()) {
475             createTransaction(CreateTransaction.fromSerializable(message));
476         } else if (getLeader() != null) {
477             getLeader().forward(message, getContext());
478         } else {
479             getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
480                 "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
481                 " when the system is coming up or recovering and a leader is being elected. Try again" +
482                 " later.", persistenceId()))), getSelf());
483         }
484     }
485
486     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
487         DOMStoreTransactionChain chain =
488             transactionChains.remove(closeTransactionChain.getTransactionChainId());
489
490         if(chain != null) {
491             chain.close();
492         }
493     }
494
495     private ActorRef createTypedTransactionActor(int transactionType,
496             ShardTransactionIdentifier transactionId, String transactionChainId,
497             short clientVersion ) {
498
499         DOMStoreTransactionFactory factory = store;
500
501         if(!transactionChainId.isEmpty()) {
502             factory = transactionChains.get(transactionChainId);
503             if(factory == null){
504                 DOMStoreTransactionChain transactionChain = store.createTransactionChain();
505                 transactionChains.put(transactionChainId, transactionChain);
506                 factory = transactionChain;
507             }
508         }
509
510         if(this.schemaContext == null) {
511             throw new IllegalStateException("SchemaContext is not set");
512         }
513
514         if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
515
516             shardMBean.incrementReadOnlyTransactionCount();
517
518             return getContext().actorOf(
519                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
520                         schemaContext, datastoreContext, shardMBean,
521                         transactionId.getRemoteTransactionId(), clientVersion),
522                         transactionId.toString());
523
524         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
525
526             shardMBean.incrementReadWriteTransactionCount();
527
528             return getContext().actorOf(
529                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
530                         schemaContext, datastoreContext, shardMBean,
531                         transactionId.getRemoteTransactionId(), clientVersion),
532                         transactionId.toString());
533
534
535         } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
536
537             shardMBean.incrementWriteOnlyTransactionCount();
538
539             return getContext().actorOf(
540                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
541                         schemaContext, datastoreContext, shardMBean,
542                         transactionId.getRemoteTransactionId(), clientVersion),
543                         transactionId.toString());
544         } else {
545             throw new IllegalArgumentException(
546                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
547                     + transactionType);
548         }
549     }
550
551     private void createTransaction(CreateTransaction createTransaction) {
552         try {
553             ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
554                 createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
555                 createTransaction.getVersion());
556
557             getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
558                     createTransaction.getTransactionId()).toSerializable(), getSelf());
559         } catch (Exception e) {
560             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
561         }
562     }
563
564     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
565             String transactionChainId, short clientVersion) {
566
567         ShardTransactionIdentifier transactionId =
568             ShardTransactionIdentifier.builder()
569                 .remoteTransactionId(remoteTransactionId)
570                 .build();
571
572         if(LOG.isDebugEnabled()) {
573             LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
574         }
575
576         ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
577                 transactionChainId, clientVersion);
578
579         return transactionActor;
580     }
581
582     private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
583         throws ExecutionException, InterruptedException {
584         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
585         commitCohort.preCommit().get();
586         commitCohort.commit().get();
587     }
588
589     private void commitWithNewTransaction(final Modification modification) {
590         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
591         modification.apply(tx);
592         try {
593             syncCommitTransaction(tx);
594             shardMBean.incrementCommittedTransactionCount();
595             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
596         } catch (InterruptedException | ExecutionException e) {
597             shardMBean.incrementFailedTransactionsCount();
598             LOG.error("{}: Failed to commit", persistenceId(), e);
599         }
600     }
601
602     private void updateSchemaContext(final UpdateSchemaContext message) {
603         this.schemaContext = message.getSchemaContext();
604         updateSchemaContext(message.getSchemaContext());
605         store.onGlobalContextUpdated(message.getSchemaContext());
606     }
607
608     @VisibleForTesting
609     void updateSchemaContext(final SchemaContext schemaContext) {
610         store.onGlobalContextUpdated(schemaContext);
611     }
612
613     private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
614
615         LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
616
617         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
618                                                      NormalizedNode<?, ?>>> registration;
619         if(isLeader()) {
620             registration = doChangeListenerRegistration(registerChangeListener);
621         } else {
622             LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
623
624             DelayedListenerRegistration delayedReg =
625                     new DelayedListenerRegistration(registerChangeListener);
626             delayedListenerRegistrations.add(delayedReg);
627             registration = delayedReg;
628         }
629
630         ActorRef listenerRegistration = getContext().actorOf(
631                 DataChangeListenerRegistration.props(registration));
632
633         LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
634                 persistenceId(), listenerRegistration.path());
635
636         getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
637     }
638
639     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
640                                                NormalizedNode<?, ?>>> doChangeListenerRegistration(
641             final RegisterChangeListener registerChangeListener) {
642
643         ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
644                 registerChangeListener.getDataChangeListenerPath());
645
646         // Notify the listener if notifications should be enabled or not
647         // If this shard is the leader then it will enable notifications else
648         // it will not
649         dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
650
651         // Now store a reference to the data change listener so it can be notified
652         // at a later point if notifications should be enabled or disabled
653         dataChangeListeners.add(dataChangeListenerPath);
654
655         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
656                 new DataChangeListenerProxy(dataChangeListenerPath);
657
658         LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
659
660         return store.registerChangeListener(registerChangeListener.getPath(), listener,
661                 registerChangeListener.getScope());
662     }
663
664     private boolean isMetricsCaptureEnabled(){
665         CommonConfig config = new CommonConfig(getContext().system().settings().config());
666         return config.isMetricCaptureEnabled();
667     }
668
669     @Override
670     protected
671     void startLogRecoveryBatch(final int maxBatchSize) {
672         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
673
674         if(LOG.isDebugEnabled()) {
675             LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
676         }
677     }
678
679     @Override
680     protected void appendRecoveredLogEntry(final Payload data) {
681         if(data instanceof ModificationPayload) {
682             try {
683                 currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
684             } catch (ClassNotFoundException | IOException e) {
685                 LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
686             }
687         } else if (data instanceof CompositeModificationPayload) {
688             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
689         } else if (data instanceof CompositeModificationByteStringPayload) {
690             currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
691         } else {
692             LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
693         }
694     }
695
696     @Override
697     protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
698         if(recoveryCoordinator == null) {
699             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
700                     LOG, name.toString());
701         }
702
703         recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
704
705         if(LOG.isDebugEnabled()) {
706             LOG.debug("{}: submitted recovery sbapshot", persistenceId());
707         }
708     }
709
710     @Override
711     protected void applyCurrentLogRecoveryBatch() {
712         if(recoveryCoordinator == null) {
713             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
714                     LOG, name.toString());
715         }
716
717         recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
718
719         if(LOG.isDebugEnabled()) {
720             LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
721                     currentLogRecoveryBatch.size());
722         }
723     }
724
725     @Override
726     protected void onRecoveryComplete() {
727         if(recoveryCoordinator != null) {
728             Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
729
730             if(LOG.isDebugEnabled()) {
731                 LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
732             }
733
734             for(DOMStoreWriteTransaction tx: txList) {
735                 try {
736                     syncCommitTransaction(tx);
737                     shardMBean.incrementCommittedTransactionCount();
738                 } catch (InterruptedException | ExecutionException e) {
739                     shardMBean.incrementFailedTransactionsCount();
740                     LOG.error("{}: Failed to commit", persistenceId(), e);
741                 }
742             }
743         }
744
745         recoveryCoordinator = null;
746         currentLogRecoveryBatch = null;
747         updateJournalStats();
748
749         //notify shard manager
750         getContext().parent().tell(new ActorInitialized(), getSelf());
751
752         // Being paranoid here - this method should only be called once but just in case...
753         if(txCommitTimeoutCheckSchedule == null) {
754             // Schedule a message to be periodically sent to check if the current in-progress
755             // transaction should be expired and aborted.
756             FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
757             txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
758                     period, period, getSelf(),
759                     TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
760         }
761     }
762
763     @Override
764     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
765
766         if(data instanceof ModificationPayload) {
767             try {
768                 applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
769             } catch (ClassNotFoundException | IOException e) {
770                 LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
771             }
772         }
773         else if (data instanceof CompositeModificationPayload) {
774             Object modification = ((CompositeModificationPayload) data).getModification();
775
776             applyModificationToState(clientActor, identifier, modification);
777         } else if(data instanceof CompositeModificationByteStringPayload ){
778             Object modification = ((CompositeModificationByteStringPayload) data).getModification();
779
780             applyModificationToState(clientActor, identifier, modification);
781         } else {
782             LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
783                     persistenceId(), data, data.getClass().getClassLoader(),
784                     CompositeModificationPayload.class.getClassLoader());
785         }
786
787         updateJournalStats();
788
789     }
790
791     private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
792         if(modification == null) {
793             LOG.error(
794                     "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
795                     persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
796         } else if(clientActor == null) {
797             // There's no clientActor to which to send a commit reply so we must be applying
798             // replicated state from the leader.
799             commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
800         } else {
801             // This must be the OK to commit after replication consensus.
802             finishCommit(clientActor, identifier);
803         }
804     }
805
806     private void updateJournalStats() {
807         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
808
809         if (lastLogEntry != null) {
810             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
811             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
812         }
813
814         shardMBean.setCommitIndex(getCommitIndex());
815         shardMBean.setLastApplied(getLastApplied());
816         shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
817     }
818
819     @Override
820     protected void createSnapshot() {
821         // Create a transaction actor. We are really going to treat the transaction as a worker
822         // so that this actor does not get block building the snapshot. THe transaction actor will
823         // after processing the CreateSnapshot message.
824
825         ActorRef createSnapshotTransaction = createTransaction(
826                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
827                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
828                 DataStoreVersions.CURRENT_VERSION);
829
830         createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
831     }
832
833     @VisibleForTesting
834     @Override
835     protected void applySnapshot(final byte[] snapshotBytes) {
836         // Since this will be done only on Recovery or when this actor is a Follower
837         // we can safely commit everything in here. We not need to worry about event notifications
838         // as they would have already been disabled on the follower
839
840         LOG.info("{}: Applying snapshot", persistenceId());
841         try {
842             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
843
844             NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
845
846             // delete everything first
847             transaction.delete(DATASTORE_ROOT);
848
849             // Add everything from the remote node back
850             transaction.write(DATASTORE_ROOT, node);
851             syncCommitTransaction(transaction);
852         } catch (InterruptedException | ExecutionException e) {
853             LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
854         } finally {
855             LOG.info("{}: Done applying snapshot", persistenceId());
856         }
857     }
858
859     @Override
860     protected void onStateChanged() {
861         boolean isLeader = isLeader();
862         for (ActorSelection dataChangeListener : dataChangeListeners) {
863             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
864         }
865
866         if(isLeader) {
867             for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
868                 if(!reg.isClosed()) {
869                     reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
870                 }
871             }
872
873             delayedListenerRegistrations.clear();
874         }
875
876         shardMBean.setRaftState(getRaftState().name());
877         shardMBean.setCurrentTerm(getCurrentTerm());
878
879         // If this actor is no longer the leader close all the transaction chains
880         if(!isLeader){
881             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
882                 if(LOG.isDebugEnabled()) {
883                     LOG.debug(
884                         "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
885                         persistenceId(), entry.getKey(), getId());
886                 }
887                 entry.getValue().close();
888             }
889
890             transactionChains.clear();
891         }
892     }
893
894     @Override
895     protected DataPersistenceProvider persistence() {
896         return dataPersistenceProvider;
897     }
898
899     @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
900         shardMBean.setLeader(newLeader);
901     }
902
903     @Override public String persistenceId() {
904         return this.name.toString();
905     }
906
907     @VisibleForTesting
908     DataPersistenceProvider getDataPersistenceProvider() {
909         return dataPersistenceProvider;
910     }
911
912     private static class ShardCreator implements Creator<Shard> {
913
914         private static final long serialVersionUID = 1L;
915
916         final ShardIdentifier name;
917         final Map<ShardIdentifier, String> peerAddresses;
918         final DatastoreContext datastoreContext;
919         final SchemaContext schemaContext;
920
921         ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
922                 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
923             this.name = name;
924             this.peerAddresses = peerAddresses;
925             this.datastoreContext = datastoreContext;
926             this.schemaContext = schemaContext;
927         }
928
929         @Override
930         public Shard create() throws Exception {
931             return new Shard(name, peerAddresses, datastoreContext, schemaContext);
932         }
933     }
934
935     @VisibleForTesting
936     InMemoryDOMDataStore getDataStore() {
937         return store;
938     }
939
940     @VisibleForTesting
941     ShardStats getShardMBean() {
942         return shardMBean;
943     }
944
945     private static class DelayedListenerRegistration implements
946         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
947
948         private volatile boolean closed;
949
950         private final RegisterChangeListener registerChangeListener;
951
952         private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
953                                                              NormalizedNode<?, ?>>> delegate;
954
955         DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
956             this.registerChangeListener = registerChangeListener;
957         }
958
959         void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
960                                             NormalizedNode<?, ?>>> registration) {
961             this.delegate = registration;
962         }
963
964         boolean isClosed() {
965             return closed;
966         }
967
968         RegisterChangeListener getRegisterChangeListener() {
969             return registerChangeListener;
970         }
971
972         @Override
973         public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
974             return delegate != null ? delegate.getInstance() : null;
975         }
976
977         @Override
978         public void close() {
979             closed = true;
980             if(delegate != null) {
981                 delegate.close();
982             }
983         }
984     }
985 }