import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
Logging.getLogger(getContext().system(), this);
private Shard(String name) {
+ log.info("Creating shard : {}", name );
+
store = new InMemoryDOMDataStore(name, storeExecutor);
}
@Override
public void onReceive(Object message) throws Exception {
+ log.debug("Received message {}", message);
+
if (message instanceof CreateTransactionChain) {
createTransactionChain();
} else if (message instanceof RegisterChangeListener) {
handleForwardedCommit((ForwardedCommitTransaction) message);
} else if (message instanceof Persistent) {
commit((Persistent) message);
+ } else if (message instanceof CreateTransaction) {
+ createTransaction();
}
}
+ private void createTransaction() {
+ DOMStoreReadWriteTransaction transaction =
+ store.newReadWriteTransaction();
+ ActorRef transactionActor = getContext().actorOf(
+ ShardTransaction.props(transaction, getSelf()));
+ getSender()
+ .tell(new CreateTransactionReply(transactionActor.path()),
+ getSelf());
+ }
+
private void commit(Persistent message) {
Modification modification = (Modification) message.payload();
DOMStoreThreePhaseCommitCohort cohort =
store.registerChangeListener(registerChangeListener.getPath(),
listener, registerChangeListener.getScope());
ActorRef listenerRegistration =
- getContext().actorOf(ListenerRegistration.props(registration));
+ getContext().actorOf(
+ DataChangeListenerRegistration.props(registration));
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
getSelf());