package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import akka.japi.Creator;
import akka.persistence.UntypedProcessor;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executors;
/**
- * A Shard represents a portion of the logical data tree
- * <p/>
+ * A Shard represents a portion of the logical data tree <br/>
+ * <p>
* Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ * </p>
*/
public class Shard extends UntypedProcessor {
- ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+ public static final String DEFAULT_NAME = "default";
- private final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+ private final ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
- LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ private final InMemoryDOMDataStore store;
+
+ private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+ private Shard(String name){
+ store = new InMemoryDOMDataStore(name, storeExecutor);
+ }
+
+ public static Props props(final String name) {
+ return Props.create(new Creator<Shard>() {
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(name);
+ }
+
+ });
+ }
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof CreateTransactionChain) {
createTransactionChain();
- } else if(message instanceof RegisterChangeListener){
+ } else if (message instanceof RegisterChangeListener) {
registerChangeListener((RegisterChangeListener) message);
- } else if(message instanceof UpdateSchemaContext){
- store.onGlobalContextUpdated(((UpdateSchemaContext) message).getSchemaContext());
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
}
}
+ private void updateSchemaContext(UpdateSchemaContext message) {
+ store.onGlobalContextUpdated(message.getSchemaContext());
+ }
+
private void registerChangeListener(RegisterChangeListener registerChangeListener) {
org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration =
store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope());
- // TODO: Construct a ListenerRegistration actor with the actual registration returned when registering a listener with the datastore
- ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(null));
+ ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(registration));
getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}