// Annotations
requires static transitive java.annotation;
requires static transitive javax.inject;
+ requires static transitive org.eclipse.jdt.annotation;
requires static org.kohsuke.metainf_services;
requires static org.osgi.service.component.annotations;
requires static org.osgi.annotation.bundle;
"No ID specified."));
return;
}
- final var state = stateMachine.getSubscriptionState(id);
+ final var state = stateMachine.lookupSubscriptionState(id);
if (state == null) {
request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
"No subscription with given ID."));
return;
}
- if (stateMachine.getSubscriptionSession(id) != request.session()) {
+ if (stateMachine.lookupSubscriptionSession(id) != request.session()) {
request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
"Subscription with given id does not exist on this session"));
return;
"No id specified"));
return;
}
- final var state = stateMachine.getSubscriptionState(id);
+ final var state = stateMachine.lookupSubscriptionState(id);
if (state == null) {
request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
"No subscription with given ID."));
public ModifySubscriptionRpc(@Reference final MdsalNotificationService mdsalService,
@Reference final SubscriptionStateService subscriptionStateService,
@Reference final SubscriptionStateMachine stateMachine,
- @Reference RestconfStream.Registry streamRegistry) {
+ @Reference final RestconfStream.Registry streamRegistry) {
super(ModifySubscription.QNAME);
this.mdsalService = requireNonNull(mdsalService);
this.subscriptionStateService = requireNonNull(subscriptionStateService);
return;
}
- final var state = stateMachine.getSubscriptionState(id);
+ final var state = stateMachine.lookupSubscriptionState(id);
if (state == null) {
request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
"No subscription with given ID."));
return;
}
- if (stateMachine.getSubscriptionSession(id) != request.session()) {
+ if (stateMachine.lookupSubscriptionSession(id) != request.session()) {
request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
"Subscription with given id does not exist on this session"));
return;
*/
package org.opendaylight.restconf.subscription;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import static java.util.Objects.requireNonNull;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.inject.Inject;
import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.restconf.server.api.TransportSession;
import org.opendaylight.yangtools.yang.common.Uint32;
import org.osgi.service.component.annotations.Activate;
@Singleton
@Component(service = SubscriptionStateMachine.class)
public class SubscriptionStateMachine {
- private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStateMachine.class);
+ /**
+ * Internal helper DTO that not meant to be used anywhere else. Used to increase readability of the code and allow
+ * us store subscription session and state in one map.
+ */
+ @NonNullByDefault
+ private record SessionStatePair(TransportSession session, SubscriptionState state) {
+ SessionStatePair {
+ requireNonNull(session);
+ requireNonNull(state);
+ }
+
+ SessionStatePair withState(final Uint32 id, final SubscriptionState newState) {
+ return switch (state) {
+ case START, SUSPENDED -> switch (newState) {
+ case START, SUSPENDED -> throw reject(id, newState);
+ case ACTIVE, END -> accept(id, newState);
+ };
+ case ACTIVE -> switch (newState) {
+ case START, ACTIVE -> throw reject(id, newState);
+ case SUSPENDED, END -> accept(id, newState);
+ };
+ case END -> throw reject(id, newState);
+ };
+ }
+
+ private SessionStatePair accept(final Uint32 id, final SubscriptionState newState) {
+ LOG.debug("Subscription {} moving from {} to {}", id, state, newState);
+ return new SessionStatePair(session, newState);
+ }
- // All legal transitions from one state to another
- private static final Map<SubscriptionState, Set<SubscriptionState>> TRANSITIONS = new EnumMap<>(Map.of(
- SubscriptionState.START, Set.of(SubscriptionState.ACTIVE, SubscriptionState.END),
- SubscriptionState.ACTIVE, Set.of(SubscriptionState.SUSPENDED, SubscriptionState.END),
- SubscriptionState.SUSPENDED, Set.of(SubscriptionState.ACTIVE, SubscriptionState.END)
- ));
+ private IllegalStateException reject(final Uint32 id, final SubscriptionState newState) {
+ return new IllegalStateException(
+ "Subscription %s cannot transition from %s to %s".formatted(id, state, newState));
+ }
+ }
- private final Map<Uint32, SessionStatePair> subscriptionStateMap = new HashMap<>();
+ private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStateMachine.class);
+
+ private final ConcurrentMap<Uint32, SessionStatePair> subscriptionStateMap = new ConcurrentHashMap<>();
@Inject
@Activate
* @param session session on which we are registering the subscription
* @param subscriptionId id of the newly registered subscription
*/
+ @NonNullByDefault
public void registerSubscription(final TransportSession session, final Uint32 subscriptionId) {
subscriptionStateMap.put(subscriptionId, new SessionStatePair(session, SubscriptionState.START));
}
* Moves subscription from its current state to new one assuming this transition is legal.
*
* @param subscriptionId id of the subscription
- * @param toType new state assigned to subscription
+ * @param newState new state assigned to subscription
+ * @throws NoSuchElementException if the subscription is not found
* @throws IllegalStateException if transition to a new state is not legal
*/
- public void moveTo(final Uint32 subscriptionId, final SubscriptionState toType) {
- final var transition = TRANSITIONS.get(getSubscriptionState(subscriptionId)).contains(toType);
- // Check if this state transition is allowed
- if (!transition) {
- throw new IllegalStateException(String.format("Illegal transition to %s state.", toType));
+ @NonNullByDefault
+ public void moveTo(final Uint32 subscriptionId, final SubscriptionState newState) {
+ requireNonNull(newState);
+ // atomic search-check-and-replace, since we normally produce non-null, a null return indicates the mapping was
+ // not present
+ final var found = subscriptionStateMap.computeIfPresent(subscriptionId,
+ (id, pair) -> pair.withState(id, newState));
+ if (found == null) {
+ throw new NoSuchElementException("No subscription " + subscriptionId);
}
- subscriptionStateMap.replace(subscriptionId, new SessionStatePair(getSubscriptionSession(subscriptionId),
- toType));
}
/**
- * Retrieves state of given subscription.
+ * Retrieves state of given subscription, if present.
*
* @param subscriptionId id of the subscription
- * @return current state of subscription
+ * @return current state of subscription, or {@code null}
*/
- public SubscriptionState getSubscriptionState(final Uint32 subscriptionId) {
- final var currentState = subscriptionStateMap.get(subscriptionId);
- // Check if subscription exist
- if (currentState == null) {
- return null;
- }
- return currentState.state();
+ public @Nullable SubscriptionState lookupSubscriptionState(final @NonNull Uint32 subscriptionId) {
+ final var pair = subscriptionStateMap.get(subscriptionId);
+ return pair != null ? pair.state : null;
}
/**
- * Retrieves session of given subscription.
+ * Retrieves session of given subscription, if present.
*
* @param subscriptionId id of the subscription
- * @return session tied to the subscription
+ * @return session tied to the subscription, or {@code null}
*/
- public TransportSession getSubscriptionSession(final Uint32 subscriptionId) {
- final var currentState = subscriptionStateMap.get(subscriptionId);
- // Check if subscription exist
- if (currentState == null) {
- return null;
- }
- return currentState.session();
+ public @Nullable TransportSession lookupSubscriptionSession(final @NonNull Uint32 subscriptionId) {
+ final var pair = subscriptionStateMap.get(subscriptionId);
+ return pair != null ? pair.session : null;
}
-
- /**
- * Internal helper class that not meant to be used anywhere else. Used to increase readability of the code and allow
- * us store subscription session and state in one map.
- */
- private record SessionStatePair(TransportSession session, SubscriptionState state) { }
}
doReturn(writeTx).when(dataBroker).newWriteOnlyTransaction();
doReturn(CommitInfo.emptyFluentFuture()).when(writeTx).commit();
doReturn(session).when(request).session();
- doReturn(session).when(stateMachine).getSubscriptionSession(ID);
- doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+ doReturn(session).when(stateMachine).lookupSubscriptionSession(ID);
+ doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(writeTx).delete(eq(LogicalDatastoreType.OPERATIONAL),
void deleteSubscriptionWrongSessionTest() {
doReturn(session).when(request).session();
// return session different from request session
- doReturn(null).when(stateMachine).getSubscriptionSession(ID);
- doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+ doReturn(null).when(stateMachine).lookupSubscriptionSession(ID);
+ doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(request).completeWith(response.capture());
- assertEquals("Subscription with given id does not exist on this session",
- response.getValue().getMessage());
+ assertEquals("Subscription with given id does not exist on this session", response.getValue().getMessage());
}
@Test
void deleteSubscriptionWrongIDTest() {
- doReturn(null).when(stateMachine).getSubscriptionState(ID);
+ doReturn(null).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(request).completeWith(response.capture());
@Test
void deleteSubscriptionAlreadyEndedTest() {
- doReturn(SubscriptionState.END).when(stateMachine).getSubscriptionState(ID);
+ doReturn(SubscriptionState.END).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(request).completeWith(response.capture());
- assertEquals("There is no active or suspended subscription with given ID.",
- response.getValue().getMessage());
+ assertEquals("There is no active or suspended subscription with given ID.", response.getValue().getMessage());
}
}
doReturn(writeTx).when(dataBroker).newWriteOnlyTransaction();
doReturn(CommitInfo.emptyFluentFuture()).when(writeTx).commit();
- doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+ doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(writeTx).delete(eq(LogicalDatastoreType.OPERATIONAL),
@Test
void killSubscriptionWrongIDTest() {
- doReturn(null).when(stateMachine).getSubscriptionState(ID);
+ doReturn(null).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(request).completeWith(response.capture());
@Test
void killSubscriptionAlreadyEndedTest() {
- doReturn(SubscriptionState.END).when(stateMachine).getSubscriptionState(ID);
+ doReturn(SubscriptionState.END).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(request).completeWith(response.capture());
doReturn(writeTx).when(dataBroker).newWriteOnlyTransaction();
doReturn(CommitInfo.emptyFluentFuture()).when(writeTx).commit();
doReturn(session).when(request).session();
- doReturn(session).when(stateMachine).getSubscriptionSession(ID);
- doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+ doReturn(session).when(stateMachine).lookupSubscriptionSession(ID);
+ doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(writeTx).merge(eq(LogicalDatastoreType.OPERATIONAL),
void modifySubscriptionWrongSessionTest() {
doReturn(session).when(request).session();
// return session different from request session
- doReturn(null).when(stateMachine).getSubscriptionSession(ID);
- doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+ doReturn(null).when(stateMachine).lookupSubscriptionSession(ID);
+ doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(request).completeWith(responseCaptor.capture());
@Test
void modifySubscriptionWrongIDTest() {
- doReturn(null).when(stateMachine).getSubscriptionState(ID);
+ doReturn(null).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(request).completeWith(responseCaptor.capture());
@Test
void modifySubscriptionAlreadyEndedTest() {
- doReturn(SubscriptionState.END).when(stateMachine).getSubscriptionState(ID);
+ doReturn(SubscriptionState.END).when(stateMachine).lookupSubscriptionState(ID);
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
verify(request).completeWith(responseCaptor.capture());
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opendaylight.restconf.server.api.TransportSession;
@Mock
private TransportSession session;
- private SubscriptionStateMachine subscriptionStateMachine;
+ private final SubscriptionStateMachine stateMachine = new SubscriptionStateMachine();
@BeforeEach
void before() {
- // initializing state machine
- subscriptionStateMachine = new SubscriptionStateMachine();
- subscriptionStateMachine.registerSubscription(session, Uint32.ONE);
-
// Checking default stating state
- assertEquals(SubscriptionState.START, subscriptionStateMachine.getSubscriptionState(Uint32.ONE));
+ stateMachine.registerSubscription(session, Uint32.ONE);
+ assertEquals(SubscriptionState.START, stateMachine.lookupSubscriptionState(Uint32.ONE));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void moveFromStartValid(final SubscriptionState newState) {
+ assertMoveTo(newState);
+ }
+
+ private static List<Arguments> moveFromStartValid() {
+ return List.of(Arguments.of(SubscriptionState.ACTIVE), Arguments.of(SubscriptionState.END));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void moveFromStartInvalid(final SubscriptionState newState) {
+ final var ex = assertThrows(IllegalStateException.class, () -> stateMachine.moveTo(Uint32.ONE, newState));
+ assertEquals("Subscription 1 cannot transition from START to " + newState, ex.getMessage());
+ assertEquals(SubscriptionState.START, stateMachine.lookupSubscriptionState(Uint32.ONE));
+ }
+
+ private static List<Arguments> moveFromStartInvalid() {
+ return List.of(Arguments.of(SubscriptionState.SUSPENDED), Arguments.of(SubscriptionState.SUSPENDED));
}
@Test
- void transitionStateTest() {
- // Transition state
- subscriptionStateMachine.moveTo(Uint32.ONE, SubscriptionState.ACTIVE);
- assertEquals(SubscriptionState.ACTIVE, subscriptionStateMachine.getSubscriptionState(Uint32.ONE));
+ void moveFromActiveToEnd() {
+ assertMoveTo(SubscriptionState.ACTIVE);
+ assertMoveTo(SubscriptionState.END);
}
@Test
- void illegalTransitionStateTest() {
- // Trying illegal state transition
- assertThrows(IllegalStateException.class,
- () -> subscriptionStateMachine.moveTo(Uint32.ONE, SubscriptionState.START));
+ void moveFromActiveToSuspededToEnd() {
+ assertMoveTo(SubscriptionState.ACTIVE);
+ assertMoveTo(SubscriptionState.SUSPENDED);
+ assertMoveTo(SubscriptionState.END);
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void moveToInvalid(final SubscriptionState first, final SubscriptionState second) {
+ assertMoveTo(first);
+ final var ex = assertThrows(IllegalStateException.class, () -> stateMachine.moveTo(Uint32.ONE, second));
+ assertEquals("Subscription 1 cannot transition from " + first + " to " + second, ex.getMessage());
+ assertEquals(first, stateMachine.lookupSubscriptionState(Uint32.ONE));
+ }
+
+ private static List<Arguments> moveToInvalid() {
+ return List.of(
+ Arguments.of(SubscriptionState.ACTIVE, SubscriptionState.START),
+ Arguments.of(SubscriptionState.END, SubscriptionState.SUSPENDED));
+ }
+
+ private void assertMoveTo(final SubscriptionState newState) {
+ stateMachine.moveTo(Uint32.ONE, newState);
+ assertEquals(newState, stateMachine.lookupSubscriptionState(Uint32.ONE));
}
}
if (!headers.contains(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM, false)) {
return new EmptyResponse(HttpResponseStatus.NOT_ACCEPTABLE);
}
- final var subscriptionState = machine.getSubscriptionState(Uint32.valueOf(subscriptionId));
+ final var subscriptionState = machine.lookupSubscriptionState(Uint32.valueOf(subscriptionId));
if (subscriptionState == null) {
LOG.debug("Subscription for id {} not found", subscriptionId);
return EmptyResponse.NOT_FOUND;