import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
}
- public static Props props(final String name, final Map<String, String> peerAddresses) {
+ public static Props props(final String name,
+ final Map<String, String> peerAddresses) {
return Props.create(new Creator<Shard>() {
@Override
}
- @Override public void onReceiveCommand(Object message){
- LOG.debug("Received message {} from {}", message.getClass().toString(), getSender());
+ @Override public void onReceiveCommand(Object message) {
+ LOG.debug("Received message {} from {}", message.getClass().toString(),
+ getSender());
- if (message.getClass().equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
- if(isLeader()) {
+ if (message.getClass()
+ .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
+ if (isLeader()) {
createTransactionChain();
- } else if(getLeader() != null){
+ } else if (getLeader() != null) {
getLeader().forward(message, getContext());
}
} else if (message instanceof RegisterChangeListener) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof ForwardedCommitTransaction) {
handleForwardedCommit((ForwardedCommitTransaction) message);
- } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
- if(isLeader()) {
+ } else if (message.getClass()
+ .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ if (isLeader()) {
createTransaction(CreateTransaction.fromSerializable(message));
- } else if(getLeader() != null){
+ } else if (getLeader() != null) {
getLeader().forward(message, getContext());
}
- } else if (message instanceof PeerAddressResolved){
+ } else if (message instanceof PeerAddressResolved) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
} else {
- super.onReceiveCommand(message);
+ super.onReceiveCommand(message);
}
}
- private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
- if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( store.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId);
-
- }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( store.newReadWriteTransaction(), getSelf(), schemaContext), transactionId);
-
-
- }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( store.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId);
- }else{
- throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
- }
- }
+ private ActorRef createTypedTransactionActor(
+ CreateTransaction createTransaction, String transactionId) {
+ if (createTransaction.getTransactionType()
+ == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+ shardMBean.incrementReadOnlyTransactionCount();
+ return getContext().actorOf(
+ ShardTransaction
+ .props(store.newReadOnlyTransaction(), getSelf(),
+ schemaContext), transactionId);
+
+ } else if (createTransaction.getTransactionType()
+ == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+ shardMBean.incrementReadWriteTransactionCount();
+ return getContext().actorOf(
+ ShardTransaction
+ .props(store.newReadWriteTransaction(), getSelf(),
+ schemaContext), transactionId);
+
+
+ } else if (createTransaction.getTransactionType()
+ == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+ shardMBean.incrementWriteOnlyTransactionCount();
+ return getContext().actorOf(
+ ShardTransaction
+ .props(store.newWriteOnlyTransaction(), getSelf(),
+ schemaContext), transactionId);
+ } else {
+ throw new IllegalArgumentException(
+ "CreateTransaction message has unidentified transaction type="
+ + createTransaction.getTransactionType());
+ }
+ }
private void createTransaction(CreateTransaction createTransaction) {
String transactionId = "shard-" + createTransaction.getTransactionId();
- LOG.info("Creating transaction : {} " , transactionId);
- ActorRef transactionActor = createTypedTransactionActor(createTransaction,transactionId);
+ LOG.info("Creating transaction : {} ", transactionId);
+ ActorRef transactionActor =
+ createTypedTransactionActor(createTransaction, transactionId);
getSender()
- .tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(),
- getSelf());
+ .tell(new CreateTransactionReply(
+ Serialization.serializedActorPath(transactionActor),
+ createTransaction.getTransactionId()).toSerializable(),
+ getSelf()
+ );
}
private void commit(final ActorRef sender, Object serialized) {
- Modification modification = MutableCompositeModification.fromSerializable(serialized, schemaContext);
+ Modification modification =
+ MutableCompositeModification.fromSerializable(
+ serialized, schemaContext);
DOMStoreThreePhaseCommitCohort cohort =
modificationToCohort.remove(serialized);
if (cohort == null) {
try {
future.get();
- if(sender != null) {
+ if (sender != null) {
sender
.tell(new CommitTransactionReply().toSerializable(),
self);
}
private void handleForwardedCommit(ForwardedCommitTransaction message) {
- Object serializedModification = message.getModification().toSerializable();
+ Object serializedModification =
+ message.getModification().toSerializable();
modificationToCohort
- .put(serializedModification , message.getCohort());
+ .put(serializedModification, message.getCohort());
- if(persistent) {
- this.persistData(getSender(), "identifier", new CompositeModificationPayload(serializedModification));
+ if (persistent) {
+ this.persistData(getSender(), "identifier",
+ new CompositeModificationPayload(serializedModification));
} else {
this.commit(getSender(), serializedModification);
}
private void registerChangeListener(
RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for " + registerChangeListener.getPath());
+ LOG.debug("registerDataChangeListener for " + registerChangeListener
+ .getPath());
ActorSelection dataChangeListenerPath = getContext()
// Notify the listener if notifications should be enabled or not
// If this shard is the leader then it will enable notifications else
// it will not
- dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf());
+ dataChangeListenerPath
+ .tell(new EnableNotification(isLeader()), getSelf());
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
dataChangeListeners.add(dataChangeListenerPath);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
- listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
+ listener =
+ new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
registration =
getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
+ LOG.debug(
+ "registerDataChangeListener sending reply, listenerRegistrationPath = "
+ + listenerRegistration.path().toString());
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
ShardTransactionChain.props(chain, schemaContext));
getSender()
.tell(new CreateTransactionChainReply(transactionChain.path())
- .toSerializable(),
- getSelf());
+ .toSerializable(),
+ getSelf()
+ );
}
@Override protected void applyState(ActorRef clientActor, String identifier,
Object data) {
- if(data instanceof CompositeModificationPayload){
+ if (data instanceof CompositeModificationPayload) {
Object modification =
((CompositeModificationPayload) data).getModification();
- if(modification != null){
+ if (modification != null) {
commit(clientActor, modification);
} else {
LOG.error("modification is null - this is very unexpected");
LOG.error("Unknown state received {}", data);
}
+ ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+
+ if(lastLogEntry != null){
+ shardMBean.setLastLogIndex(lastLogEntry.getIndex());
+ shardMBean.setLastLogTerm(lastLogEntry.getTerm());
+ }
+
+ shardMBean.setCommitIndex(getCommitIndex());
+ shardMBean.setLastApplied(getLastApplied());
+
}
@Override protected Object createSnapshot() {
}
@Override protected void onStateChanged() {
- for(ActorSelection dataChangeListener : dataChangeListeners){
- dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
+ for (ActorSelection dataChangeListener : dataChangeListeners) {
+ dataChangeListener
+ .tell(new EnableNotification(isLeader()), getSelf());
}
- if(getLeaderId() != null){
+ if (getLeaderId() != null) {
shardMBean.setLeader(getLeaderId());
}
shardMBean.setRaftState(getRaftState().name());
+ shardMBean.setCurrentTerm(getCurrentTerm());
}
@Override public String persistenceId() {
* @author: syedbahm
*/
public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
- private Long committedTransactionsCount;
- private Long journalMessagesCount;
- final private String shardName;
- private String leader;
- private String raftState;
- ShardStats(String shardName){
- this.shardName = shardName;
- committedTransactionsCount =0L;
- journalMessagesCount = 0L;
- };
+ private final String shardName;
+ private Long committedTransactionsCount = 0L;
- @Override
- public String getShardName() {
- return shardName;
- }
+ private Long readOnlyTransactionCount = 0L;
- @Override
- public Long getCommittedTransactionsCount() {
- return committedTransactionsCount;
- }
+ private Long writeOnlyTransactionCount = 0L;
- @Override
- public Long getJournalMessagesCount() {
- //FIXME: this will be populated once after integration with Raft stuff
- return journalMessagesCount;
- }
+ private Long readWriteTransactionCount = 0L;
- @Override public String getLeader() {
- return leader;
- }
+ private String leader;
- @Override public String getRaftState() {
- return raftState;
- }
+ private String raftState;
- public Long incrementCommittedTransactionCount() {
- return committedTransactionsCount++;
- }
+ private Long lastLogTerm = -1L;
+ private Long lastLogIndex = -1L;
- public void updateCommittedTransactionsCount(long currentCount){
- committedTransactionsCount = currentCount;
+ private Long currentTerm = -1L;
- }
+ private Long commitIndex = -1L;
- public void updateJournalMessagesCount(long currentCount){
- journalMessagesCount = currentCount;
+ private Long lastApplied = -1L;
- }
+ ShardStats(String shardName) {
+ this.shardName = shardName;
+ }
- public void setLeader(String leader){
- this.leader = leader;
- }
- public void setRaftState(String raftState){
- this.raftState = raftState;
- }
+ @Override
+ public String getShardName() {
+ return shardName;
+ }
+ @Override
+ public Long getCommittedTransactionsCount() {
+ return committedTransactionsCount;
+ }
- @Override
- protected String getMBeanName() {
- return shardName;
- }
+ @Override public String getLeader() {
+ return leader;
+ }
- @Override
- protected String getMBeanType() {
- return JMX_TYPE_DISTRIBUTED_DATASTORE;
- }
+ @Override public String getRaftState() {
+ return raftState;
+ }
- @Override
- protected String getMBeanCategory() {
- return JMX_CATEGORY_SHARD;
- }
+ @Override public Long getReadOnlyTransactionCount() {
+ return readOnlyTransactionCount;
+ }
+
+ @Override public Long getWriteOnlyTransactionCount() {
+ return writeOnlyTransactionCount;
+ }
+
+ @Override public Long getReadWriteTransactionCount() {
+ return readWriteTransactionCount;
+ }
+
+ @Override public Long getLastLogIndex() {
+ return lastLogIndex;
+ }
+
+ @Override public Long getLastLogTerm() {
+ return lastLogTerm;
+ }
+
+ @Override public Long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ @Override public Long getCommitIndex() {
+ return commitIndex;
+ }
+
+ @Override public Long getLastApplied() {
+ return lastApplied;
+ }
+
+ public Long incrementCommittedTransactionCount() {
+ return committedTransactionsCount++;
+ }
+
+ public Long incrementReadOnlyTransactionCount() {
+ return readOnlyTransactionCount++;
+ }
+
+ public Long incrementWriteOnlyTransactionCount() {
+ return writeOnlyTransactionCount++;
+ }
+
+ public Long incrementReadWriteTransactionCount() {
+ return readWriteTransactionCount++;
+ }
+
+ public void setLeader(String leader) {
+ this.leader = leader;
+ }
+
+ public void setRaftState(String raftState) {
+ this.raftState = raftState;
+ }
+
+ public void setLastLogTerm(Long lastLogTerm) {
+ this.lastLogTerm = lastLogTerm;
+ }
+
+ public void setLastLogIndex(Long lastLogIndex) {
+ this.lastLogIndex = lastLogIndex;
+ }
+
+ public void setCurrentTerm(Long currentTerm) {
+ this.currentTerm = currentTerm;
+ }
+
+ public void setCommitIndex(Long commitIndex) {
+ this.commitIndex = commitIndex;
+ }
+
+ public void setLastApplied(Long lastApplied) {
+ this.lastApplied = lastApplied;
+ }
+
+ @Override
+ protected String getMBeanName() {
+ return shardName;
+ }
+
+ @Override
+ protected String getMBeanType() {
+ return JMX_TYPE_DISTRIBUTED_DATASTORE;
+ }
+
+ @Override
+ protected String getMBeanCategory() {
+ return JMX_CATEGORY_SHARD;
+ }
}