Implement DistributedDataStore#registerDataChangeListener
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.persistence.Persistent;
18 import akka.persistence.UntypedProcessor;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.ListeningExecutorService;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
27 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
28 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
29 import org.opendaylight.controller.cluster.datastore.modification.Modification;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
31 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36
37 import java.util.HashMap;
38 import java.util.Map;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.Executors;
41
42 /**
43  * A Shard represents a portion of the logical data tree <br/>
44  * <p>
45  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
46  * </p>
47  */
48 public class Shard extends UntypedProcessor {
49
50     public static final String DEFAULT_NAME = "default";
51
52     private final ListeningExecutorService storeExecutor =
53         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
54
55     private final InMemoryDOMDataStore store;
56
57     private final Map<Modification, DOMStoreThreePhaseCommitCohort>
58         modificationToCohort = new HashMap<>();
59
60     private final LoggingAdapter log =
61         Logging.getLogger(getContext().system(), this);
62
63     private Shard(String name) {
64         store = new InMemoryDOMDataStore(name, storeExecutor);
65     }
66
67     public static Props props(final String name) {
68         return Props.create(new Creator<Shard>() {
69
70             @Override
71             public Shard create() throws Exception {
72                 return new Shard(name);
73             }
74
75         });
76     }
77
78     @Override
79     public void onReceive(Object message) throws Exception {
80         if (message instanceof CreateTransactionChain) {
81             createTransactionChain();
82         } else if (message instanceof RegisterChangeListener) {
83             registerChangeListener((RegisterChangeListener) message);
84         } else if (message instanceof UpdateSchemaContext) {
85             updateSchemaContext((UpdateSchemaContext) message);
86         } else if (message instanceof ForwardedCommitTransaction) {
87             handleForwardedCommit((ForwardedCommitTransaction) message);
88         } else if (message instanceof Persistent) {
89             commit((Persistent) message);
90         }
91     }
92
93     private void commit(Persistent message) {
94         Modification modification = (Modification) message.payload();
95         DOMStoreThreePhaseCommitCohort cohort =
96             modificationToCohort.remove(modification);
97         if (cohort == null) {
98             log.error(
99                 "Could not find cohort for modification : " + modification);
100             return;
101         }
102         final ListenableFuture<Void> future = cohort.commit();
103         final ActorRef sender = getSender();
104         final ActorRef self = getSelf();
105         future.addListener(new Runnable() {
106             @Override
107             public void run() {
108                 try {
109                     future.get();
110                     sender.tell(new CommitTransactionReply(), self);
111                 } catch (InterruptedException | ExecutionException e) {
112                     log.error(e, "An exception happened when committing");
113                 }
114             }
115         }, getContext().dispatcher());
116     }
117
118     private void handleForwardedCommit(ForwardedCommitTransaction message) {
119         log.info("received forwarded transaction");
120         modificationToCohort
121             .put(message.getModification(), message.getCohort());
122         getSelf().forward(Persistent.create(message.getModification()),
123             getContext());
124     }
125
126     private void updateSchemaContext(UpdateSchemaContext message) {
127         store.onGlobalContextUpdated(message.getSchemaContext());
128     }
129
130     private void registerChangeListener(
131         RegisterChangeListener registerChangeListener) {
132
133         ActorSelection listenerRegistrationActor = getContext()
134             .system().actorSelection(registerChangeListener.getDataChangeListenerPath());
135
136         AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>
137             listener = new ListenerProxy(listenerRegistrationActor);
138
139         org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
140             registration =
141             store.registerChangeListener(registerChangeListener.getPath(),
142                 listener, registerChangeListener.getScope());
143         ActorRef listenerRegistration =
144             getContext().actorOf(ListenerRegistration.props(registration));
145         getSender()
146             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
147                 getSelf());
148     }
149
150     private void createTransactionChain() {
151         DOMStoreTransactionChain chain = store.createTransactionChain();
152         ActorRef transactionChain =
153             getContext().actorOf(ShardTransactionChain.props(chain));
154         getSender()
155             .tell(new CreateTransactionChainReply(transactionChain.path()),
156                 getSelf());
157     }
158 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.