Merge "Bug 2260: Reduce overhead of unused TransactionProxy instances"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * A Shard represents a portion of the logical data tree.
 * <p>
 * Our Shard uses InMemoryDOMDataStore as its internal representation and delegates all requests
 * it receives to it.
 * </p>
 */
public class Shard extends RaftActor {

    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();

    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";

    @VisibleForTesting
    static final String DEFAULT_NAME = "default";

    // The state of this Shard
    private final InMemoryDOMDataStore store;

    // The name of this shard
    private final ShardIdentifier name;

    private final ShardStats shardMBean;

    private final List<ActorSelection> dataChangeListeners = Lists.newArrayList();

    private final List<DelayedListenerRegistration> delayedListenerRegistrations =
                                                                       Lists.newArrayList();

    private DatastoreContext datastoreContext;

    private DataPersistenceProvider dataPersistenceProvider;

    private SchemaContext schemaContext;

    private int createSnapshotTransactionCounter;

    private final ShardCommitCoordinator commitCoordinator;

    private long transactionCommitTimeout;

    private Cancellable txCommitTimeoutCheckSchedule;

    private final Optional<ActorRef> roleChangeNotifier;

    private final MessageTracker appendEntriesReplyTracker;

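    // Pre-built ReadyTransactionReply containing this shard's serialized actor path. It is
    // built once and reused for every readied transaction, presumably to avoid re-serializing
    // the path on each reply.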
    private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
            Serialization.serializedActorPath(getSelf()));

    /**
     * Coordinates persistence recovery on startup.
     */
    private ShardRecoveryCoordinator recoveryCoordinator;
    private List<Object> currentLogRecoveryBatch;

    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();

    private final String txnDispatcherPath;

    protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
            final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
        super(name.toString(), mapPeerAddresses(peerAddresses),
                Optional.of(datastoreContext.getShardRaftConfig()));

        this.name = name;
        this.datastoreContext = datastoreContext;
        this.schemaContext = schemaContext;
        this.dataPersistenceProvider = (datastoreContext.isPersistent())
                ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
        this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
                .getDispatcherPath(Dispatchers.DispatcherType.Transaction);

        LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());

        store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                datastoreContext.getDataStoreProperties());

        if(schemaContext != null) {
            store.onGlobalContextUpdated(schemaContext);
        }

        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                datastoreContext.getDataStoreMXBeanType());
        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());

        if (isMetricsCaptureEnabled()) {
            getContext().become(new MeteringBehavior(this));
        }

        commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
                datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());

        setTransactionCommitTimeout();

        // create a notifier actor for each cluster member
        roleChangeNotifier = createRoleChangeNotifier(name.toString());

        appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
    }

    private void setTransactionCommitTimeout() {
        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
    }

    private static Map<String, String> mapPeerAddresses(
        final Map<ShardIdentifier, String> peerAddresses) {
        Map<String, String> map = new HashMap<>();

        for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
            .entrySet()) {
            map.put(entry.getKey().toString(), entry.getValue());
        }

        return map;
    }

    public static Props props(final ShardIdentifier name,
        final Map<ShardIdentifier, String> peerAddresses,
        final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
        Preconditions.checkNotNull(name, "name should not be null");
        Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
        Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");

        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
    }

    private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
        ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
            RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
        return Optional.of(shardRoleChangeNotifier);
    }

    @Override
    public void postStop() {
        LOG.info("Stopping Shard {}", persistenceId());

        super.postStop();

        if(txCommitTimeoutCheckSchedule != null) {
            txCommitTimeoutCheckSchedule.cancel();
        }

        shardMBean.unregisterMBean();
    }

    @Override
    public void onReceiveRecover(final Object message) throws Exception {
        if(LOG.isDebugEnabled()) {
            LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
                message.getClass().toString(), getSender());
        }

        if (message instanceof RecoveryFailure){
            LOG.error("{}: Recovery failed because of this cause",
                    persistenceId(), ((RecoveryFailure) message).cause());

            // Even though recovery failed, we still need to finish our recovery, eg send the
            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
            onRecoveryComplete();
        } else {
            super.onReceiveRecover(message);
            if(LOG.isTraceEnabled()) {
                appendEntriesReplyTracker.begin();
            }
        }
    }

    @Override
    public void onReceiveCommand(final Object message) throws Exception {

        MessageTracker.Context context = appendEntriesReplyTracker.received(message);

        if(context.error().isPresent()){
            LOG.trace("{}: AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
                    context.error());
        }

        try {
            if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                handleCreateTransaction(message);
            } else if (message instanceof ForwardedReadyTransaction) {
                handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
            } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
            } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                handleCommitTransaction(CommitTransaction.fromSerializable(message));
            } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                handleAbortTransaction(AbortTransaction.fromSerializable(message));
            } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
                closeTransactionChain(CloseTransactionChain.fromSerializable(message));
            } else if (message instanceof RegisterChangeListener) {
                registerChangeListener((RegisterChangeListener) message);
            } else if (message instanceof UpdateSchemaContext) {
                updateSchemaContext((UpdateSchemaContext) message);
            } else if (message instanceof PeerAddressResolved) {
                PeerAddressResolved resolved = (PeerAddressResolved) message;
                setPeerAddress(resolved.getPeerId().toString(),
                        resolved.getPeerAddress());
            } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
                handleTransactionCommitTimeoutCheck();
            } else if(message instanceof DatastoreContext) {
                onDatastoreContext((DatastoreContext)message);
            } else if(message instanceof RegisterRoleChangeListener){
                roleChangeNotifier.get().forward(message, context());
            } else {
                super.onReceiveCommand(message);
            }
        } finally {
            context.done();
        }
    }

    @Override
    protected Optional<ActorRef> getRoleChangeNotifier() {
        return roleChangeNotifier;
    }

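    // Applies a dynamically updated DatastoreContext: adjusts the commit queue capacity and
    // commit timeout, switches the persistence provider if the persistent flag changed, and
    // pushes the new RAFT config parameters down to the RaftActor.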
    private void onDatastoreContext(DatastoreContext context) {
        datastoreContext = context;

        commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());

        setTransactionCommitTimeout();

        if(datastoreContext.isPersistent() &&
                dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
            dataPersistenceProvider = new PersistentDataProvider();
        } else if(!datastoreContext.isPersistent() &&
                dataPersistenceProvider instanceof PersistentDataProvider) {
            dataPersistenceProvider = new NonPersistentRaftDataProvider();
        }

        updateConfigParams(datastoreContext.getShardRaftConfig());
    }

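    // Invoked on each TX_COMMIT_TIMEOUT_CHECK_MESSAGE tick (scheduled in onRecoveryComplete):
    // if the current in-progress transaction has been idle longer than transactionCommitTimeout,
    // abort it so a stuck front-end cannot block the commit queue indefinitely.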
    private void handleTransactionCommitTimeoutCheck() {
        CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
        if(cohortEntry != null) {
            long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
            if(elapsed > transactionCommitTimeout) {
                LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
                        persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);

                doAbortTransaction(cohortEntry.getTransactionID(), null);
            }
        }
    }

    private void handleCommitTransaction(final CommitTransaction commit) {
        final String transactionID = commit.getTransactionID();

        LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);

        // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
        // this transaction.
        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry == null) {
            // We're not the current Tx - the Tx was likely expired b/c it took too long in
            // between the canCommit and commit messages.
            IllegalStateException ex = new IllegalStateException(
                    String.format("%s: Cannot commit transaction %s - it is not the current transaction",
                            persistenceId(), transactionID));
            LOG.error(ex.getMessage());
            shardMBean.incrementFailedTransactionsCount();
            getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
            return;
        }

        // We perform the preCommit phase here atomically with the commit phase. This is an
        // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
        // coordination of preCommit across shards in case of failure but preCommit should not
        // normally fail since we ensure only one concurrent 3-phase commit.

        try {
            // We block on the future here so we don't have to worry about possibly accessing our
            // state on a different thread outside of our dispatcher. Also, the data store
            // currently uses a same thread executor anyway.
            cohortEntry.getCohort().preCommit().get();

            // If we do not have any followers and we are not using persistence, we can
            // apply the modification to the state immediately.
            if(!hasFollowers() && !persistence().isRecoveryApplicable()){
                applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
            } else {
                Shard.this.persistData(getSender(), transactionID,
                        new ModificationPayload(cohortEntry.getModification()));
            }
        } catch (Exception e) {
            LOG.error("{}: An exception occurred while preCommitting transaction {}",
                    persistenceId(), cohortEntry.getTransactionID(), e);
            shardMBean.incrementFailedTransactionsCount();
            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
        }

        cohortEntry.updateLastAccessTime();
    }

    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID) {
        // With persistence enabled, this method is called via applyState by the leader strategy
        // after the commit has been replicated to a majority of the followers.

        CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry == null) {
            // The transaction is no longer the current commit. This can happen if the transaction
            // was aborted prior, most likely due to timeout in the front-end. We still need to
            // finish committing the transaction since it was successfully persisted and replicated.
            // However, we can't use the original cohort b/c it was already preCommitted and may
            // conflict with the current commit or may have been aborted, so we commit with a new
            // transaction.
            cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
            if(cohortEntry != null) {
                commitWithNewTransaction(cohortEntry.getModification());
                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
            } else {
                // This really shouldn't happen - it likely means that persistence or replication
                // took so long to complete that the cohort entry was expired from the cache.
                IllegalStateException ex = new IllegalStateException(
                        String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
                                persistenceId(), transactionID));
                LOG.error(ex.getMessage());
                sender.tell(new akka.actor.Status.Failure(ex), getSelf());
            }

            return;
        }

        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());

        try {
            // We block on the future here so we don't have to worry about possibly accessing our
            // state on a different thread outside of our dispatcher. Also, the data store
            // currently uses a same thread executor anyway.
            cohortEntry.getCohort().commit().get();

            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());

            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());

        } catch (Exception e) {
            sender.tell(new akka.actor.Status.Failure(e), getSelf());

            LOG.error("{}: An exception occurred while committing transaction {}", persistenceId(),
                    transactionID, e);
            shardMBean.incrementFailedTransactionsCount();
        } finally {
            commitCoordinator.currentTransactionComplete(transactionID, true);
        }
    }

    private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
        LOG.debug("{}: Processing canCommit for transaction {}", persistenceId(), canCommit.getTransactionID());
        commitCoordinator.handleCanCommit(canCommit, getSender(), self());
    }

    private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
        LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
                ready.getTransactionID(), ready.getTxnClientVersion());

        // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
        // commitCoordinator in preparation for the subsequent three phase commit initiated by
        // the front-end.
        commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
                ready.getModification());

        // Return our actor path as we'll handle the three phase commit, except if the Tx client
        // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
        // node. In that case, the subsequent 3-phase commit messages won't contain the
        // transactionId so, to maintain backwards compatibility, we create a separate cohort actor
        // to provide the compatible behavior.
        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
            LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
            ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                    ready.getTransactionID()));

            ReadyTransactionReply readyTransactionReply =
                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
            getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
                    readyTransactionReply, getSelf());
        } else {
            getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
                    READY_TRANSACTION_REPLY, getSelf());
        }
    }

    private void handleAbortTransaction(final AbortTransaction abort) {
        doAbortTransaction(abort.getTransactionID(), getSender());
    }

    void doAbortTransaction(final String transactionID, final ActorRef sender) {
        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
        if(cohortEntry != null) {
            LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);

            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
            // aborted during replication, in which case we may still commit locally if replication
            // succeeds.
            commitCoordinator.currentTransactionComplete(transactionID, false);

            final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
            final ActorRef self = getSelf();

            Futures.addCallback(future, new FutureCallback<Void>() {
                @Override
                public void onSuccess(final Void v) {
                    shardMBean.incrementAbortTransactionsCount();

                    if(sender != null) {
                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                    }
                }

                @Override
                public void onFailure(final Throwable t) {
                    LOG.error("{}: An exception happened during abort", persistenceId(), t);

                    if(sender != null) {
                        sender.tell(new akka.actor.Status.Failure(t), self);
                    }
                }
            });
        }
    }

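    // Transactions can only be created on the leader. Followers forward the request to the
    // current leader; if no leader is known, fail fast so the front-end can retry.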
    private void handleCreateTransaction(final Object message) {
        if (isLeader()) {
            createTransaction(CreateTransaction.fromSerializable(message));
        } else if (getLeader() != null) {
            getLeader().forward(message, getContext());
        } else {
            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
                "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
                " when the system is coming up or recovering and a leader is being elected. Try again" +
                " later.", persistenceId()))), getSelf());
        }
    }

    private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
        DOMStoreTransactionChain chain =
            transactionChains.remove(closeTransactionChain.getTransactionChainId());

        if(chain != null) {
            chain.close();
        }
    }

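    // Creates the backing DOMStore transaction and wraps it in a ShardTransaction actor. If a
    // transaction chain id is given, the transaction is created from the corresponding
    // DOMStoreTransactionChain (created on first use) so that chained transactions see each
    // other's effects in order.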
    private ActorRef createTypedTransactionActor(int transactionType,
            ShardTransactionIdentifier transactionId, String transactionChainId,
            short clientVersion) {

        DOMStoreTransactionFactory factory = store;

        if(!transactionChainId.isEmpty()) {
            factory = transactionChains.get(transactionChainId);
            if(factory == null){
                DOMStoreTransactionChain transactionChain = store.createTransactionChain();
                transactionChains.put(transactionChainId, transactionChain);
                factory = transactionChain;
            }
        }

        if(this.schemaContext == null) {
            throw new IllegalStateException("SchemaContext is not set");
        }

        if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
            shardMBean.incrementWriteOnlyTransactionCount();

            return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
        } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
            shardMBean.incrementReadWriteTransactionCount();

            return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
        } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
            shardMBean.incrementReadOnlyTransactionCount();

            return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
        } else {
            throw new IllegalArgumentException(
                "Shard=" + name + ":CreateTransaction message has unidentified transaction type="
                    + transactionType);
        }
    }

    private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
                                            short clientVersion){
        return getContext().actorOf(
                ShardTransaction.props(transaction, getSelf(),
                        schemaContext, datastoreContext, shardMBean,
                        transactionId.getRemoteTransactionId(), clientVersion)
                        .withDispatcher(txnDispatcherPath),
                transactionId.toString());
    }

    private void createTransaction(CreateTransaction createTransaction) {
        try {
            ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
                createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
                createTransaction.getVersion());

            getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
                    createTransaction.getTransactionId()).toSerializable(), getSelf());
        } catch (Exception e) {
            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
        }
    }

    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
            String transactionChainId, short clientVersion) {

        ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);

        if(LOG.isDebugEnabled()) {
            LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
        }

        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
                transactionChainId, clientVersion);

        return transactionActor;
    }

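    // Readies and commits a write transaction synchronously. Blocking on the futures is
    // considered safe here since the underlying data store currently uses a same-thread
    // executor (see the comments in handleCommitTransaction/finishCommit).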
    private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
        throws ExecutionException, InterruptedException {
        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
        commitCohort.preCommit().get();
        commitCohort.commit().get();
    }

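    // Applies a modification via a fresh local write transaction, bypassing the three-phase
    // commit coordinator. Used when applying replicated state from the leader and when the
    // original cohort can no longer be used (see finishCommit).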
    private void commitWithNewTransaction(final Modification modification) {
        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
        modification.apply(tx);
        try {
            syncCommitTransaction(tx);
            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
        } catch (InterruptedException | ExecutionException e) {
            shardMBean.incrementFailedTransactionsCount();
            LOG.error("{}: Failed to commit", persistenceId(), e);
        }
    }

    private void updateSchemaContext(final UpdateSchemaContext message) {
        this.schemaContext = message.getSchemaContext();
        // The overload below pushes the new context to the store; calling
        // store.onGlobalContextUpdated here as well would update it twice.
        updateSchemaContext(message.getSchemaContext());
    }

    @VisibleForTesting
    void updateSchemaContext(final SchemaContext schemaContext) {
        store.onGlobalContextUpdated(schemaContext);
    }

    private void registerChangeListener(final RegisterChangeListener registerChangeListener) {

        LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());

        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                     NormalizedNode<?, ?>>> registration;
        if(isLeader()) {
            registration = doChangeListenerRegistration(registerChangeListener);
        } else {
            LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());

            DelayedListenerRegistration delayedReg =
                    new DelayedListenerRegistration(registerChangeListener);
            delayedListenerRegistrations.add(delayedReg);
            registration = delayedReg;
        }

        ActorRef listenerRegistration = getContext().actorOf(
                DataChangeListenerRegistration.props(registration));

        LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                persistenceId(), listenerRegistration.path());

        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
    }

    private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                               NormalizedNode<?, ?>>> doChangeListenerRegistration(
            final RegisterChangeListener registerChangeListener) {

        ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
                registerChangeListener.getDataChangeListenerPath());

        // Notify the listener whether notifications should be enabled or not:
        // if this shard is the leader then it will enable notifications, else
        // it will not.
        dataChangeListenerPath.tell(new EnableNotification(true), getSelf());

        // Now store a reference to the data change listener so it can be notified
        // at a later point if notifications should be enabled or disabled.
        dataChangeListeners.add(dataChangeListenerPath);

        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
                new DataChangeListenerProxy(dataChangeListenerPath);

        LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());

        return store.registerChangeListener(registerChangeListener.getPath(), listener,
                registerChangeListener.getScope());
    }

    private boolean isMetricsCaptureEnabled(){
        CommonConfig config = new CommonConfig(getContext().system().settings().config());
        return config.isMetricCaptureEnabled();
    }

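    // The following methods are RaftActor recovery callbacks: recovered journal entries are
    // batched and handed to a ShardRecoveryCoordinator, which builds write transactions that
    // are committed once recovery completes (see onRecoveryComplete).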
    @Override
    protected void startLogRecoveryBatch(final int maxBatchSize) {
        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);

        if(LOG.isDebugEnabled()) {
            LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
        }
    }

    @Override
    protected void appendRecoveredLogEntry(final Payload data) {
        if(data instanceof ModificationPayload) {
            try {
                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
            } catch (ClassNotFoundException | IOException e) {
                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
            }
        } else if (data instanceof CompositeModificationPayload) {
            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
        } else if (data instanceof CompositeModificationByteStringPayload) {
            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
        } else {
            LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
        }
    }

    @Override
    protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
        if(recoveryCoordinator == null) {
            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
                    LOG, name.toString());
        }

        recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());

        if(LOG.isDebugEnabled()) {
            LOG.debug("{}: submitted recovery snapshot", persistenceId());
        }
    }

    @Override
    protected void applyCurrentLogRecoveryBatch() {
        if(recoveryCoordinator == null) {
            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
                    LOG, name.toString());
        }

        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());

        if(LOG.isDebugEnabled()) {
            LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
                    currentLogRecoveryBatch.size());
        }
    }

    @Override
    protected void onRecoveryComplete() {
        if(recoveryCoordinator != null) {
            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();

            if(LOG.isDebugEnabled()) {
                LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
            }

            for(DOMStoreWriteTransaction tx: txList) {
                try {
                    syncCommitTransaction(tx);
                    shardMBean.incrementCommittedTransactionCount();
                } catch (InterruptedException | ExecutionException e) {
                    shardMBean.incrementFailedTransactionsCount();
                    LOG.error("{}: Failed to commit", persistenceId(), e);
                }
            }
        }

        recoveryCoordinator = null;
        currentLogRecoveryBatch = null;
        updateJournalStats();

        //notify shard manager
        getContext().parent().tell(new ActorInitialized(), getSelf());

        // Being paranoid here - this method should only be called once but just in case...
        if(txCommitTimeoutCheckSchedule == null) {
            // Schedule a message to be periodically sent to check if the current in-progress
            // transaction should be expired and aborted.
            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
                    period, period, getSelf(),
                    TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
        }
    }

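    // Invoked by the RAFT layer once a log entry has been committed (replicated to a majority,
    // or applied directly when there are no followers). Extracts the modification from the
    // payload and applies it to the data store.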
    @Override
    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {

        if(data instanceof ModificationPayload) {
            try {
                applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
            } catch (ClassNotFoundException | IOException e) {
                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
            }
        } else if (data instanceof CompositeModificationPayload) {
            Object modification = ((CompositeModificationPayload) data).getModification();

            applyModificationToState(clientActor, identifier, modification);
        } else if(data instanceof CompositeModificationByteStringPayload) {
            Object modification = ((CompositeModificationByteStringPayload) data).getModification();

            applyModificationToState(clientActor, identifier, modification);
        } else {
            LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                    persistenceId(), data, data.getClass().getClassLoader(),
                    CompositeModificationPayload.class.getClassLoader());
        }

        updateJournalStats();
    }

    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
        if(modification == null) {
            LOG.error(
                    "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
                    persistenceId(), clientActor != null ? clientActor.path().toString() : null, identifier);
        } else if(clientActor == null) {
            // There's no clientActor to which to send a commit reply so we must be applying
            // replicated state from the leader.
            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
        } else {
            // This must be the OK to commit after replication consensus.
            finishCommit(clientActor, identifier);
        }
    }

    private void updateJournalStats() {
        ReplicatedLogEntry lastLogEntry = getLastLogEntry();

        if (lastLogEntry != null) {
            shardMBean.setLastLogIndex(lastLogEntry.getIndex());
            shardMBean.setLastLogTerm(lastLogEntry.getTerm());
        }

        shardMBean.setCommitIndex(getCommitIndex());
        shardMBean.setLastApplied(getLastApplied());
        shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
    }

    @Override
    protected void createSnapshot() {
        // Create a transaction actor. We are really going to treat the transaction as a worker
        // so that this actor does not get blocked building the snapshot. The transaction actor
        // will send the result back after processing the CreateSnapshot message.

        ActorRef createSnapshotTransaction = createTransaction(
                TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                "createSnapshot" + ++createSnapshotTransactionCounter, "",
                DataStoreVersions.CURRENT_VERSION);

        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
    }

    @VisibleForTesting
    @Override
    protected void applySnapshot(final byte[] snapshotBytes) {
        // Since this will be done only on recovery or when this actor is a follower,
        // we can safely commit everything in here. We need not worry about event notifications
        // as they would have already been disabled on the follower.

        LOG.info("{}: Applying snapshot", persistenceId());
        try {
            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();

            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);

            // delete everything first
            transaction.delete(DATASTORE_ROOT);

            // Add everything from the remote node back
            transaction.write(DATASTORE_ROOT, node);
            syncCommitTransaction(transaction);
        } catch (InterruptedException | ExecutionException e) {
            LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
        } finally {
            LOG.info("{}: Done applying snapshot", persistenceId());
        }
    }

    @Override
    protected void onStateChanged() {
        boolean isLeader = isLeader();
        for (ActorSelection dataChangeListener : dataChangeListeners) {
            dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
        }

        if(isLeader) {
            for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
                if(!reg.isClosed()) {
                    reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
                }
            }

            delayedListenerRegistrations.clear();
        }

        shardMBean.setRaftState(getRaftState().name());
        shardMBean.setCurrentTerm(getCurrentTerm());

        // If this actor is no longer the leader close all the transaction chains
        if(!isLeader){
            for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
                if(LOG.isDebugEnabled()) {
                    LOG.debug(
                        "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
                        persistenceId(), entry.getKey(), getId());
                }
                entry.getValue().close();
            }

            transactionChains.clear();
        }
    }

    @Override
    protected DataPersistenceProvider persistence() {
        return dataPersistenceProvider;
    }

    @Override
    protected void onLeaderChanged(final String oldLeader, final String newLeader) {
        shardMBean.setLeader(newLeader);
    }

    @Override
    public String persistenceId() {
        return this.name.toString();
    }

    @VisibleForTesting
    DataPersistenceProvider getDataPersistenceProvider() {
        return dataPersistenceProvider;
    }

    private static class ShardCreator implements Creator<Shard> {

        private static final long serialVersionUID = 1L;

        final ShardIdentifier name;
        final Map<ShardIdentifier, String> peerAddresses;
        final DatastoreContext datastoreContext;
        final SchemaContext schemaContext;

        ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
                final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
            this.name = name;
            this.peerAddresses = peerAddresses;
            this.datastoreContext = datastoreContext;
            this.schemaContext = schemaContext;
        }

        @Override
        public Shard create() throws Exception {
            return new Shard(name, peerAddresses, datastoreContext, schemaContext);
        }
    }

    @VisibleForTesting
    InMemoryDOMDataStore getDataStore() {
        return store;
    }

    @VisibleForTesting
    ShardStats getShardMBean() {
        return shardMBean;
    }

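    // Placeholder for a change-listener registration requested while this shard is not the
    // leader. The real registration is created in onStateChanged once leadership is gained;
    // until then getInstance() returns null and close() simply marks the entry closed.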
    private static class DelayedListenerRegistration implements
        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {

        private volatile boolean closed;

        private final RegisterChangeListener registerChangeListener;

        private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                             NormalizedNode<?, ?>>> delegate;

        DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
            this.registerChangeListener = registerChangeListener;
        }

        void setDelegate(final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                            NormalizedNode<?, ?>>> registration) {
            this.delegate = registration;
        }

        boolean isClosed() {
            return closed;
        }

        RegisterChangeListener getRegisterChangeListener() {
            return registerChangeListener;
        }

        @Override
        public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
            return delegate != null ? delegate.getInstance() : null;
        }

        @Override
        public void close() {
            closed = true;
            if(delegate != null) {
                delegate.close();
            }
        }
    }
}