Merge "Candidate fix for dependency issue in odl-restconf feature"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.serialization.Serialization;
18
19 import com.google.common.base.Optional;
20 import com.google.common.base.Preconditions;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
32 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
35 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
38 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
39 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
40 import org.opendaylight.controller.cluster.datastore.modification.Modification;
41 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
42 import org.opendaylight.controller.cluster.raft.ConfigParams;
43 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
44 import org.opendaylight.controller.cluster.raft.RaftActor;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
46 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
47 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
48 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
49 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56
57 import scala.concurrent.duration.FiniteDuration;
58
59 import java.util.ArrayList;
60 import java.util.Date;
61 import java.util.HashMap;
62 import java.util.List;
63 import java.util.Map;
64 import java.util.concurrent.ExecutionException;
65 import java.util.concurrent.TimeUnit;
66
67 /**
68  * A Shard represents a portion of the logical data tree <br/>
69  * <p>
70  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
71  * </p>
72  */
73 public class Shard extends RaftActor {
74
75     private static final ConfigParams configParams = new ShardConfigParams();
76
77     public static final String DEFAULT_NAME = "default";
78
79     // The state of this Shard
80     private final InMemoryDOMDataStore store;
81
82     private final Map<Object, DOMStoreThreePhaseCommitCohort>
83         modificationToCohort = new HashMap<>();
84
85     private final LoggingAdapter LOG =
86         Logging.getLogger(getContext().system(), this);
87
88     // By default persistent will be true and can be turned off using the system
89     // property shard.persistent
90     private final boolean persistent;
91
92     /// The name of this shard
93     private final ShardIdentifier name;
94
95     private final ShardStats shardMBean;
96
97     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
98
99     private final DatastoreContext datastoreContext;
100
101
102     private SchemaContext schemaContext;
103
104     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
105             DatastoreContext datastoreContext) {
106         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
107
108         this.name = name;
109         this.datastoreContext = datastoreContext;
110
111         String setting = System.getProperty("shard.persistent");
112
113         this.persistent = !"false".equals(setting);
114
115         LOG.info("Shard created : {} persistent : {}", name, persistent);
116
117         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
118                 datastoreContext.getDataStoreProperties());
119
120         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
121
122
123     }
124
125     private static Map<String, String> mapPeerAddresses(
126         Map<ShardIdentifier, String> peerAddresses) {
127         Map<String, String> map = new HashMap<>();
128
129         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
130             .entrySet()) {
131             map.put(entry.getKey().toString(), entry.getValue());
132         }
133
134         return map;
135     }
136
137     public static Props props(final ShardIdentifier name,
138         final Map<ShardIdentifier, String> peerAddresses,
139         DatastoreContext datastoreContext) {
140         Preconditions.checkNotNull(name, "name should not be null");
141         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
142         Preconditions.checkNotNull(datastoreContext, "shardContext should not be null");
143
144         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
145     }
146
147     @Override public void onReceiveCommand(Object message) {
148         LOG.debug("Received message {} from {}", message.getClass().toString(),
149             getSender());
150
151         if (message.getClass()
152             .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
153             if (isLeader()) {
154                 createTransactionChain();
155             } else if (getLeader() != null) {
156                 getLeader().forward(message, getContext());
157             }
158         } else if (message instanceof RegisterChangeListener) {
159             registerChangeListener((RegisterChangeListener) message);
160         } else if (message instanceof UpdateSchemaContext) {
161             updateSchemaContext((UpdateSchemaContext) message);
162         } else if (message instanceof ForwardedCommitTransaction) {
163             handleForwardedCommit((ForwardedCommitTransaction) message);
164         } else if (message.getClass()
165             .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
166             if (isLeader()) {
167                 createTransaction(CreateTransaction.fromSerializable(message));
168             } else if (getLeader() != null) {
169                 getLeader().forward(message, getContext());
170             }
171         } else if (message instanceof PeerAddressResolved) {
172             PeerAddressResolved resolved = (PeerAddressResolved) message;
173             setPeerAddress(resolved.getPeerId().toString(),
174                 resolved.getPeerAddress());
175         } else {
176             super.onReceiveCommand(message);
177         }
178     }
179
180     private ActorRef createTypedTransactionActor(
181         CreateTransaction createTransaction,
182         ShardTransactionIdentifier transactionId) {
183         if (createTransaction.getTransactionType()
184             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
185
186             shardMBean.incrementReadOnlyTransactionCount();
187
188             return getContext().actorOf(
189                 ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
190                         schemaContext,datastoreContext, name.toString()), transactionId.toString());
191
192         } else if (createTransaction.getTransactionType()
193             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
194
195             shardMBean.incrementReadWriteTransactionCount();
196
197             return getContext().actorOf(
198                 ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
199                         schemaContext, datastoreContext,name.toString()), transactionId.toString());
200
201
202         } else if (createTransaction.getTransactionType()
203             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
204
205             shardMBean.incrementWriteOnlyTransactionCount();
206
207             return getContext().actorOf(
208                 ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
209                         schemaContext, datastoreContext, name.toString()), transactionId.toString());
210         } else {
211             throw new IllegalArgumentException(
212                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
213                     + createTransaction.getTransactionType());
214         }
215     }
216
217     private void createTransaction(CreateTransaction createTransaction) {
218
219         ShardTransactionIdentifier transactionId =
220             ShardTransactionIdentifier.builder()
221                 .remoteTransactionId(createTransaction.getTransactionId())
222                 .build();
223         LOG.debug("Creating transaction : {} ", transactionId);
224         ActorRef transactionActor =
225             createTypedTransactionActor(createTransaction, transactionId);
226
227         getSender()
228             .tell(new CreateTransactionReply(
229                     Serialization.serializedActorPath(transactionActor),
230                     createTransaction.getTransactionId()).toSerializable(),
231                 getSelf());
232     }
233
234     private void commit(final ActorRef sender, Object serialized) {
235         Modification modification = MutableCompositeModification
236             .fromSerializable(serialized, schemaContext);
237         DOMStoreThreePhaseCommitCohort cohort =
238             modificationToCohort.remove(serialized);
239         if (cohort == null) {
240             LOG.debug(
241                 "Could not find cohort for modification : {}. Writing modification using a new transaction",
242                 modification);
243             DOMStoreReadWriteTransaction transaction =
244                 store.newReadWriteTransaction();
245             modification.apply(transaction);
246             DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
247             ListenableFuture<Void> future =
248                 commitCohort.preCommit();
249             try {
250                 future.get();
251                 future = commitCohort.commit();
252                 future.get();
253             } catch (InterruptedException | ExecutionException e) {
254                 shardMBean.incrementFailedTransactionsCount();
255                 LOG.error("Failed to commit", e);
256                 return;
257             }
258             //we want to just apply the recovery commit and return
259             shardMBean.incrementCommittedTransactionCount();
260             return;
261         }
262
263         final ListenableFuture<Void> future = cohort.commit();
264         final ActorRef self = getSelf();
265
266         Futures.addCallback(future, new FutureCallback<Void>() {
267             @Override
268             public void onSuccess(Void v) {
269                sender.tell(new CommitTransactionReply().toSerializable(),self);
270                shardMBean.incrementCommittedTransactionCount();
271                shardMBean.setLastCommittedTransactionTime(new Date());
272             }
273
274             @Override
275             public void onFailure(Throwable t) {
276                 LOG.error(t, "An exception happened during commit");
277                 shardMBean.incrementFailedTransactionsCount();
278                 sender.tell(new akka.actor.Status.Failure(t), self);
279             }
280         });
281
282     }
283
284     private void handleForwardedCommit(ForwardedCommitTransaction message) {
285         Object serializedModification =
286             message.getModification().toSerializable();
287
288         modificationToCohort
289             .put(serializedModification, message.getCohort());
290
291         if (persistent) {
292             this.persistData(getSender(), "identifier",
293                 new CompositeModificationPayload(serializedModification));
294         } else {
295             this.commit(getSender(), serializedModification);
296         }
297     }
298
299     private void updateSchemaContext(UpdateSchemaContext message) {
300         this.schemaContext = message.getSchemaContext();
301         store.onGlobalContextUpdated(message.getSchemaContext());
302     }
303
304     private void registerChangeListener(
305         RegisterChangeListener registerChangeListener) {
306
307         LOG.debug("registerDataChangeListener for {}", registerChangeListener
308             .getPath());
309
310
311         ActorSelection dataChangeListenerPath = getContext()
312             .system().actorSelection(
313                 registerChangeListener.getDataChangeListenerPath());
314
315
316         // Notify the listener if notifications should be enabled or not
317         // If this shard is the leader then it will enable notifications else
318         // it will not
319         dataChangeListenerPath
320             .tell(new EnableNotification(isLeader()), getSelf());
321
322         // Now store a reference to the data change listener so it can be notified
323         // at a later point if notifications should be enabled or disabled
324         dataChangeListeners.add(dataChangeListenerPath);
325
326         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
327             listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
328
329         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
330             registration = store.registerChangeListener(registerChangeListener.getPath(),
331                 listener, registerChangeListener.getScope());
332         ActorRef listenerRegistration =
333             getContext().actorOf(
334                 DataChangeListenerRegistration.props(registration));
335
336         LOG.debug(
337             "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
338             , listenerRegistration.path().toString());
339
340         getSender()
341             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
342                 getSelf());
343     }
344
345     private void createTransactionChain() {
346         DOMStoreTransactionChain chain = store.createTransactionChain();
347         ActorRef transactionChain = getContext().actorOf(
348                 ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
349         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
350                 getSelf());
351     }
352
353     @Override protected void applyState(ActorRef clientActor, String identifier,
354         Object data) {
355
356         if (data instanceof CompositeModificationPayload) {
357             Object modification =
358                 ((CompositeModificationPayload) data).getModification();
359
360             if (modification != null) {
361                 commit(clientActor, modification);
362             } else {
363                 LOG.error(
364                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
365                     identifier, clientActor.path().toString());
366             }
367
368
369         } else {
370             LOG.error("Unknown state received {}", data);
371         }
372
373         // Update stats
374         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
375
376         if (lastLogEntry != null) {
377             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
378             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
379         }
380
381         shardMBean.setCommitIndex(getCommitIndex());
382         shardMBean.setLastApplied(getLastApplied());
383
384     }
385
386     @Override protected Object createSnapshot() {
387         throw new UnsupportedOperationException("createSnapshot");
388     }
389
390     @Override protected void applySnapshot(Object snapshot) {
391         throw new UnsupportedOperationException("applySnapshot");
392     }
393
394     @Override protected void onStateChanged() {
395         for (ActorSelection dataChangeListener : dataChangeListeners) {
396             dataChangeListener
397                 .tell(new EnableNotification(isLeader()), getSelf());
398         }
399
400         if (getLeaderId() != null) {
401             shardMBean.setLeader(getLeaderId());
402         }
403
404         shardMBean.setRaftState(getRaftState().name());
405         shardMBean.setCurrentTerm(getCurrentTerm());
406     }
407
408     @Override public String persistenceId() {
409         return this.name.toString();
410     }
411
412
413     private static class ShardConfigParams extends DefaultConfigParamsImpl {
414         public static final FiniteDuration HEART_BEAT_INTERVAL =
415             new FiniteDuration(500, TimeUnit.MILLISECONDS);
416
417         @Override public FiniteDuration getHeartBeatInterval() {
418             return HEART_BEAT_INTERVAL;
419         }
420     }
421
422     private static class ShardCreator implements Creator<Shard> {
423
424         private static final long serialVersionUID = 1L;
425
426         final ShardIdentifier name;
427         final Map<ShardIdentifier, String> peerAddresses;
428         final DatastoreContext datastoreContext;
429
430         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
431                 DatastoreContext datastoreContext) {
432             this.name = name;
433             this.peerAddresses = peerAddresses;
434             this.datastoreContext = datastoreContext;
435         }
436
437         @Override
438         public Shard create() throws Exception {
439             return new Shard(name, peerAddresses, datastoreContext);
440         }
441     }
442 }