Merge "BUG 1712 - Distributed DataStore does not work properly with Transaction...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Creator;
18 import akka.persistence.RecoveryFailure;
19 import akka.serialization.Serialization;
20 import com.google.common.annotations.VisibleForTesting;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Preconditions;
23 import com.google.common.util.concurrent.CheckedFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.protobuf.ByteString;
28 import com.google.protobuf.InvalidProtocolBufferException;
29 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
30 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
34 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
35 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
36 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
39 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
41 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
45 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
46 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
47 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
48 import org.opendaylight.controller.cluster.datastore.modification.Modification;
49 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
50 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
51 import org.opendaylight.controller.cluster.raft.ConfigParams;
52 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
53 import org.opendaylight.controller.cluster.raft.RaftActor;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
56 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
57 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
58 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
59 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
60 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
61 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
63 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
64 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
65 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
66 import org.opendaylight.yangtools.concepts.ListenerRegistration;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
69 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
70 import scala.concurrent.duration.FiniteDuration;
71
72 import java.util.ArrayList;
73 import java.util.HashMap;
74 import java.util.List;
75 import java.util.Map;
76 import java.util.concurrent.ExecutionException;
77 import java.util.concurrent.TimeUnit;
78
/**
 * A Shard represents a portion of the logical data tree <br/>
 * <p>
 * Our Shard uses InMemoryDOMDataStore as its internal representation and delegates all requests to it
 * </p>
 */
85 public class Shard extends RaftActor {
86
87     private static final ConfigParams configParams = new ShardConfigParams();
88
89     public static final String DEFAULT_NAME = "default";
90
91     // The state of this Shard
92     private final InMemoryDOMDataStore store;
93
94     private final Map<Object, DOMStoreThreePhaseCommitCohort>
95         modificationToCohort = new HashMap<>();
96
97     private final LoggingAdapter LOG =
98         Logging.getLogger(getContext().system(), this);
99
100     // By default persistent will be true and can be turned off using the system
101     // property shard.persistent
102     private final boolean persistent;
103
104     /// The name of this shard
105     private final ShardIdentifier name;
106
107     private final ShardStats shardMBean;
108
109     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
110
111     private final DatastoreContext datastoreContext;
112
113     private SchemaContext schemaContext;
114
115     private ActorRef createSnapshotTransaction;
116
117     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
118
119     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
120             DatastoreContext datastoreContext, SchemaContext schemaContext) {
121         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
122
123         this.name = name;
124         this.datastoreContext = datastoreContext;
125         this.schemaContext = schemaContext;
126
127         String setting = System.getProperty("shard.persistent");
128
129         this.persistent = !"false".equals(setting);
130
131         LOG.info("Shard created : {} persistent : {}", name, persistent);
132
133         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
134                 datastoreContext.getDataStoreProperties());
135
136         if(schemaContext != null) {
137             store.onGlobalContextUpdated(schemaContext);
138         }
139
140         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
141                 datastoreContext.getDataStoreMXBeanType());
142         shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
143         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
144
145         if (isMetricsCaptureEnabled()) {
146             getContext().become(new MeteringBehavior(this));
147         }
148     }
149
150     private static Map<String, String> mapPeerAddresses(
151         Map<ShardIdentifier, String> peerAddresses) {
152         Map<String, String> map = new HashMap<>();
153
154         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
155             .entrySet()) {
156             map.put(entry.getKey().toString(), entry.getValue());
157         }
158
159         return map;
160     }
161
162     public static Props props(final ShardIdentifier name,
163         final Map<ShardIdentifier, String> peerAddresses,
164         DatastoreContext datastoreContext, SchemaContext schemaContext) {
165         Preconditions.checkNotNull(name, "name should not be null");
166         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
167         Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
168         Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
169
170         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
171     }
172
173     @Override public void onReceiveRecover(Object message) {
174         LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
175             getSender());
176
177         if (message instanceof RecoveryFailure){
178             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
179         } else {
180             super.onReceiveRecover(message);
181         }
182     }
183
184     @Override public void onReceiveCommand(Object message) {
185         LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
186             getSender());
187
188         if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
189             // This must be for install snapshot. Don't want to open this up and trigger
190             // deSerialization
191             self()
192                 .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
193                     self());
194
195             // Send a PoisonPill instead of sending close transaction because we do not really need
196             // a response
197             getSender().tell(PoisonPill.getInstance(), self());
198
199         } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
200             closeTransactionChain(CloseTransactionChain.fromSerializable(message));
201         } else if (message instanceof RegisterChangeListener) {
202             registerChangeListener((RegisterChangeListener) message);
203         } else if (message instanceof UpdateSchemaContext) {
204             updateSchemaContext((UpdateSchemaContext) message);
205         } else if (message instanceof ForwardedCommitTransaction) {
206             handleForwardedCommit((ForwardedCommitTransaction) message);
207         } else if (message.getClass()
208             .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
209             if (isLeader()) {
210                 createTransaction(CreateTransaction.fromSerializable(message));
211             } else if (getLeader() != null) {
212                 getLeader().forward(message, getContext());
213             }
214         } else if (message instanceof PeerAddressResolved) {
215             PeerAddressResolved resolved = (PeerAddressResolved) message;
216             setPeerAddress(resolved.getPeerId().toString(),
217                 resolved.getPeerAddress());
218         } else {
219             super.onReceiveCommand(message);
220         }
221     }
222
223     private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
224         DOMStoreTransactionChain chain =
225             transactionChains.remove(closeTransactionChain.getTransactionChainId());
226
227         if(chain != null) {
228             chain.close();
229         }
230     }
231
232     private ActorRef createTypedTransactionActor(
233         int transactionType,
234         ShardTransactionIdentifier transactionId,
235         String transactionChainId ) {
236
237         DOMStoreTransactionFactory factory = store;
238
239         if(!transactionChainId.isEmpty()) {
240             factory = transactionChains.get(transactionChainId);
241             if(factory == null){
242                 DOMStoreTransactionChain transactionChain = store.createTransactionChain();
243                 transactionChains.put(transactionChainId, transactionChain);
244                 factory = transactionChain;
245             }
246         }
247
248         if(this.schemaContext == null){
249             throw new NullPointerException("schemaContext should not be null");
250         }
251
252         if (transactionType
253             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
254
255             shardMBean.incrementReadOnlyTransactionCount();
256
257             return getContext().actorOf(
258                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
259                         schemaContext,datastoreContext, shardMBean), transactionId.toString());
260
261         } else if (transactionType
262             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
263
264             shardMBean.incrementReadWriteTransactionCount();
265
266             return getContext().actorOf(
267                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
268                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
269
270
271         } else if (transactionType
272             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
273
274             shardMBean.incrementWriteOnlyTransactionCount();
275
276             return getContext().actorOf(
277                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
278                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
279         } else {
280             throw new IllegalArgumentException(
281                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
282                     + transactionType);
283         }
284     }
285
286     private void createTransaction(CreateTransaction createTransaction) {
287         createTransaction(createTransaction.getTransactionType(),
288             createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
289     }
290
291     private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
292
293         ShardTransactionIdentifier transactionId =
294             ShardTransactionIdentifier.builder()
295                 .remoteTransactionId(remoteTransactionId)
296                 .build();
297         LOG.debug("Creating transaction : {} ", transactionId);
298         ActorRef transactionActor =
299             createTypedTransactionActor(transactionType, transactionId, transactionChainId);
300
301         getSender()
302             .tell(new CreateTransactionReply(
303                     Serialization.serializedActorPath(transactionActor),
304                     remoteTransactionId).toSerializable(),
305                 getSelf());
306
307         return transactionActor;
308     }
309
310     private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
311         throws ExecutionException, InterruptedException {
312         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
313         commitCohort.preCommit().get();
314         commitCohort.commit().get();
315     }
316
317
318     private void commit(final ActorRef sender, Object serialized) {
319         Modification modification = MutableCompositeModification
320             .fromSerializable(serialized, schemaContext);
321         DOMStoreThreePhaseCommitCohort cohort =
322             modificationToCohort.remove(serialized);
323         if (cohort == null) {
324             LOG.debug(
325                 "Could not find cohort for modification : {}. Writing modification using a new transaction",
326                 modification);
327             DOMStoreWriteTransaction transaction =
328                 store.newWriteOnlyTransaction();
329             modification.apply(transaction);
330             try {
331                 syncCommitTransaction(transaction);
332             } catch (InterruptedException | ExecutionException e) {
333                 shardMBean.incrementFailedTransactionsCount();
334                 LOG.error("Failed to commit", e);
335                 return;
336             }
337             //we want to just apply the recovery commit and return
338             shardMBean.incrementCommittedTransactionCount();
339             return;
340         }
341
342         final ListenableFuture<Void> future = cohort.commit();
343         final ActorRef self = getSelf();
344
345         Futures.addCallback(future, new FutureCallback<Void>() {
346             @Override
347             public void onSuccess(Void v) {
348                 sender.tell(new CommitTransactionReply().toSerializable(), self);
349                 shardMBean.incrementCommittedTransactionCount();
350                 shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
351             }
352
353             @Override
354             public void onFailure(Throwable t) {
355                 LOG.error(t, "An exception happened during commit");
356                 shardMBean.incrementFailedTransactionsCount();
357                 sender.tell(new akka.actor.Status.Failure(t), self);
358             }
359         });
360
361     }
362
363     private void handleForwardedCommit(ForwardedCommitTransaction message) {
364         Object serializedModification =
365             message.getModification().toSerializable();
366
367         modificationToCohort
368             .put(serializedModification, message.getCohort());
369
370         if (persistent) {
371             this.persistData(getSender(), "identifier",
372                 new CompositeModificationPayload(serializedModification));
373         } else {
374             this.commit(getSender(), serializedModification);
375         }
376     }
377
378     private void updateSchemaContext(UpdateSchemaContext message) {
379         this.schemaContext = message.getSchemaContext();
380         updateSchemaContext(message.getSchemaContext());
381         store.onGlobalContextUpdated(message.getSchemaContext());
382     }
383
384     @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
385         store.onGlobalContextUpdated(schemaContext);
386     }
387
388     private void registerChangeListener(
389         RegisterChangeListener registerChangeListener) {
390
391         LOG.debug("registerDataChangeListener for {}", registerChangeListener
392             .getPath());
393
394
395         ActorSelection dataChangeListenerPath = getContext()
396             .system().actorSelection(
397                 registerChangeListener.getDataChangeListenerPath());
398
399
400         // Notify the listener if notifications should be enabled or not
401         // If this shard is the leader then it will enable notifications else
402         // it will not
403         dataChangeListenerPath
404             .tell(new EnableNotification(isLeader()), getSelf());
405
406         // Now store a reference to the data change listener so it can be notified
407         // at a later point if notifications should be enabled or disabled
408         dataChangeListeners.add(dataChangeListenerPath);
409
410         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
411             listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
412
413         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
414             registration = store.registerChangeListener(registerChangeListener.getPath(),
415                 listener, registerChangeListener.getScope());
416         ActorRef listenerRegistration =
417             getContext().actorOf(
418                 DataChangeListenerRegistration.props(registration));
419
420         LOG.debug(
421             "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
422             , listenerRegistration.path().toString());
423
424         getSender()
425             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
426                 getSelf());
427     }
428
429     private void createTransactionChain() {
430         DOMStoreTransactionChain chain = store.createTransactionChain();
431         ActorRef transactionChain = getContext().actorOf(
432                 ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
433         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
434                 getSelf());
435     }
436
437     private boolean isMetricsCaptureEnabled(){
438         CommonConfig config = new CommonConfig(getContext().system().settings().config());
439         return config.isMetricCaptureEnabled();
440     }
441
442     @Override protected void applyState(ActorRef clientActor, String identifier,
443         Object data) {
444
445         if (data instanceof CompositeModificationPayload) {
446             Object modification =
447                 ((CompositeModificationPayload) data).getModification();
448
449             if (modification != null) {
450                 commit(clientActor, modification);
451             } else {
452                 LOG.error(
453                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
454                     identifier, clientActor.path().toString());
455             }
456
457         } else {
458             LOG.error("Unknown state received {}", data);
459         }
460
461         // Update stats
462         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
463
464         if (lastLogEntry != null) {
465             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
466             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
467         }
468
469         shardMBean.setCommitIndex(getCommitIndex());
470         shardMBean.setLastApplied(getLastApplied());
471
472     }
473
474     @Override protected void createSnapshot() {
475         if (createSnapshotTransaction == null) {
476
477             // Create a transaction. We are really going to treat the transaction as a worker
478             // so that this actor does not get block building the snapshot
479             createSnapshotTransaction = createTransaction(
480                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
481                 "createSnapshot", "");
482
483             createSnapshotTransaction.tell(
484                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
485
486         }
487     }
488
489     @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
490         // Since this will be done only on Recovery or when this actor is a Follower
491         // we can safely commit everything in here. We not need to worry about event notifications
492         // as they would have already been disabled on the follower
493         try {
494             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
495             NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
496             NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
497                 .decode(YangInstanceIdentifier.builder().build(), serializedNode);
498
499             // delete everything first
500             transaction.delete(YangInstanceIdentifier.builder().build());
501
502             // Add everything from the remote node back
503             transaction.write(YangInstanceIdentifier.builder().build(), node);
504             syncCommitTransaction(transaction);
505         } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
506             LOG.error(e, "An exception occurred when applying snapshot");
507         }
508     }
509
510     @Override protected void onStateChanged() {
511         for (ActorSelection dataChangeListener : dataChangeListeners) {
512             dataChangeListener
513                 .tell(new EnableNotification(isLeader()), getSelf());
514         }
515
516         if (getLeaderId() != null) {
517             shardMBean.setLeader(getLeaderId());
518         }
519
520         shardMBean.setRaftState(getRaftState().name());
521         shardMBean.setCurrentTerm(getCurrentTerm());
522
523         // If this actor is no longer the leader close all the transaction chains
524         if(!isLeader()){
525             for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
526                 LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId());
527                 entry.getValue().close();
528             }
529
530             transactionChains.clear();
531         }
532     }
533
534     @Override public String persistenceId() {
535         return this.name.toString();
536     }
537
538
539     private static class ShardConfigParams extends DefaultConfigParamsImpl {
540         public static final FiniteDuration HEART_BEAT_INTERVAL =
541             new FiniteDuration(500, TimeUnit.MILLISECONDS);
542
543         @Override public FiniteDuration getHeartBeatInterval() {
544             return HEART_BEAT_INTERVAL;
545         }
546     }
547
548     private static class ShardCreator implements Creator<Shard> {
549
550         private static final long serialVersionUID = 1L;
551
552         final ShardIdentifier name;
553         final Map<ShardIdentifier, String> peerAddresses;
554         final DatastoreContext datastoreContext;
555         final SchemaContext schemaContext;
556
557         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
558                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
559             this.name = name;
560             this.peerAddresses = peerAddresses;
561             this.datastoreContext = datastoreContext;
562             this.schemaContext = schemaContext;
563         }
564
565         @Override
566         public Shard create() throws Exception {
567             return new Shard(name, peerAddresses, datastoreContext, schemaContext);
568         }
569     }
570
571     @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
572         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
573
574         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
575             transaction.read(YangInstanceIdentifier.builder().build());
576
577         NormalizedNode<?, ?> node = future.get().get();
578
579         transaction.close();
580
581         return node;
582     }
583
584     @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
585         throws ExecutionException, InterruptedException {
586         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
587
588         transaction.write(id, node);
589
590         syncCommitTransaction(transaction);
591     }
592
593 }