Merge "added namespace to features/adsal pom.xml"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18 import akka.japi.Creator;
19 import akka.persistence.RecoveryFailure;
20 import akka.serialization.Serialization;
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Optional;
23 import com.google.common.base.Preconditions;
24 import com.google.common.collect.Lists;
25 import com.google.common.util.concurrent.FutureCallback;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.protobuf.ByteString;
29 import com.google.protobuf.InvalidProtocolBufferException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import javax.annotation.Nonnull;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
39 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
40 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
41 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
42 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
44 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
45 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
46 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
57 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
58 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
65 import org.opendaylight.controller.cluster.datastore.modification.Modification;
66 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
67 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
68 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
69 import org.opendaylight.controller.cluster.raft.RaftActor;
70 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
71 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
72 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
73 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
74 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
75 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
76 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
77 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
78 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
79 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
80 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
81 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
82 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
83 import org.opendaylight.yangtools.concepts.ListenerRegistration;
84 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
85 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
86 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
87 import scala.concurrent.duration.Duration;
88 import scala.concurrent.duration.FiniteDuration;
89
/**
 * A Shard represents a portion of the logical data tree. <br/>
 * <p>
 * Our Shard uses an InMemoryDataStore as its internal representation and delegates the data
 * requests it receives to that store.
 * </p>
 */
96 public class Shard extends RaftActor {
97
    // Pre-built serialized reply sent to the requester when a commit completes.
    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();

    // Message that onReceiveCommand answers by running handleTransactionCommitTimeoutCheck().
    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";

    public static final String DEFAULT_NAME = "default";

    // The state of this Shard
    private final InMemoryDOMDataStore store;

    private final LoggingAdapter LOG =
        Logging.getLogger(getContext().system(), this);

    /// The name of this shard
    private final ShardIdentifier name;

    // JMX statistics for this shard (transaction counts, last commit time, ...).
    private final ShardStats shardMBean;

    // Listener actors that were sent EnableNotification(true) when registered with the store.
    private final List<ActorSelection> dataChangeListeners =  Lists.newArrayList();

    // Registrations received while this shard was not the leader; they are not registered
    // with the store yet (presumably done on becoming leader - handled outside this section).
    private final List<DelayedListenerRegistration> delayedListenerRegistrations =
                                                                       Lists.newArrayList();

    private final DatastoreContext datastoreContext;

    // Persistent or non-persistent provider, chosen from DatastoreContext.isPersistent().
    private final DataPersistenceProvider dataPersistenceProvider;

    // Current schema. May be null until set; transactions cannot be created while it is null.
    private SchemaContext schemaContext;

    // Presumably the transaction actor reading state for a snapshot; handleReadDataReply()
    // clears it once the read reply arrives.
    private ActorRef createSnapshotTransaction;

    private int createSnapshotTransactionCounter;

    // Holds ready transactions and tracks the single transaction currently being committed.
    private final ShardCommitCoordinator commitCoordinator;

    // Time, in milliseconds, the current committing transaction may go without being accessed
    // before handleTransactionCommitTimeoutCheck() aborts it.
    private final long transactionCommitTimeout;

    // Schedule for the commit timeout check; started elsewhere, cancelled in postStop().
    private Cancellable txCommitTimeoutCheckSchedule;

    private Optional<ActorRef> roleChangeNotifier;

    /**
     * Coordinates persistence recovery on startup.
     */
    private ShardRecoveryCoordinator recoveryCoordinator;
    private List<Object> currentLogRecoveryBatch;

    // Open transaction chains, keyed by transaction chain id.
    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
145
146     protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
147             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
148         super(name.toString(), mapPeerAddresses(peerAddresses),
149                 Optional.of(datastoreContext.getShardRaftConfig()));
150
151         this.name = name;
152         this.datastoreContext = datastoreContext;
153         this.schemaContext = schemaContext;
154         this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
155
156         LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
157
158         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
159                 datastoreContext.getDataStoreProperties());
160
161         if(schemaContext != null) {
162             store.onGlobalContextUpdated(schemaContext);
163         }
164
165         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
166                 datastoreContext.getDataStoreMXBeanType());
167         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
168
169         if (isMetricsCaptureEnabled()) {
170             getContext().become(new MeteringBehavior(this));
171         }
172
173         commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
174                 datastoreContext.getShardTransactionCommitQueueCapacity());
175
176         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
177                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
178
179         // create a notifier actor for each cluster member
180         roleChangeNotifier = createRoleChangeNotifier(name.toString());
181     }
182
183     private static Map<String, String> mapPeerAddresses(
184         final Map<ShardIdentifier, String> peerAddresses) {
185         Map<String, String> map = new HashMap<>();
186
187         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
188             .entrySet()) {
189             map.put(entry.getKey().toString(), entry.getValue());
190         }
191
192         return map;
193     }
194
195     public static Props props(final ShardIdentifier name,
196         final Map<ShardIdentifier, String> peerAddresses,
197         final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
198         Preconditions.checkNotNull(name, "name should not be null");
199         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
200         Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
201         Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
202
203         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
204     }
205
206     private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
207         ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
208             RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
209         return Optional.<ActorRef>of(shardRoleChangeNotifier);
210     }
211
212     @Override
213     public void postStop() {
214         super.postStop();
215
216         if(txCommitTimeoutCheckSchedule != null) {
217             txCommitTimeoutCheckSchedule.cancel();
218         }
219     }
220
221     @Override
222     public void onReceiveRecover(final Object message) throws Exception {
223         if(LOG.isDebugEnabled()) {
224             LOG.debug("onReceiveRecover: Received message {} from {}",
225                 message.getClass().toString(),
226                 getSender());
227         }
228
229         if (message instanceof RecoveryFailure){
230             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
231
232             // Even though recovery failed, we still need to finish our recovery, eg send the
233             // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
234             onRecoveryComplete();
235         } else {
236             super.onReceiveRecover(message);
237         }
238     }
239
240     @Override
241     public void onReceiveCommand(final Object message) throws Exception {
242         if(LOG.isDebugEnabled()) {
243             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
244         }
245
246         if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
247             handleReadDataReply(message);
248         } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
249             handleCreateTransaction(message);
250         } else if(message instanceof ForwardedReadyTransaction) {
251             handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
252         } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
253             handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
254         } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
255             handleCommitTransaction(CommitTransaction.fromSerializable(message));
256         } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
257             handleAbortTransaction(AbortTransaction.fromSerializable(message));
258         } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
259             closeTransactionChain(CloseTransactionChain.fromSerializable(message));
260         } else if (message instanceof RegisterChangeListener) {
261             registerChangeListener((RegisterChangeListener) message);
262         } else if (message instanceof UpdateSchemaContext) {
263             updateSchemaContext((UpdateSchemaContext) message);
264         } else if (message instanceof PeerAddressResolved) {
265             PeerAddressResolved resolved = (PeerAddressResolved) message;
266             setPeerAddress(resolved.getPeerId().toString(),
267                 resolved.getPeerAddress());
268         } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
269             handleTransactionCommitTimeoutCheck();
270         } else {
271             super.onReceiveCommand(message);
272         }
273     }
274
275     @Override
276     protected Optional<ActorRef> getRoleChangeNotifier() {
277         return roleChangeNotifier;
278     }
279
280     private void handleTransactionCommitTimeoutCheck() {
281         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
282         if(cohortEntry != null) {
283             long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
284             if(elapsed > transactionCommitTimeout) {
285                 LOG.warning("Current transaction {} has timed out after {} ms - aborting",
286                         cohortEntry.getTransactionID(), transactionCommitTimeout);
287
288                 doAbortTransaction(cohortEntry.getTransactionID(), null);
289             }
290         }
291     }
292
293     private void handleCommitTransaction(final CommitTransaction commit) {
294         final String transactionID = commit.getTransactionID();
295
296         LOG.debug("Committing transaction {}", transactionID);
297
298         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
299         // this transaction.
300         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
301         if(cohortEntry == null) {
302             // We're not the current Tx - the Tx was likely expired b/c it took too long in
303             // between the canCommit and commit messages.
304             IllegalStateException ex = new IllegalStateException(
305                     String.format("Cannot commit transaction %s - it is not the current transaction",
306                             transactionID));
307             LOG.error(ex.getMessage());
308             shardMBean.incrementFailedTransactionsCount();
309             getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
310             return;
311         }
312
313         // We perform the preCommit phase here atomically with the commit phase. This is an
314         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
315         // coordination of preCommit across shards in case of failure but preCommit should not
316         // normally fail since we ensure only one concurrent 3-phase commit.
317
318         try {
319             // We block on the future here so we don't have to worry about possibly accessing our
320             // state on a different thread outside of our dispatcher. Also, the data store
321             // currently uses a same thread executor anyway.
322             cohortEntry.getCohort().preCommit().get();
323
324             Shard.this.persistData(getSender(), transactionID,
325                     new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
326         } catch (InterruptedException | ExecutionException e) {
327             LOG.error(e, "An exception occurred while preCommitting transaction {}",
328                     cohortEntry.getTransactionID());
329             shardMBean.incrementFailedTransactionsCount();
330             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
331         }
332
333         cohortEntry.updateLastAccessTime();
334     }
335
336     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
337         // With persistence enabled, this method is called via applyState by the leader strategy
338         // after the commit has been replicated to a majority of the followers.
339
340         CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
341         if(cohortEntry == null) {
342             // The transaction is no longer the current commit. This can happen if the transaction
343             // was aborted prior, most likely due to timeout in the front-end. We need to finish
344             // committing the transaction though since it was successfully persisted and replicated
345             // however we can't use the original cohort b/c it was already preCommitted and may
346             // conflict with the current commit or may have been aborted so we commit with a new
347             // transaction.
348             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
349             if(cohortEntry != null) {
350                 commitWithNewTransaction(cohortEntry.getModification());
351                 sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
352             } else {
353                 // This really shouldn't happen - it likely means that persistence or replication
354                 // took so long to complete such that the cohort entry was expired from the cache.
355                 IllegalStateException ex = new IllegalStateException(
356                         String.format("Could not finish committing transaction %s - no CohortEntry found",
357                                 transactionID));
358                 LOG.error(ex.getMessage());
359                 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
360             }
361
362             return;
363         }
364
365         LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
366
367         try {
368             // We block on the future here so we don't have to worry about possibly accessing our
369             // state on a different thread outside of our dispatcher. Also, the data store
370             // currently uses a same thread executor anyway.
371             cohortEntry.getCohort().commit().get();
372
373             sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
374
375             shardMBean.incrementCommittedTransactionCount();
376             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
377
378         } catch (InterruptedException | ExecutionException e) {
379             sender.tell(new akka.actor.Status.Failure(e), getSelf());
380
381             LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
382             shardMBean.incrementFailedTransactionsCount();
383         }
384
385         commitCoordinator.currentTransactionComplete(transactionID, true);
386     }
387
388     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
389         LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
390         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
391     }
392
393     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
394         LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
395                 ready.getTxnClientVersion());
396
397         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
398         // commitCoordinator in preparation for the subsequent three phase commit initiated by
399         // the front-end.
400         commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
401                 ready.getModification());
402
403         // Return our actor path as we'll handle the three phase commit, except if the Tx client
404         // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
405         // node. In that case, the subsequent 3-phase commit messages won't contain the
406         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
407         // to provide the compatible behavior.
408         ActorRef replyActorPath = self();
409         if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
410             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
411             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
412                     ready.getTransactionID()));
413         }
414
415         ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
416                 Serialization.serializedActorPath(replyActorPath));
417         getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
418                 readyTransactionReply, getSelf());
419     }
420
421     private void handleAbortTransaction(final AbortTransaction abort) {
422         doAbortTransaction(abort.getTransactionID(), getSender());
423     }
424
425     private void doAbortTransaction(final String transactionID, final ActorRef sender) {
426         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
427         if(cohortEntry != null) {
428             LOG.debug("Aborting transaction {}", transactionID);
429
430             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
431             // aborted during replication in which case we may still commit locally if replication
432             // succeeds.
433             commitCoordinator.currentTransactionComplete(transactionID, false);
434
435             final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
436             final ActorRef self = getSelf();
437
438             Futures.addCallback(future, new FutureCallback<Void>() {
439                 @Override
440                 public void onSuccess(final Void v) {
441                     shardMBean.incrementAbortTransactionsCount();
442
443                     if(sender != null) {
444                         sender.tell(new AbortTransactionReply().toSerializable(), self);
445                     }
446                 }
447
448                 @Override
449                 public void onFailure(final Throwable t) {
450                     LOG.error(t, "An exception happened during abort");
451
452                     if(sender != null) {
453                         sender.tell(new akka.actor.Status.Failure(t), self);
454                     }
455                 }
456             });
457         }
458     }
459
460     private void handleCreateTransaction(final Object message) {
461         if (isLeader()) {
462             createTransaction(CreateTransaction.fromSerializable(message));
463         } else if (getLeader() != null) {
464             getLeader().forward(message, getContext());
465         } else {
466             getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
467                 "Could not find shard leader so transaction cannot be created. This typically happens" +
468                 " when the system is coming up or recovering and a leader is being elected. Try again" +
469                 " later.")), getSelf());
470         }
471     }
472
473     private void handleReadDataReply(final Object message) {
474         // This must be for install snapshot. Don't want to open this up and trigger
475         // deSerialization
476
477         self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
478                 self());
479
480         createSnapshotTransaction = null;
481
482         // Send a PoisonPill instead of sending close transaction because we do not really need
483         // a response
484         getSender().tell(PoisonPill.getInstance(), self());
485     }
486
487     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
488         DOMStoreTransactionChain chain =
489             transactionChains.remove(closeTransactionChain.getTransactionChainId());
490
491         if(chain != null) {
492             chain.close();
493         }
494     }
495
496     private ActorRef createTypedTransactionActor(int transactionType,
497             ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
498
499         DOMStoreTransactionFactory factory = store;
500
501         if(!transactionChainId.isEmpty()) {
502             factory = transactionChains.get(transactionChainId);
503             if(factory == null){
504                 DOMStoreTransactionChain transactionChain = store.createTransactionChain();
505                 transactionChains.put(transactionChainId, transactionChain);
506                 factory = transactionChain;
507             }
508         }
509
510         if(this.schemaContext == null) {
511             throw new IllegalStateException("SchemaContext is not set");
512         }
513
514         if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
515
516             shardMBean.incrementReadOnlyTransactionCount();
517
518             return getContext().actorOf(
519                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
520                         schemaContext,datastoreContext, shardMBean,
521                         transactionId.getRemoteTransactionId(), clientVersion),
522                         transactionId.toString());
523
524         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
525
526             shardMBean.incrementReadWriteTransactionCount();
527
528             return getContext().actorOf(
529                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
530                         schemaContext, datastoreContext, shardMBean,
531                         transactionId.getRemoteTransactionId(), clientVersion),
532                         transactionId.toString());
533
534
535         } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
536
537             shardMBean.incrementWriteOnlyTransactionCount();
538
539             return getContext().actorOf(
540                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
541                         schemaContext, datastoreContext, shardMBean,
542                         transactionId.getRemoteTransactionId(), clientVersion),
543                         transactionId.toString());
544         } else {
545             throw new IllegalArgumentException(
546                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
547                     + transactionType);
548         }
549     }
550
551     private void createTransaction(CreateTransaction createTransaction) {
552         try {
553             ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
554                 createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
555                 createTransaction.getVersion());
556
557             getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
558                     createTransaction.getTransactionId()).toSerializable(), getSelf());
559         } catch (Exception e) {
560             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
561         }
562     }
563
564     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
565             String transactionChainId, int clientVersion) {
566
567         ShardTransactionIdentifier transactionId =
568             ShardTransactionIdentifier.builder()
569                 .remoteTransactionId(remoteTransactionId)
570                 .build();
571
572         if(LOG.isDebugEnabled()) {
573             LOG.debug("Creating transaction : {} ", transactionId);
574         }
575
576         ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
577                 transactionChainId, clientVersion);
578
579         return transactionActor;
580     }
581
582     private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
583         throws ExecutionException, InterruptedException {
584         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
585         commitCohort.preCommit().get();
586         commitCohort.commit().get();
587     }
588
589     private void commitWithNewTransaction(final Modification modification) {
590         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
591         modification.apply(tx);
592         try {
593             syncCommitTransaction(tx);
594             shardMBean.incrementCommittedTransactionCount();
595             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
596         } catch (InterruptedException | ExecutionException e) {
597             shardMBean.incrementFailedTransactionsCount();
598             LOG.error(e, "Failed to commit");
599         }
600     }
601
602     private void updateSchemaContext(final UpdateSchemaContext message) {
603         this.schemaContext = message.getSchemaContext();
604         updateSchemaContext(message.getSchemaContext());
605         store.onGlobalContextUpdated(message.getSchemaContext());
606     }
607
608     @VisibleForTesting
609     void updateSchemaContext(final SchemaContext schemaContext) {
610         store.onGlobalContextUpdated(schemaContext);
611     }
612
613     private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
614
615         LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
616
617         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
618                                                      NormalizedNode<?, ?>>> registration;
619         if(isLeader()) {
620             registration = doChangeListenerRegistration(registerChangeListener);
621         } else {
622             LOG.debug("Shard is not the leader - delaying registration");
623
624             DelayedListenerRegistration delayedReg =
625                     new DelayedListenerRegistration(registerChangeListener);
626             delayedListenerRegistrations.add(delayedReg);
627             registration = delayedReg;
628         }
629
630         ActorRef listenerRegistration = getContext().actorOf(
631                 DataChangeListenerRegistration.props(registration));
632
633         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
634                     listenerRegistration.path());
635
636         getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
637     }
638
639     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
640                                                NormalizedNode<?, ?>>> doChangeListenerRegistration(
641             final RegisterChangeListener registerChangeListener) {
642
643         ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
644                 registerChangeListener.getDataChangeListenerPath());
645
646         // Notify the listener if notifications should be enabled or not
647         // If this shard is the leader then it will enable notifications else
648         // it will not
649         dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
650
651         // Now store a reference to the data change listener so it can be notified
652         // at a later point if notifications should be enabled or disabled
653         dataChangeListeners.add(dataChangeListenerPath);
654
655         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
656                 new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
657
658         LOG.debug("Registering for path {}", registerChangeListener.getPath());
659
660         return store.registerChangeListener(registerChangeListener.getPath(), listener,
661                 registerChangeListener.getScope());
662     }
663
664     private boolean isMetricsCaptureEnabled(){
665         CommonConfig config = new CommonConfig(getContext().system().settings().config());
666         return config.isMetricCaptureEnabled();
667     }
668
669     @Override
670     protected
671     void startLogRecoveryBatch(final int maxBatchSize) {
672         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
673
674         if(LOG.isDebugEnabled()) {
675             LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
676         }
677     }
678
679     @Override
680     protected void appendRecoveredLogEntry(final Payload data) {
681         if (data instanceof CompositeModificationPayload) {
682             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
683         } else if (data instanceof CompositeModificationByteStringPayload) {
684             currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
685         } else {
686             LOG.error("Unknown state received {} during recovery", data);
687         }
688     }
689
690     @Override
691     protected void applyRecoverySnapshot(final ByteString snapshot) {
692         if(recoveryCoordinator == null) {
693             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
694         }
695
696         recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction());
697
698         if(LOG.isDebugEnabled()) {
699             LOG.debug("{} : submitted recovery sbapshot", persistenceId());
700         }
701     }
702
703     @Override
704     protected void applyCurrentLogRecoveryBatch() {
705         if(recoveryCoordinator == null) {
706             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
707         }
708
709         recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
710
711         if(LOG.isDebugEnabled()) {
712             LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
713                     currentLogRecoveryBatch.size());
714         }
715     }
716
717     @Override
718     protected void onRecoveryComplete() {
719         if(recoveryCoordinator != null) {
720             Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
721
722             if(LOG.isDebugEnabled()) {
723                 LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size());
724             }
725
726             for(DOMStoreWriteTransaction tx: txList) {
727                 try {
728                     syncCommitTransaction(tx);
729                     shardMBean.incrementCommittedTransactionCount();
730                 } catch (InterruptedException | ExecutionException e) {
731                     shardMBean.incrementFailedTransactionsCount();
732                     LOG.error(e, "Failed to commit");
733                 }
734             }
735         }
736
737         recoveryCoordinator = null;
738         currentLogRecoveryBatch = null;
739         updateJournalStats();
740
741         //notify shard manager
742         getContext().parent().tell(new ActorInitialized(), getSelf());
743
744         // Being paranoid here - this method should only be called once but just in case...
745         if(txCommitTimeoutCheckSchedule == null) {
746             // Schedule a message to be periodically sent to check if the current in-progress
747             // transaction should be expired and aborted.
748             FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
749             txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
750                     period, period, getSelf(),
751                     TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
752         }
753     }
754
755     @Override
756     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
757
758         if (data instanceof CompositeModificationPayload) {
759             Object modification = ((CompositeModificationPayload) data).getModification();
760
761             applyModificationToState(clientActor, identifier, modification);
762         } else if(data instanceof CompositeModificationByteStringPayload ){
763             Object modification = ((CompositeModificationByteStringPayload) data).getModification();
764
765             applyModificationToState(clientActor, identifier, modification);
766
767         } else {
768             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
769                     data, data.getClass().getClassLoader(),
770                     CompositeModificationPayload.class.getClassLoader());
771         }
772
773         updateJournalStats();
774
775     }
776
777     private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
778         if(modification == null) {
779             LOG.error(
780                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
781                     identifier, clientActor != null ? clientActor.path().toString() : null);
782         } else if(clientActor == null) {
783             // There's no clientActor to which to send a commit reply so we must be applying
784             // replicated state from the leader.
785             commitWithNewTransaction(MutableCompositeModification.fromSerializable(
786                     modification, schemaContext));
787         } else {
788             // This must be the OK to commit after replication consensus.
789             finishCommit(clientActor, identifier);
790         }
791     }
792
793     private void updateJournalStats() {
794         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
795
796         if (lastLogEntry != null) {
797             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
798             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
799         }
800
801         shardMBean.setCommitIndex(getCommitIndex());
802         shardMBean.setLastApplied(getLastApplied());
803         shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
804     }
805
806     @Override
807     protected void createSnapshot() {
808         if (createSnapshotTransaction == null) {
809
810             // Create a transaction. We are really going to treat the transaction as a worker
811             // so that this actor does not get block building the snapshot
812             createSnapshotTransaction = createTransaction(
813                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
814                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
815                 CreateTransaction.CURRENT_VERSION);
816
817             createSnapshotTransaction.tell(
818                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
819
820         }
821     }
822
823     @VisibleForTesting
824     @Override
825     protected void applySnapshot(final ByteString snapshot) {
826         // Since this will be done only on Recovery or when this actor is a Follower
827         // we can safely commit everything in here. We not need to worry about event notifications
828         // as they would have already been disabled on the follower
829
830         LOG.info("Applying snapshot");
831         try {
832             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
833             NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
834             NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
835                 .decode(serializedNode);
836
837             // delete everything first
838             transaction.delete(YangInstanceIdentifier.builder().build());
839
840             // Add everything from the remote node back
841             transaction.write(YangInstanceIdentifier.builder().build(), node);
842             syncCommitTransaction(transaction);
843         } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
844             LOG.error(e, "An exception occurred when applying snapshot");
845         } finally {
846             LOG.info("Done applying snapshot");
847         }
848     }
849
850     @Override
851     protected void onStateChanged() {
852         boolean isLeader = isLeader();
853         for (ActorSelection dataChangeListener : dataChangeListeners) {
854             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
855         }
856
857         if(isLeader) {
858             for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
859                 if(!reg.isClosed()) {
860                     reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
861                 }
862             }
863
864             delayedListenerRegistrations.clear();
865         }
866
867         shardMBean.setRaftState(getRaftState().name());
868         shardMBean.setCurrentTerm(getCurrentTerm());
869
870         // If this actor is no longer the leader close all the transaction chains
871         if(!isLeader){
872             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
873                 if(LOG.isDebugEnabled()) {
874                     LOG.debug(
875                         "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
876                         entry.getKey(), getId());
877                 }
878                 entry.getValue().close();
879             }
880
881             transactionChains.clear();
882         }
883     }
884
885     @Override
886     protected DataPersistenceProvider persistence() {
887         return dataPersistenceProvider;
888     }
889
890     @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
891         shardMBean.setLeader(newLeader);
892     }
893
894     @Override public String persistenceId() {
895         return this.name.toString();
896     }
897
898     @VisibleForTesting
899     DataPersistenceProvider getDataPersistenceProvider() {
900         return dataPersistenceProvider;
901     }
902
903     private static class ShardCreator implements Creator<Shard> {
904
905         private static final long serialVersionUID = 1L;
906
907         final ShardIdentifier name;
908         final Map<ShardIdentifier, String> peerAddresses;
909         final DatastoreContext datastoreContext;
910         final SchemaContext schemaContext;
911
912         ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
913                 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
914             this.name = name;
915             this.peerAddresses = peerAddresses;
916             this.datastoreContext = datastoreContext;
917             this.schemaContext = schemaContext;
918         }
919
920         @Override
921         public Shard create() throws Exception {
922             return new Shard(name, peerAddresses, datastoreContext, schemaContext);
923         }
924     }
925
926     @VisibleForTesting
927     InMemoryDOMDataStore getDataStore() {
928         return store;
929     }
930
931     @VisibleForTesting
932     ShardStats getShardMBean() {
933         return shardMBean;
934     }
935
936     private static class DelayedListenerRegistration implements
937         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
938
939         private volatile boolean closed;
940
941         private final RegisterChangeListener registerChangeListener;
942
943         private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
944                                                              NormalizedNode<?, ?>>> delegate;
945
946         DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
947             this.registerChangeListener = registerChangeListener;
948         }
949
950         void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
951                                             NormalizedNode<?, ?>>> registration) {
952             this.delegate = registration;
953         }
954
955         boolean isClosed() {
956             return closed;
957         }
958
959         RegisterChangeListener getRegisterChangeListener() {
960             return registerChangeListener;
961         }
962
963         @Override
964         public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
965             return delegate != null ? delegate.getInstance() : null;
966         }
967
968         @Override
969         public void close() {
970             closed = true;
971             if(delegate != null) {
972                 delegate.close();
973             }
974         }
975     }
976 }