Implement creating and applying of snapshot for a shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Creator;
18 import akka.persistence.RecoveryFailure;
19 import akka.serialization.Serialization;
20 import com.google.common.annotations.VisibleForTesting;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Preconditions;
23 import com.google.common.util.concurrent.CheckedFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.protobuf.ByteString;
28 import com.google.protobuf.InvalidProtocolBufferException;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
31 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
32 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
36 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
37 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
39 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
44 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.controller.cluster.datastore.modification.Modification;
47 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
49 import org.opendaylight.controller.cluster.raft.ConfigParams;
50 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
51 import org.opendaylight.controller.cluster.raft.RaftActor;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
57 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
58 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
59 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
60 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
61 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import scala.concurrent.duration.FiniteDuration;
68
69 import java.util.ArrayList;
70 import java.util.Date;
71 import java.util.HashMap;
72 import java.util.List;
73 import java.util.Map;
74 import java.util.concurrent.ExecutionException;
75 import java.util.concurrent.TimeUnit;
76
77 /**
78  * A Shard represents a portion of the logical data tree <br/>
79  * <p>
80  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
81  * </p>
82  */
83 public class Shard extends RaftActor {
84
85     private static final ConfigParams configParams = new ShardConfigParams();
86
87     public static final String DEFAULT_NAME = "default";
88
89     // The state of this Shard
90     private final InMemoryDOMDataStore store;
91
92     private final Map<Object, DOMStoreThreePhaseCommitCohort>
93         modificationToCohort = new HashMap<>();
94
95     private final LoggingAdapter LOG =
96         Logging.getLogger(getContext().system(), this);
97
98     // By default persistent will be true and can be turned off using the system
99     // property shard.persistent
100     private final boolean persistent;
101
102     /// The name of this shard
103     private final ShardIdentifier name;
104
105     private final ShardStats shardMBean;
106
107     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
108
109     private final DatastoreContext datastoreContext;
110
111
112     private SchemaContext schemaContext;
113
114     private ActorRef createSnapshotTransaction;
115
116     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
117             DatastoreContext datastoreContext, SchemaContext schemaContext) {
118         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
119
120         this.name = name;
121         this.datastoreContext = datastoreContext;
122         this.schemaContext = schemaContext;
123
124         String setting = System.getProperty("shard.persistent");
125
126         this.persistent = !"false".equals(setting);
127
128         LOG.info("Shard created : {} persistent : {}", name, persistent);
129
130         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
131                 datastoreContext.getDataStoreProperties());
132
133         if(schemaContext != null) {
134             store.onGlobalContextUpdated(schemaContext);
135         }
136
137         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
138
139
140     }
141
142     private static Map<String, String> mapPeerAddresses(
143         Map<ShardIdentifier, String> peerAddresses) {
144         Map<String, String> map = new HashMap<>();
145
146         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
147             .entrySet()) {
148             map.put(entry.getKey().toString(), entry.getValue());
149         }
150
151         return map;
152     }
153
154     public static Props props(final ShardIdentifier name,
155         final Map<ShardIdentifier, String> peerAddresses,
156         DatastoreContext datastoreContext, SchemaContext schemaContext) {
157         Preconditions.checkNotNull(name, "name should not be null");
158         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
159         Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
160         Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
161
162         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
163     }
164
165     @Override public void onReceiveRecover(Object message) {
166         LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
167             getSender());
168
169         if (message instanceof RecoveryFailure){
170             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
171         } else {
172             super.onReceiveRecover(message);
173         }
174     }
175
176     @Override public void onReceiveCommand(Object message) {
177         LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
178             getSender());
179
180         if (message.getClass()
181             .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
182             if (isLeader()) {
183                 createTransactionChain();
184             } else if (getLeader() != null) {
185                 getLeader().forward(message, getContext());
186             }
187         } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
188             // This must be for install snapshot. Don't want to open this up and trigger
189             // deSerialization
190             self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self());
191
192             // Send a PoisonPill instead of sending close transaction because we do not really need
193             // a response
194             getSender().tell(PoisonPill.getInstance(), self());
195
196         } else if (message instanceof RegisterChangeListener) {
197             registerChangeListener((RegisterChangeListener) message);
198         } else if (message instanceof UpdateSchemaContext) {
199             updateSchemaContext((UpdateSchemaContext) message);
200         } else if (message instanceof ForwardedCommitTransaction) {
201             handleForwardedCommit((ForwardedCommitTransaction) message);
202         } else if (message.getClass()
203             .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
204             if (isLeader()) {
205                 createTransaction(CreateTransaction.fromSerializable(message));
206             } else if (getLeader() != null) {
207                 getLeader().forward(message, getContext());
208             }
209         } else if (message instanceof PeerAddressResolved) {
210             PeerAddressResolved resolved = (PeerAddressResolved) message;
211             setPeerAddress(resolved.getPeerId().toString(),
212                 resolved.getPeerAddress());
213         } else {
214             super.onReceiveCommand(message);
215         }
216     }
217
218     private ActorRef createTypedTransactionActor(
219         int transactionType,
220         ShardTransactionIdentifier transactionId) {
221
222         if(this.schemaContext == null){
223             throw new NullPointerException("schemaContext should not be null");
224         }
225
226         if (transactionType
227             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
228
229             shardMBean.incrementReadOnlyTransactionCount();
230
231             return getContext().actorOf(
232                 ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
233                         schemaContext,datastoreContext, name.toString()), transactionId.toString());
234
235         } else if (transactionType
236             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
237
238             shardMBean.incrementReadWriteTransactionCount();
239
240             return getContext().actorOf(
241                 ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
242                         schemaContext, datastoreContext,name.toString()), transactionId.toString());
243
244
245         } else if (transactionType
246             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
247
248             shardMBean.incrementWriteOnlyTransactionCount();
249
250             return getContext().actorOf(
251                 ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
252                         schemaContext, datastoreContext, name.toString()), transactionId.toString());
253         } else {
254             throw new IllegalArgumentException(
255                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
256                     + transactionType);
257         }
258     }
259
260     private void createTransaction(CreateTransaction createTransaction) {
261         createTransaction(createTransaction.getTransactionType(),
262             createTransaction.getTransactionId());
263     }
264
265     private ActorRef createTransaction(int transactionType, String remoteTransactionId) {
266
267         ShardTransactionIdentifier transactionId =
268             ShardTransactionIdentifier.builder()
269                 .remoteTransactionId(remoteTransactionId)
270                 .build();
271         LOG.debug("Creating transaction : {} ", transactionId);
272         ActorRef transactionActor =
273             createTypedTransactionActor(transactionType, transactionId);
274
275         getSender()
276             .tell(new CreateTransactionReply(
277                     Serialization.serializedActorPath(transactionActor),
278                     remoteTransactionId).toSerializable(),
279                 getSelf());
280
281         return transactionActor;
282     }
283
284     private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
285         throws ExecutionException, InterruptedException {
286         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
287         commitCohort.preCommit().get();
288         commitCohort.commit().get();
289     }
290
291
292     private void commit(final ActorRef sender, Object serialized) {
293         Modification modification = MutableCompositeModification
294             .fromSerializable(serialized, schemaContext);
295         DOMStoreThreePhaseCommitCohort cohort =
296             modificationToCohort.remove(serialized);
297         if (cohort == null) {
298             LOG.debug(
299                 "Could not find cohort for modification : {}. Writing modification using a new transaction",
300                 modification);
301             DOMStoreWriteTransaction transaction =
302                 store.newWriteOnlyTransaction();
303             modification.apply(transaction);
304             try {
305                 syncCommitTransaction(transaction);
306             } catch (InterruptedException | ExecutionException e) {
307                 shardMBean.incrementFailedTransactionsCount();
308                 LOG.error("Failed to commit", e);
309                 return;
310             }
311             //we want to just apply the recovery commit and return
312             shardMBean.incrementCommittedTransactionCount();
313             return;
314         }
315
316         final ListenableFuture<Void> future = cohort.commit();
317         final ActorRef self = getSelf();
318
319         Futures.addCallback(future, new FutureCallback<Void>() {
320             @Override
321             public void onSuccess(Void v) {
322                 sender.tell(new CommitTransactionReply().toSerializable(), self);
323                 shardMBean.incrementCommittedTransactionCount();
324                 shardMBean.setLastCommittedTransactionTime(new Date());
325             }
326
327             @Override
328             public void onFailure(Throwable t) {
329                 LOG.error(t, "An exception happened during commit");
330                 shardMBean.incrementFailedTransactionsCount();
331                 sender.tell(new akka.actor.Status.Failure(t), self);
332             }
333         });
334
335     }
336
337     private void handleForwardedCommit(ForwardedCommitTransaction message) {
338         Object serializedModification =
339             message.getModification().toSerializable();
340
341         modificationToCohort
342             .put(serializedModification, message.getCohort());
343
344         if (persistent) {
345             this.persistData(getSender(), "identifier",
346                 new CompositeModificationPayload(serializedModification));
347         } else {
348             this.commit(getSender(), serializedModification);
349         }
350     }
351
352     private void updateSchemaContext(UpdateSchemaContext message) {
353         this.schemaContext = message.getSchemaContext();
354         updateSchemaContext(message.getSchemaContext());
355         store.onGlobalContextUpdated(message.getSchemaContext());
356     }
357
358     @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
359         store.onGlobalContextUpdated(schemaContext);
360     }
361
362     private void registerChangeListener(
363         RegisterChangeListener registerChangeListener) {
364
365         LOG.debug("registerDataChangeListener for {}", registerChangeListener
366             .getPath());
367
368
369         ActorSelection dataChangeListenerPath = getContext()
370             .system().actorSelection(
371                 registerChangeListener.getDataChangeListenerPath());
372
373
374         // Notify the listener if notifications should be enabled or not
375         // If this shard is the leader then it will enable notifications else
376         // it will not
377         dataChangeListenerPath
378             .tell(new EnableNotification(isLeader()), getSelf());
379
380         // Now store a reference to the data change listener so it can be notified
381         // at a later point if notifications should be enabled or disabled
382         dataChangeListeners.add(dataChangeListenerPath);
383
384         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
385             listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
386
387         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
388             registration = store.registerChangeListener(registerChangeListener.getPath(),
389                 listener, registerChangeListener.getScope());
390         ActorRef listenerRegistration =
391             getContext().actorOf(
392                 DataChangeListenerRegistration.props(registration));
393
394         LOG.debug(
395             "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
396             , listenerRegistration.path().toString());
397
398         getSender()
399             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
400                 getSelf());
401     }
402
403     private void createTransactionChain() {
404         DOMStoreTransactionChain chain = store.createTransactionChain();
405         ActorRef transactionChain = getContext().actorOf(
406                 ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
407         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
408             getSelf());
409     }
410
411     @Override protected void applyState(ActorRef clientActor, String identifier,
412         Object data) {
413
414         if (data instanceof CompositeModificationPayload) {
415             Object modification =
416                 ((CompositeModificationPayload) data).getModification();
417
418             if (modification != null) {
419                 commit(clientActor, modification);
420             } else {
421                 LOG.error(
422                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
423                     identifier, clientActor.path().toString());
424             }
425
426         } else {
427             LOG.error("Unknown state received {}", data);
428         }
429
430         // Update stats
431         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
432
433         if (lastLogEntry != null) {
434             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
435             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
436         }
437
438         shardMBean.setCommitIndex(getCommitIndex());
439         shardMBean.setLastApplied(getLastApplied());
440
441     }
442
443     @Override protected void createSnapshot() {
444         if (createSnapshotTransaction == null) {
445
446             // Create a transaction. We are really going to treat the transaction as a worker
447             // so that this actor does not get block building the snapshot
448             createSnapshotTransaction = createTransaction(
449                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
450                 "createSnapshot");
451
452             createSnapshotTransaction.tell(
453                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
454
455         }
456     }
457
458     @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
459         // Since this will be done only on Recovery or when this actor is a Follower
460         // we can safely commit everything in here. We not need to worry about event notifications
461         // as they would have already been disabled on the follower
462         try {
463             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
464             NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
465             NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
466                 .decode(YangInstanceIdentifier.builder().build(), serializedNode);
467
468             // delete everything first
469             transaction.delete(YangInstanceIdentifier.builder().build());
470
471             // Add everything from the remote node back
472             transaction.write(YangInstanceIdentifier.builder().build(), node);
473             syncCommitTransaction(transaction);
474         } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
475             LOG.error(e, "An exception occurred when applying snapshot");
476         }
477     }
478
479     @Override protected void onStateChanged() {
480         for (ActorSelection dataChangeListener : dataChangeListeners) {
481             dataChangeListener
482                 .tell(new EnableNotification(isLeader()), getSelf());
483         }
484
485         if (getLeaderId() != null) {
486             shardMBean.setLeader(getLeaderId());
487         }
488
489         shardMBean.setRaftState(getRaftState().name());
490         shardMBean.setCurrentTerm(getCurrentTerm());
491     }
492
493     @Override public String persistenceId() {
494         return this.name.toString();
495     }
496
497
498     private static class ShardConfigParams extends DefaultConfigParamsImpl {
499         public static final FiniteDuration HEART_BEAT_INTERVAL =
500             new FiniteDuration(500, TimeUnit.MILLISECONDS);
501
502         @Override public FiniteDuration getHeartBeatInterval() {
503             return HEART_BEAT_INTERVAL;
504         }
505     }
506
507     private static class ShardCreator implements Creator<Shard> {
508
509         private static final long serialVersionUID = 1L;
510
511         final ShardIdentifier name;
512         final Map<ShardIdentifier, String> peerAddresses;
513         final DatastoreContext datastoreContext;
514         final SchemaContext schemaContext;
515
516         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
517                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
518             this.name = name;
519             this.peerAddresses = peerAddresses;
520             this.datastoreContext = datastoreContext;
521             this.schemaContext = schemaContext;
522         }
523
524         @Override
525         public Shard create() throws Exception {
526             return new Shard(name, peerAddresses, datastoreContext, schemaContext);
527         }
528     }
529
530     @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
531         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
532
533         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
534             transaction.read(YangInstanceIdentifier.builder().build());
535
536         NormalizedNode<?, ?> node = future.get().get();
537
538         transaction.close();
539
540         return node;
541     }
542
543     @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
544         throws ExecutionException, InterruptedException {
545         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
546
547         transaction.write(id, node);
548
549         syncCommitTransaction(transaction);
550     }
551
552 }