Merge "BUG 1623 - Clustering : Parsing Error thrown on startup"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.persistence.RecoveryFailure;
18 import akka.serialization.Serialization;
19
20 import com.google.common.base.Optional;
21 import com.google.common.base.Preconditions;
22 import com.google.common.util.concurrent.FutureCallback;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.protobuf.ByteString;
26 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
29 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
33 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
36 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
38 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
39 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
40 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
41 import org.opendaylight.controller.cluster.datastore.modification.Modification;
42 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
43 import org.opendaylight.controller.cluster.raft.ConfigParams;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.RaftActor;
46 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
47 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
48 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
49 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
53 import org.opendaylight.yangtools.concepts.ListenerRegistration;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57
58 import scala.concurrent.duration.FiniteDuration;
59
60 import java.util.ArrayList;
61 import java.util.Date;
62 import java.util.HashMap;
63 import java.util.List;
64 import java.util.Map;
65 import java.util.concurrent.ExecutionException;
66 import java.util.concurrent.TimeUnit;
67
68 /**
69  * A Shard represents a portion of the logical data tree <br/>
70  * <p>
71  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
72  * </p>
73  */
74 public class Shard extends RaftActor {
75
76     private static final ConfigParams configParams = new ShardConfigParams();
77
78     public static final String DEFAULT_NAME = "default";
79
80     // The state of this Shard
81     private final InMemoryDOMDataStore store;
82
83     private final Map<Object, DOMStoreThreePhaseCommitCohort>
84         modificationToCohort = new HashMap<>();
85
86     private final LoggingAdapter LOG =
87         Logging.getLogger(getContext().system(), this);
88
89     // By default persistent will be true and can be turned off using the system
90     // property shard.persistent
91     private final boolean persistent;
92
93     /// The name of this shard
94     private final ShardIdentifier name;
95
96     private final ShardStats shardMBean;
97
98     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
99
100     private final DatastoreContext datastoreContext;
101
102
103     private SchemaContext schemaContext;
104
105     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
106             DatastoreContext datastoreContext) {
107         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
108
109         this.name = name;
110         this.datastoreContext = datastoreContext;
111
112         String setting = System.getProperty("shard.persistent");
113
114         this.persistent = !"false".equals(setting);
115
116         LOG.info("Shard created : {} persistent : {}", name, persistent);
117
118         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
119                 datastoreContext.getDataStoreProperties());
120
121         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
122
123
124     }
125
126     private static Map<String, String> mapPeerAddresses(
127         Map<ShardIdentifier, String> peerAddresses) {
128         Map<String, String> map = new HashMap<>();
129
130         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
131             .entrySet()) {
132             map.put(entry.getKey().toString(), entry.getValue());
133         }
134
135         return map;
136     }
137
138     public static Props props(final ShardIdentifier name,
139         final Map<ShardIdentifier, String> peerAddresses,
140         DatastoreContext datastoreContext) {
141         Preconditions.checkNotNull(name, "name should not be null");
142         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
143         Preconditions.checkNotNull(datastoreContext, "shardContext should not be null");
144
145         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
146     }
147
148     @Override public void onReceiveRecover(Object message) {
149         LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
150             getSender());
151
152         if (message instanceof RecoveryFailure){
153             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
154         } else {
155             super.onReceiveRecover(message);
156         }
157     }
158
159     @Override public void onReceiveCommand(Object message) {
160         LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
161             getSender());
162
163         if (message.getClass()
164             .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
165             if (isLeader()) {
166                 createTransactionChain();
167             } else if (getLeader() != null) {
168                 getLeader().forward(message, getContext());
169             }
170         } else if (message instanceof RegisterChangeListener) {
171             registerChangeListener((RegisterChangeListener) message);
172         } else if (message instanceof UpdateSchemaContext) {
173             updateSchemaContext((UpdateSchemaContext) message);
174         } else if (message instanceof ForwardedCommitTransaction) {
175             handleForwardedCommit((ForwardedCommitTransaction) message);
176         } else if (message.getClass()
177             .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
178             if (isLeader()) {
179                 createTransaction(CreateTransaction.fromSerializable(message));
180             } else if (getLeader() != null) {
181                 getLeader().forward(message, getContext());
182             }
183         } else if (message instanceof PeerAddressResolved) {
184             PeerAddressResolved resolved = (PeerAddressResolved) message;
185             setPeerAddress(resolved.getPeerId().toString(),
186                 resolved.getPeerAddress());
187         } else {
188             super.onReceiveCommand(message);
189         }
190     }
191
192     private ActorRef createTypedTransactionActor(
193         CreateTransaction createTransaction,
194         ShardTransactionIdentifier transactionId) {
195         if (createTransaction.getTransactionType()
196             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
197
198             shardMBean.incrementReadOnlyTransactionCount();
199
200             return getContext().actorOf(
201                 ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
202                         schemaContext,datastoreContext, name.toString()), transactionId.toString());
203
204         } else if (createTransaction.getTransactionType()
205             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
206
207             shardMBean.incrementReadWriteTransactionCount();
208
209             return getContext().actorOf(
210                 ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
211                         schemaContext, datastoreContext,name.toString()), transactionId.toString());
212
213
214         } else if (createTransaction.getTransactionType()
215             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
216
217             shardMBean.incrementWriteOnlyTransactionCount();
218
219             return getContext().actorOf(
220                 ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
221                         schemaContext, datastoreContext, name.toString()), transactionId.toString());
222         } else {
223             throw new IllegalArgumentException(
224                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
225                     + createTransaction.getTransactionType());
226         }
227     }
228
229     private void createTransaction(CreateTransaction createTransaction) {
230
231         ShardTransactionIdentifier transactionId =
232             ShardTransactionIdentifier.builder()
233                 .remoteTransactionId(createTransaction.getTransactionId())
234                 .build();
235         LOG.debug("Creating transaction : {} ", transactionId);
236         ActorRef transactionActor =
237             createTypedTransactionActor(createTransaction, transactionId);
238
239         getSender()
240             .tell(new CreateTransactionReply(
241                     Serialization.serializedActorPath(transactionActor),
242                     createTransaction.getTransactionId()).toSerializable(),
243                 getSelf()
244             );
245     }
246
247     private void commit(final ActorRef sender, Object serialized) {
248         Modification modification = MutableCompositeModification
249             .fromSerializable(serialized, schemaContext);
250         DOMStoreThreePhaseCommitCohort cohort =
251             modificationToCohort.remove(serialized);
252         if (cohort == null) {
253             LOG.debug(
254                 "Could not find cohort for modification : {}. Writing modification using a new transaction",
255                 modification);
256             DOMStoreReadWriteTransaction transaction =
257                 store.newReadWriteTransaction();
258             modification.apply(transaction);
259             DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
260             ListenableFuture<Void> future =
261                 commitCohort.preCommit();
262             try {
263                 future.get();
264                 future = commitCohort.commit();
265                 future.get();
266             } catch (InterruptedException | ExecutionException e) {
267                 shardMBean.incrementFailedTransactionsCount();
268                 LOG.error("Failed to commit", e);
269                 return;
270             }
271             //we want to just apply the recovery commit and return
272             shardMBean.incrementCommittedTransactionCount();
273             return;
274         }
275
276         final ListenableFuture<Void> future = cohort.commit();
277         final ActorRef self = getSelf();
278
279         Futures.addCallback(future, new FutureCallback<Void>() {
280             @Override
281             public void onSuccess(Void v) {
282                 sender.tell(new CommitTransactionReply().toSerializable(), self);
283                 shardMBean.incrementCommittedTransactionCount();
284                 shardMBean.setLastCommittedTransactionTime(new Date());
285             }
286
287             @Override
288             public void onFailure(Throwable t) {
289                 LOG.error(t, "An exception happened during commit");
290                 shardMBean.incrementFailedTransactionsCount();
291                 sender.tell(new akka.actor.Status.Failure(t), self);
292             }
293         });
294
295     }
296
297     private void handleForwardedCommit(ForwardedCommitTransaction message) {
298         Object serializedModification =
299             message.getModification().toSerializable();
300
301         modificationToCohort
302             .put(serializedModification, message.getCohort());
303
304         if (persistent) {
305             this.persistData(getSender(), "identifier",
306                 new CompositeModificationPayload(serializedModification));
307         } else {
308             this.commit(getSender(), serializedModification);
309         }
310     }
311
312     private void updateSchemaContext(UpdateSchemaContext message) {
313         this.schemaContext = message.getSchemaContext();
314         store.onGlobalContextUpdated(message.getSchemaContext());
315     }
316
317     private void registerChangeListener(
318         RegisterChangeListener registerChangeListener) {
319
320         LOG.debug("registerDataChangeListener for {}", registerChangeListener
321             .getPath());
322
323
324         ActorSelection dataChangeListenerPath = getContext()
325             .system().actorSelection(
326                 registerChangeListener.getDataChangeListenerPath());
327
328
329         // Notify the listener if notifications should be enabled or not
330         // If this shard is the leader then it will enable notifications else
331         // it will not
332         dataChangeListenerPath
333             .tell(new EnableNotification(isLeader()), getSelf());
334
335         // Now store a reference to the data change listener so it can be notified
336         // at a later point if notifications should be enabled or disabled
337         dataChangeListeners.add(dataChangeListenerPath);
338
339         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
340             listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
341
342         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
343             registration = store.registerChangeListener(registerChangeListener.getPath(),
344                 listener, registerChangeListener.getScope());
345         ActorRef listenerRegistration =
346             getContext().actorOf(
347                 DataChangeListenerRegistration.props(registration));
348
349         LOG.debug(
350             "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
351             , listenerRegistration.path().toString());
352
353         getSender()
354             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
355                 getSelf());
356     }
357
358     private void createTransactionChain() {
359         DOMStoreTransactionChain chain = store.createTransactionChain();
360         ActorRef transactionChain = getContext().actorOf(
361                 ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
362         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
363                 getSelf());
364     }
365
366     @Override protected void applyState(ActorRef clientActor, String identifier,
367         Object data) {
368
369         if (data instanceof CompositeModificationPayload) {
370             Object modification =
371                 ((CompositeModificationPayload) data).getModification();
372
373             if (modification != null) {
374                 commit(clientActor, modification);
375             } else {
376                 LOG.error(
377                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
378                     identifier, clientActor.path().toString());
379             }
380
381         } else {
382             LOG.error("Unknown state received {}", data);
383         }
384
385         // Update stats
386         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
387
388         if (lastLogEntry != null) {
389             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
390             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
391         }
392
393         shardMBean.setCommitIndex(getCommitIndex());
394         shardMBean.setLastApplied(getLastApplied());
395
396     }
397
398     @Override protected void createSnapshot() {
399         throw new UnsupportedOperationException("createSnapshot");
400     }
401
402     @Override protected void applySnapshot(ByteString snapshot) {
403         throw new UnsupportedOperationException("applySnapshot");
404     }
405
406     @Override protected void onStateChanged() {
407         for (ActorSelection dataChangeListener : dataChangeListeners) {
408             dataChangeListener
409                 .tell(new EnableNotification(isLeader()), getSelf());
410         }
411
412         if (getLeaderId() != null) {
413             shardMBean.setLeader(getLeaderId());
414         }
415
416         shardMBean.setRaftState(getRaftState().name());
417         shardMBean.setCurrentTerm(getCurrentTerm());
418     }
419
420     @Override public String persistenceId() {
421         return this.name.toString();
422     }
423
424
425     private static class ShardConfigParams extends DefaultConfigParamsImpl {
426         public static final FiniteDuration HEART_BEAT_INTERVAL =
427             new FiniteDuration(500, TimeUnit.MILLISECONDS);
428
429         @Override public FiniteDuration getHeartBeatInterval() {
430             return HEART_BEAT_INTERVAL;
431         }
432     }
433
434     private static class ShardCreator implements Creator<Shard> {
435
436         private static final long serialVersionUID = 1L;
437
438         final ShardIdentifier name;
439         final Map<ShardIdentifier, String> peerAddresses;
440         final DatastoreContext datastoreContext;
441
442         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
443                 DatastoreContext datastoreContext) {
444             this.name = name;
445             this.peerAddresses = peerAddresses;
446             this.datastoreContext = datastoreContext;
447         }
448
449         @Override
450         public Shard create() throws Exception {
451             return new Shard(name, peerAddresses, datastoreContext);
452         }
453     }
454 }