Merge "Remove l2switch sample"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18 import akka.japi.Creator;
19 import akka.persistence.RecoveryFailure;
20 import akka.serialization.Serialization;
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Optional;
23 import com.google.common.base.Preconditions;
24 import com.google.common.collect.Lists;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.protobuf.ByteString;
29 import com.google.protobuf.InvalidProtocolBufferException;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
32 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
33 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
34 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
37 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
38 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
39 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
40 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
43 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
45 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
50 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
52 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
53 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
54 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
56 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
57 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
58 import org.opendaylight.controller.cluster.datastore.modification.Modification;
59 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
60 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
61 import org.opendaylight.controller.cluster.raft.RaftActor;
62 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
63 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
64 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
65 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
66 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
67 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
68 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
69 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
72 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
73 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
74 import org.opendaylight.yangtools.concepts.ListenerRegistration;
75 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
76 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
77 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
78 import scala.concurrent.duration.Duration;
79 import scala.concurrent.duration.FiniteDuration;
80
81 import javax.annotation.Nonnull;
82 import java.util.Collection;
83 import java.util.HashMap;
84 import java.util.List;
85 import java.util.Map;
86 import java.util.concurrent.ExecutionException;
87 import java.util.concurrent.TimeUnit;
88
89 /**
90  * A Shard represents a portion of the logical data tree <br/>
91  * <p>
92  * Our Shard uses InMemoryDataStore as its internal representation and delegates all requests to it
93  * </p>
94  */
95 public class Shard extends RaftActor {
96
97     private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
98
99     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
100
101     public static final String DEFAULT_NAME = "default";
102
103     // The state of this Shard
104     private final InMemoryDOMDataStore store;
105
106     private final LoggingAdapter LOG =
107         Logging.getLogger(getContext().system(), this);
108
109     /// The name of this shard
110     private final ShardIdentifier name;
111
112     private final ShardStats shardMBean;
113
114     private final List<ActorSelection> dataChangeListeners =  Lists.newArrayList();
115
116     private final List<DelayedListenerRegistration> delayedListenerRegistrations =
117                                                                        Lists.newArrayList();
118
119     private final DatastoreContext datastoreContext;
120
121     private final DataPersistenceProvider dataPersistenceProvider;
122
123     private SchemaContext schemaContext;
124
125     private ActorRef createSnapshotTransaction;
126
127     private int createSnapshotTransactionCounter;
128
129     private final ShardCommitCoordinator commitCoordinator;
130
131     private final long transactionCommitTimeout;
132
133     private Cancellable txCommitTimeoutCheckSchedule;
134
135     /**
136      * Coordinates persistence recovery on startup.
137      */
138     private ShardRecoveryCoordinator recoveryCoordinator;
139     private List<Object> currentLogRecoveryBatch;
140
141     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
142
143     protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
144             DatastoreContext datastoreContext, SchemaContext schemaContext) {
145         super(name.toString(), mapPeerAddresses(peerAddresses),
146                 Optional.of(datastoreContext.getShardRaftConfig()));
147
148         this.name = name;
149         this.datastoreContext = datastoreContext;
150         this.schemaContext = schemaContext;
151         this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
152
153         LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
154
155         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
156                 datastoreContext.getDataStoreProperties());
157
158         if(schemaContext != null) {
159             store.onGlobalContextUpdated(schemaContext);
160         }
161
162         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
163                 datastoreContext.getDataStoreMXBeanType());
164         shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
165         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
166
167         if (isMetricsCaptureEnabled()) {
168             getContext().become(new MeteringBehavior(this));
169         }
170
171         commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
172                 datastoreContext.getShardTransactionCommitQueueCapacity());
173
174         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
175                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
176     }
177
178     private static Map<String, String> mapPeerAddresses(
179         Map<ShardIdentifier, String> peerAddresses) {
180         Map<String, String> map = new HashMap<>();
181
182         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
183             .entrySet()) {
184             map.put(entry.getKey().toString(), entry.getValue());
185         }
186
187         return map;
188     }
189
190     public static Props props(final ShardIdentifier name,
191         final Map<ShardIdentifier, String> peerAddresses,
192         DatastoreContext datastoreContext, SchemaContext schemaContext) {
193         Preconditions.checkNotNull(name, "name should not be null");
194         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
195         Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
196         Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
197
198         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
199     }
200
201     @Override
202     public void postStop() {
203         super.postStop();
204
205         if(txCommitTimeoutCheckSchedule != null) {
206             txCommitTimeoutCheckSchedule.cancel();
207         }
208     }
209
210     @Override
211     public void onReceiveRecover(Object message) throws Exception {
212         if(LOG.isDebugEnabled()) {
213             LOG.debug("onReceiveRecover: Received message {} from {}",
214                 message.getClass().toString(),
215                 getSender());
216         }
217
218         if (message instanceof RecoveryFailure){
219             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
220
221             // Even though recovery failed, we still need to finish our recovery, eg send the
222             // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
223             onRecoveryComplete();
224         } else {
225             super.onReceiveRecover(message);
226         }
227     }
228
229     @Override
230     public void onReceiveCommand(Object message) throws Exception {
231         if(LOG.isDebugEnabled()) {
232             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
233         }
234
235         if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
236             handleReadDataReply(message);
237         } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
238             handleCreateTransaction(message);
239         } else if(message instanceof ForwardedReadyTransaction) {
240             handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
241         } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
242             handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
243         } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
244             handleCommitTransaction(CommitTransaction.fromSerializable(message));
245         } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
246             handleAbortTransaction(AbortTransaction.fromSerializable(message));
247         } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
248             closeTransactionChain(CloseTransactionChain.fromSerializable(message));
249         } else if (message instanceof RegisterChangeListener) {
250             registerChangeListener((RegisterChangeListener) message);
251         } else if (message instanceof UpdateSchemaContext) {
252             updateSchemaContext((UpdateSchemaContext) message);
253         } else if (message instanceof PeerAddressResolved) {
254             PeerAddressResolved resolved = (PeerAddressResolved) message;
255             setPeerAddress(resolved.getPeerId().toString(),
256                 resolved.getPeerAddress());
257         } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
258             handleTransactionCommitTimeoutCheck();
259         } else {
260             super.onReceiveCommand(message);
261         }
262     }
263
264     private void handleTransactionCommitTimeoutCheck() {
265         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
266         if(cohortEntry != null) {
267             long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
268             if(elapsed > transactionCommitTimeout) {
269                 LOG.warning("Current transaction {} has timed out after {} ms - aborting",
270                         cohortEntry.getTransactionID(), transactionCommitTimeout);
271
272                 doAbortTransaction(cohortEntry.getTransactionID(), null);
273             }
274         }
275     }
276
277     private void handleCommitTransaction(CommitTransaction commit) {
278         final String transactionID = commit.getTransactionID();
279
280         LOG.debug("Committing transaction {}", transactionID);
281
282         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
283         // this transaction.
284         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
285         if(cohortEntry == null) {
286             // We're not the current Tx - the Tx was likely expired b/c it took too long in
287             // between the canCommit and commit messages.
288             IllegalStateException ex = new IllegalStateException(
289                     String.format("Cannot commit transaction %s - it is not the current transaction",
290                             transactionID));
291             LOG.error(ex.getMessage());
292             shardMBean.incrementFailedTransactionsCount();
293             getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
294             return;
295         }
296
297         // We perform the preCommit phase here atomically with the commit phase. This is an
298         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
299         // coordination of preCommit across shards in case of failure but preCommit should not
300         // normally fail since we ensure only one concurrent 3-phase commit.
301
302         try {
303             // We block on the future here so we don't have to worry about possibly accessing our
304             // state on a different thread outside of our dispatcher. Also, the data store
305             // currently uses a same thread executor anyway.
306             cohortEntry.getCohort().preCommit().get();
307
308             Shard.this.persistData(getSender(), transactionID,
309                     new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
310         } catch (InterruptedException | ExecutionException e) {
311             LOG.error(e, "An exception occurred while preCommitting transaction {}",
312                     cohortEntry.getTransactionID());
313             shardMBean.incrementFailedTransactionsCount();
314             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
315         }
316
317         cohortEntry.updateLastAccessTime();
318     }
319
320     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
321         // With persistence enabled, this method is called via applyState by the leader strategy
322         // after the commit has been replicated to a majority of the followers.
323
324         CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
325         if(cohortEntry == null) {
326             // The transaction is no longer the current commit. This can happen if the transaction
327             // was aborted prior, most likely due to timeout in the front-end. We need to finish
328             // committing the transaction though since it was successfully persisted and replicated
329             // however we can't use the original cohort b/c it was already preCommitted and may
330             // conflict with the current commit or may have been aborted so we commit with a new
331             // transaction.
332             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
333             if(cohortEntry != null) {
334                 commitWithNewTransaction(cohortEntry.getModification());
335                 sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
336             } else {
337                 // This really shouldn't happen - it likely means that persistence or replication
338                 // took so long to complete such that the cohort entry was expired from the cache.
339                 IllegalStateException ex = new IllegalStateException(
340                         String.format("Could not finish committing transaction %s - no CohortEntry found",
341                                 transactionID));
342                 LOG.error(ex.getMessage());
343                 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
344             }
345
346             return;
347         }
348
349         LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
350
351         try {
352             // We block on the future here so we don't have to worry about possibly accessing our
353             // state on a different thread outside of our dispatcher. Also, the data store
354             // currently uses a same thread executor anyway.
355             cohortEntry.getCohort().commit().get();
356
357             sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
358
359             shardMBean.incrementCommittedTransactionCount();
360             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
361
362         } catch (InterruptedException | ExecutionException e) {
363             sender.tell(new akka.actor.Status.Failure(e), getSelf());
364
365             LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
366             shardMBean.incrementFailedTransactionsCount();
367         }
368
369         commitCoordinator.currentTransactionComplete(transactionID, true);
370     }
371
372     private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
373         LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
374         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
375     }
376
377     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
378         LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
379                 ready.getTxnClientVersion());
380
381         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
382         // commitCoordinator in preparation for the subsequent three phase commit initiated by
383         // the front-end.
384         commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
385                 ready.getModification());
386
387         // Return our actor path as we'll handle the three phase commit, except if the Tx client
388         // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
389         // node. In that case, the subsequent 3-phase commit messages won't contain the
390         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
391         // to provide the compatible behavior.
392         ActorRef replyActorPath = self();
393         if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
394             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
395             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
396                     ready.getTransactionID()));
397         }
398
399         ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
400                 Serialization.serializedActorPath(replyActorPath));
401         getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
402                 readyTransactionReply, getSelf());
403     }
404
405     private void handleAbortTransaction(AbortTransaction abort) {
406         doAbortTransaction(abort.getTransactionID(), getSender());
407     }
408
409     private void doAbortTransaction(String transactionID, final ActorRef sender) {
410         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
411         if(cohortEntry != null) {
412             LOG.debug("Aborting transaction {}", transactionID);
413
414             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
415             // aborted during replication in which case we may still commit locally if replication
416             // succeeds.
417             commitCoordinator.currentTransactionComplete(transactionID, false);
418
419             final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
420             final ActorRef self = getSelf();
421
422             Futures.addCallback(future, new FutureCallback<Void>() {
423                 @Override
424                 public void onSuccess(Void v) {
425                     shardMBean.incrementAbortTransactionsCount();
426
427                     if(sender != null) {
428                         sender.tell(new AbortTransactionReply().toSerializable(), self);
429                     }
430                 }
431
432                 @Override
433                 public void onFailure(Throwable t) {
434                     LOG.error(t, "An exception happened during abort");
435
436                     if(sender != null) {
437                         sender.tell(new akka.actor.Status.Failure(t), self);
438                     }
439                 }
440             });
441         }
442     }
443
444     private void handleCreateTransaction(Object message) {
445         if (isLeader()) {
446             createTransaction(CreateTransaction.fromSerializable(message));
447         } else if (getLeader() != null) {
448             getLeader().forward(message, getContext());
449         } else {
450             getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
451                 "Could not find shard leader so transaction cannot be created. This typically happens" +
452                 " when the system is coming up or recovering and a leader is being elected. Try again" +
453                 " later.")), getSelf());
454         }
455     }
456
457     private void handleReadDataReply(Object message) {
458         // This must be for install snapshot. Don't want to open this up and trigger
459         // deSerialization
460
461         self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
462                 self());
463
464         createSnapshotTransaction = null;
465
466         // Send a PoisonPill instead of sending close transaction because we do not really need
467         // a response
468         getSender().tell(PoisonPill.getInstance(), self());
469     }
470
471     private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
472         DOMStoreTransactionChain chain =
473             transactionChains.remove(closeTransactionChain.getTransactionChainId());
474
475         if(chain != null) {
476             chain.close();
477         }
478     }
479
480     private ActorRef createTypedTransactionActor(int transactionType,
481             ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
482
483         DOMStoreTransactionFactory factory = store;
484
485         if(!transactionChainId.isEmpty()) {
486             factory = transactionChains.get(transactionChainId);
487             if(factory == null){
488                 DOMStoreTransactionChain transactionChain = store.createTransactionChain();
489                 transactionChains.put(transactionChainId, transactionChain);
490                 factory = transactionChain;
491             }
492         }
493
494         if(this.schemaContext == null){
495             throw new NullPointerException("schemaContext should not be null");
496         }
497
498         if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
499
500             shardMBean.incrementReadOnlyTransactionCount();
501
502             return getContext().actorOf(
503                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
504                         schemaContext,datastoreContext, shardMBean,
505                         transactionId.getRemoteTransactionId(), clientVersion),
506                         transactionId.toString());
507
508         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
509
510             shardMBean.incrementReadWriteTransactionCount();
511
512             return getContext().actorOf(
513                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
514                         schemaContext, datastoreContext, shardMBean,
515                         transactionId.getRemoteTransactionId(), clientVersion),
516                         transactionId.toString());
517
518
519         } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
520
521             shardMBean.incrementWriteOnlyTransactionCount();
522
523             return getContext().actorOf(
524                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
525                         schemaContext, datastoreContext, shardMBean,
526                         transactionId.getRemoteTransactionId(), clientVersion),
527                         transactionId.toString());
528         } else {
529             throw new IllegalArgumentException(
530                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
531                     + transactionType);
532         }
533     }
534
535     private void createTransaction(CreateTransaction createTransaction) {
536         createTransaction(createTransaction.getTransactionType(),
537             createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
538             createTransaction.getVersion());
539     }
540
541     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
542             String transactionChainId, int clientVersion) {
543
544         ShardTransactionIdentifier transactionId =
545             ShardTransactionIdentifier.builder()
546                 .remoteTransactionId(remoteTransactionId)
547                 .build();
548         if(LOG.isDebugEnabled()) {
549             LOG.debug("Creating transaction : {} ", transactionId);
550         }
551         ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
552                 transactionChainId, clientVersion);
553
554         getSender()
555             .tell(new CreateTransactionReply(
556                     Serialization.serializedActorPath(transactionActor),
557                     remoteTransactionId).toSerializable(),
558                 getSelf());
559
560         return transactionActor;
561     }
562
563     private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
564         throws ExecutionException, InterruptedException {
565         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
566         commitCohort.preCommit().get();
567         commitCohort.commit().get();
568     }
569
570     private void commitWithNewTransaction(Modification modification) {
571         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
572         modification.apply(tx);
573         try {
574             syncCommitTransaction(tx);
575             shardMBean.incrementCommittedTransactionCount();
576             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
577         } catch (InterruptedException | ExecutionException e) {
578             shardMBean.incrementFailedTransactionsCount();
579             LOG.error(e, "Failed to commit");
580         }
581     }
582
583     private void updateSchemaContext(UpdateSchemaContext message) {
584         this.schemaContext = message.getSchemaContext();
585         updateSchemaContext(message.getSchemaContext());
586         store.onGlobalContextUpdated(message.getSchemaContext());
587     }
588
589     @VisibleForTesting
590     void updateSchemaContext(SchemaContext schemaContext) {
591         store.onGlobalContextUpdated(schemaContext);
592     }
593
594     private void registerChangeListener(RegisterChangeListener registerChangeListener) {
595
596         LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
597
598         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
599                                                      NormalizedNode<?, ?>>> registration;
600         if(isLeader()) {
601             registration = doChangeListenerRegistration(registerChangeListener);
602         } else {
603             LOG.debug("Shard is not the leader - delaying registration");
604
605             DelayedListenerRegistration delayedReg =
606                     new DelayedListenerRegistration(registerChangeListener);
607             delayedListenerRegistrations.add(delayedReg);
608             registration = delayedReg;
609         }
610
611         ActorRef listenerRegistration = getContext().actorOf(
612                 DataChangeListenerRegistration.props(registration));
613
614         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
615                     listenerRegistration.path());
616
617         getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
618     }
619
620     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
621                                                NormalizedNode<?, ?>>> doChangeListenerRegistration(
622             RegisterChangeListener registerChangeListener) {
623
624         ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
625                 registerChangeListener.getDataChangeListenerPath());
626
627         // Notify the listener if notifications should be enabled or not
628         // If this shard is the leader then it will enable notifications else
629         // it will not
630         dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
631
632         // Now store a reference to the data change listener so it can be notified
633         // at a later point if notifications should be enabled or disabled
634         dataChangeListeners.add(dataChangeListenerPath);
635
636         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
637                 new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
638
639         LOG.debug("Registering for path {}", registerChangeListener.getPath());
640
641         return store.registerChangeListener(registerChangeListener.getPath(), listener,
642                 registerChangeListener.getScope());
643     }
644
645     private boolean isMetricsCaptureEnabled(){
646         CommonConfig config = new CommonConfig(getContext().system().settings().config());
647         return config.isMetricCaptureEnabled();
648     }
649
650     @Override
651     protected
652     void startLogRecoveryBatch(int maxBatchSize) {
653         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
654
655         if(LOG.isDebugEnabled()) {
656             LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
657         }
658     }
659
660     @Override
661     protected void appendRecoveredLogEntry(Payload data) {
662         if (data instanceof CompositeModificationPayload) {
663             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
664         } else {
665             LOG.error("Unknown state received {} during recovery", data);
666         }
667     }
668
669     @Override
670     protected void applyRecoverySnapshot(ByteString snapshot) {
671         if(recoveryCoordinator == null) {
672             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
673         }
674
675         recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
676
677         if(LOG.isDebugEnabled()) {
678             LOG.debug("{} : submitted recovery sbapshot", persistenceId());
679         }
680     }
681
682     @Override
683     protected void applyCurrentLogRecoveryBatch() {
684         if(recoveryCoordinator == null) {
685             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
686         }
687
688         recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
689
690         if(LOG.isDebugEnabled()) {
691             LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
692                     currentLogRecoveryBatch.size());
693         }
694     }
695
696     @Override
697     protected void onRecoveryComplete() {
698         if(recoveryCoordinator != null) {
699             Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
700
701             if(LOG.isDebugEnabled()) {
702                 LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
703             }
704
705             for(DOMStoreWriteTransaction tx: txList) {
706                 try {
707                     syncCommitTransaction(tx);
708                     shardMBean.incrementCommittedTransactionCount();
709                 } catch (InterruptedException | ExecutionException e) {
710                     shardMBean.incrementFailedTransactionsCount();
711                     LOG.error(e, "Failed to commit");
712                 }
713             }
714         }
715
716         recoveryCoordinator = null;
717         currentLogRecoveryBatch = null;
718         updateJournalStats();
719
720         //notify shard manager
721         getContext().parent().tell(new ActorInitialized(), getSelf());
722
723         // Being paranoid here - this method should only be called once but just in case...
724         if(txCommitTimeoutCheckSchedule == null) {
725             // Schedule a message to be periodically sent to check if the current in-progress
726             // transaction should be expired and aborted.
727             FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
728             txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
729                     period, period, getSelf(),
730                     TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
731         }
732     }
733
734     @Override
735     protected void applyState(ActorRef clientActor, String identifier, Object data) {
736
737         if (data instanceof CompositeModificationPayload) {
738             Object modification = ((CompositeModificationPayload) data).getModification();
739
740             if(modification == null) {
741                 LOG.error(
742                      "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
743                      identifier, clientActor != null ? clientActor.path().toString() : null);
744             } else if(clientActor == null) {
745                 // There's no clientActor to which to send a commit reply so we must be applying
746                 // replicated state from the leader.
747                 commitWithNewTransaction(MutableCompositeModification.fromSerializable(
748                         modification, schemaContext));
749             } else {
750                 // This must be the OK to commit after replication consensus.
751                 finishCommit(clientActor, identifier);
752             }
753         } else {
754             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
755                     data, data.getClass().getClassLoader(),
756                     CompositeModificationPayload.class.getClassLoader());
757         }
758
759         updateJournalStats();
760
761     }
762
763     private void updateJournalStats() {
764         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
765
766         if (lastLogEntry != null) {
767             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
768             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
769         }
770
771         shardMBean.setCommitIndex(getCommitIndex());
772         shardMBean.setLastApplied(getLastApplied());
773     }
774
775     @Override
776     protected void createSnapshot() {
777         if (createSnapshotTransaction == null) {
778
779             // Create a transaction. We are really going to treat the transaction as a worker
780             // so that this actor does not get block building the snapshot
781             createSnapshotTransaction = createTransaction(
782                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
783                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
784                 CreateTransaction.CURRENT_VERSION);
785
786             createSnapshotTransaction.tell(
787                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
788
789         }
790     }
791
792     @VisibleForTesting
793     @Override
794     protected void applySnapshot(ByteString snapshot) {
795         // Since this will be done only on Recovery or when this actor is a Follower
796         // we can safely commit everything in here. We not need to worry about event notifications
797         // as they would have already been disabled on the follower
798
799         LOG.info("Applying snapshot");
800         try {
801             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
802             NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
803             NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
804                 .decode(serializedNode);
805
806             // delete everything first
807             transaction.delete(YangInstanceIdentifier.builder().build());
808
809             // Add everything from the remote node back
810             transaction.write(YangInstanceIdentifier.builder().build(), node);
811             syncCommitTransaction(transaction);
812         } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
813             LOG.error(e, "An exception occurred when applying snapshot");
814         } finally {
815             LOG.info("Done applying snapshot");
816         }
817     }
818
819     @Override
820     protected void onStateChanged() {
821         boolean isLeader = isLeader();
822         for (ActorSelection dataChangeListener : dataChangeListeners) {
823             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
824         }
825
826         if(isLeader) {
827             for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
828                 if(!reg.isClosed()) {
829                     reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
830                 }
831             }
832
833             delayedListenerRegistrations.clear();
834         }
835
836         shardMBean.setRaftState(getRaftState().name());
837         shardMBean.setCurrentTerm(getCurrentTerm());
838
839         // If this actor is no longer the leader close all the transaction chains
840         if(!isLeader){
841             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
842                 if(LOG.isDebugEnabled()) {
843                     LOG.debug(
844                         "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
845                         entry.getKey(), getId());
846                 }
847                 entry.getValue().close();
848             }
849
850             transactionChains.clear();
851         }
852     }
853
854     @Override
855     protected DataPersistenceProvider persistence() {
856         return dataPersistenceProvider;
857     }
858
859     @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
860         shardMBean.setLeader(newLeader);
861     }
862
863     @Override public String persistenceId() {
864         return this.name.toString();
865     }
866
867     @VisibleForTesting
868     DataPersistenceProvider getDataPersistenceProvider() {
869         return dataPersistenceProvider;
870     }
871
872     private static class ShardCreator implements Creator<Shard> {
873
874         private static final long serialVersionUID = 1L;
875
876         final ShardIdentifier name;
877         final Map<ShardIdentifier, String> peerAddresses;
878         final DatastoreContext datastoreContext;
879         final SchemaContext schemaContext;
880
881         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
882                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
883             this.name = name;
884             this.peerAddresses = peerAddresses;
885             this.datastoreContext = datastoreContext;
886             this.schemaContext = schemaContext;
887         }
888
889         @Override
890         public Shard create() throws Exception {
891             return new Shard(name, peerAddresses, datastoreContext, schemaContext);
892         }
893     }
894
895     @VisibleForTesting
896     InMemoryDOMDataStore getDataStore() {
897         return store;
898     }
899
900     @VisibleForTesting
901     ShardStats getShardMBean() {
902         return shardMBean;
903     }
904
905     private static class DelayedListenerRegistration implements
906         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
907
908         private volatile boolean closed;
909
910         private final RegisterChangeListener registerChangeListener;
911
912         private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
913                                                              NormalizedNode<?, ?>>> delegate;
914
915         DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
916             this.registerChangeListener = registerChangeListener;
917         }
918
919         void setDelegate( ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
920                                             NormalizedNode<?, ?>>> registration) {
921             this.delegate = registration;
922         }
923
924         boolean isClosed() {
925             return closed;
926         }
927
928         RegisterChangeListener getRegisterChangeListener() {
929             return registerChangeListener;
930         }
931
932         @Override
933         public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
934             return delegate != null ? delegate.getInstance() : null;
935         }
936
937         @Override
938         public void close() {
939             closed = true;
940             if(delegate != null) {
941                 delegate.close();
942             }
943         }
944     }
945 }