Improve SubscriptionStateMachine 60/115560/6
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 21 Feb 2025 06:20:57 +0000 (07:20 +0100)
committerRobert Varga <nite@hq.sk>
Fri, 21 Feb 2025 16:14:54 +0000 (16:14 +0000)
Internal map may be accessed concurrently, so this patch switches to
ConcurrentHashMap, in order to avoid CMEs.

We dust off SessionStatePair and turn it into a useful object,
expressing state transition rules via switch expressions -- which allow
us to explicitly document all of them. This also fixes a NPE, if one
were trying to move from END to something else.

We also make moveTo() an atomic transition via Map.computeIfPresent()
calls to SessionStatePair.withNewState(), which improves performance by
performing only a single lookup.

Finally we from the 'get' prefix in favor of lookup(), documenting the
fact these methods can legally return null.

JIRA: NETCONF-714
Change-Id: I202d16086b8ba3b2a5ad2aad07d991c5f09026f3
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
apps/restconf-subscription/src/main/java/module-info.java
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/DeleteSubscriptionRpc.java
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/KillSubscriptionRpc.java
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/ModifySubscriptionRpc.java
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/SubscriptionStateMachine.java
apps/restconf-subscription/src/test/java/org/opendaylight/restconf/subscription/DeleteSubscriptionRpcTest.java
apps/restconf-subscription/src/test/java/org/opendaylight/restconf/subscription/KillSubscriptionRpcTest.java
apps/restconf-subscription/src/test/java/org/opendaylight/restconf/subscription/ModifySubscriptionRpcTest.java
apps/restconf-subscription/src/test/java/org/opendaylight/restconf/subscription/StateTransitionTest.java
restconf/restconf-notifications/src/main/java/org/opendaylight/restconf/notifications/SubscriptionResourceInstance.java

index 4b788b5967793ab6b74cb243c6f4d6a61b824074..5fdd802d95fe03b5759aa9dd55a7b301d7db0396 100644 (file)
@@ -28,6 +28,7 @@ module org.opendaylight.restconf.subscription {
     // Annotations
     requires static transitive java.annotation;
     requires static transitive javax.inject;
+    requires static transitive org.eclipse.jdt.annotation;
     requires static org.kohsuke.metainf_services;
     requires static org.osgi.service.component.annotations;
     requires static org.osgi.annotation.bundle;
index f24a2d78fd3e0892f193afddfc0eb01825ee26df..42804e85e1bacc171af54dc3b0fd1c321b319ab7 100644 (file)
@@ -82,7 +82,7 @@ public final class DeleteSubscriptionRpc extends RpcImplementation {
                 "No ID specified."));
             return;
         }
-        final var state = stateMachine.getSubscriptionState(id);
+        final var state = stateMachine.lookupSubscriptionState(id);
         if (state == null) {
             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
                 "No subscription with given ID."));
@@ -94,7 +94,7 @@ public final class DeleteSubscriptionRpc extends RpcImplementation {
             return;
         }
 
-        if (stateMachine.getSubscriptionSession(id) != request.session()) {
+        if (stateMachine.lookupSubscriptionSession(id) != request.session()) {
             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
                 "Subscription with given id does not exist on this session"));
             return;
index a5e1f8190b62a25e99a8b339234b7e67a84f792f..a44c4cfc36dd398a975bf453deee2c2f506046d5 100644 (file)
@@ -81,7 +81,7 @@ public final class KillSubscriptionRpc extends RpcImplementation {
                 "No id specified"));
             return;
         }
-        final var state = stateMachine.getSubscriptionState(id);
+        final var state = stateMachine.lookupSubscriptionState(id);
         if (state == null) {
             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
                 "No subscription with given ID."));
index 36ceca860335c91ce2ec837b4e419b09b8fe7a12..026286933b2d09e2c55efcb2a034895935ad5bfa 100644 (file)
@@ -73,7 +73,7 @@ public final class ModifySubscriptionRpc extends RpcImplementation {
     public ModifySubscriptionRpc(@Reference final MdsalNotificationService mdsalService,
             @Reference final SubscriptionStateService subscriptionStateService,
             @Reference final SubscriptionStateMachine stateMachine,
-            @Reference RestconfStream.Registry streamRegistry) {
+            @Reference final RestconfStream.Registry streamRegistry) {
         super(ModifySubscription.QNAME);
         this.mdsalService = requireNonNull(mdsalService);
         this.subscriptionStateService = requireNonNull(subscriptionStateService);
@@ -100,7 +100,7 @@ public final class ModifySubscriptionRpc extends RpcImplementation {
             return;
         }
 
-        final var state = stateMachine.getSubscriptionState(id);
+        final var state = stateMachine.lookupSubscriptionState(id);
         if (state == null) {
             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
                 "No subscription with given ID."));
@@ -112,7 +112,7 @@ public final class ModifySubscriptionRpc extends RpcImplementation {
             return;
         }
 
-        if (stateMachine.getSubscriptionSession(id) != request.session()) {
+        if (stateMachine.lookupSubscriptionSession(id) != request.session()) {
             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
                 "Subscription with given id does not exist on this session"));
             return;
index 9544bc9c27550066a84108f72edc7e1618b59ad1..58f6ed30e147a7aa58c668a0d0d5131766c33453 100644 (file)
@@ -7,12 +7,16 @@
  */
 package org.opendaylight.restconf.subscription;
 
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import static java.util.Objects.requireNonNull;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.restconf.server.api.TransportSession;
 import org.opendaylight.yangtools.yang.common.Uint32;
 import org.osgi.service.component.annotations.Activate;
@@ -23,16 +27,45 @@ import org.slf4j.LoggerFactory;
 @Singleton
 @Component(service = SubscriptionStateMachine.class)
 public class SubscriptionStateMachine {
-    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStateMachine.class);
+    /**
+     * Internal helper DTO that not meant to be used anywhere else. Used to increase readability of the code and allow
+     * us store subscription session and state in one map.
+     */
+    @NonNullByDefault
+    private record SessionStatePair(TransportSession session, SubscriptionState state) {
+        SessionStatePair {
+            requireNonNull(session);
+            requireNonNull(state);
+        }
+
+        SessionStatePair withState(final Uint32 id, final SubscriptionState newState) {
+            return switch (state) {
+                case START, SUSPENDED -> switch (newState) {
+                    case START, SUSPENDED -> throw reject(id, newState);
+                    case ACTIVE, END -> accept(id, newState);
+                };
+                case ACTIVE -> switch (newState) {
+                    case START, ACTIVE -> throw reject(id, newState);
+                    case SUSPENDED, END -> accept(id, newState);
+                };
+                case END -> throw reject(id, newState);
+            };
+        }
+
+        private SessionStatePair accept(final Uint32 id, final SubscriptionState newState) {
+            LOG.debug("Subscription {} moving from {} to {}", id, state, newState);
+            return new SessionStatePair(session, newState);
+        }
 
-    // All legal transitions from one state to another
-    private static final Map<SubscriptionState, Set<SubscriptionState>> TRANSITIONS = new EnumMap<>(Map.of(
-        SubscriptionState.START, Set.of(SubscriptionState.ACTIVE, SubscriptionState.END),
-        SubscriptionState.ACTIVE, Set.of(SubscriptionState.SUSPENDED, SubscriptionState.END),
-        SubscriptionState.SUSPENDED, Set.of(SubscriptionState.ACTIVE, SubscriptionState.END)
-    ));
+        private IllegalStateException reject(final Uint32 id, final SubscriptionState newState) {
+            return new IllegalStateException(
+                "Subscription %s cannot transition from %s to %s".formatted(id, state, newState));
+        }
+    }
 
-    private final Map<Uint32, SessionStatePair> subscriptionStateMap = new HashMap<>();
+    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStateMachine.class);
+
+    private final ConcurrentMap<Uint32, SessionStatePair> subscriptionStateMap = new ConcurrentHashMap<>();
 
     @Inject
     @Activate
@@ -46,6 +79,7 @@ public class SubscriptionStateMachine {
      * @param session session on which we are registering the subscription
      * @param subscriptionId id of the newly registered subscription
      */
+    @NonNullByDefault
     public void registerSubscription(final TransportSession session, final Uint32 subscriptionId) {
         subscriptionStateMap.put(subscriptionId, new SessionStatePair(session, SubscriptionState.START));
     }
@@ -54,52 +88,41 @@ public class SubscriptionStateMachine {
      * Moves subscription from its current state to new one assuming this transition is legal.
      *
      * @param subscriptionId id of the subscription
-     * @param toType new state assigned to subscription
+     * @param newState new state assigned to subscription
+     * @throws NoSuchElementException if the subscription is not found
      * @throws IllegalStateException if transition to a new state is not legal
      */
-    public void moveTo(final Uint32 subscriptionId, final SubscriptionState toType) {
-        final var transition = TRANSITIONS.get(getSubscriptionState(subscriptionId)).contains(toType);
-        // Check if this state transition is allowed
-        if (!transition) {
-            throw new IllegalStateException(String.format("Illegal transition to %s state.", toType));
+    @NonNullByDefault
+    public void moveTo(final Uint32 subscriptionId, final SubscriptionState newState) {
+        requireNonNull(newState);
+        // atomic search-check-and-replace, since we normally produce non-null, a null return indicates the mapping was
+        // not present
+        final var found = subscriptionStateMap.computeIfPresent(subscriptionId,
+            (id, pair) -> pair.withState(id, newState));
+        if (found == null) {
+            throw new NoSuchElementException("No subscription " + subscriptionId);
         }
-        subscriptionStateMap.replace(subscriptionId, new SessionStatePair(getSubscriptionSession(subscriptionId),
-            toType));
     }
 
     /**
-     * Retrieves state of given subscription.
+     * Retrieves state of given subscription, if present.
      *
      * @param subscriptionId id of the subscription
-     * @return current state of subscription
+     * @return current state of subscription, or {@code null}
      */
-    public SubscriptionState getSubscriptionState(final Uint32 subscriptionId) {
-        final var currentState = subscriptionStateMap.get(subscriptionId);
-        // Check if subscription exist
-        if (currentState == null) {
-            return null;
-        }
-        return currentState.state();
+    public @Nullable SubscriptionState lookupSubscriptionState(final @NonNull Uint32 subscriptionId) {
+        final var pair = subscriptionStateMap.get(subscriptionId);
+        return pair != null ? pair.state : null;
     }
 
     /**
-     * Retrieves session of given subscription.
+     * Retrieves session of given subscription, if present.
      *
      * @param subscriptionId id of the subscription
-     * @return session tied to the subscription
+     * @return session tied to the subscription, or {@code null}
      */
-    public TransportSession getSubscriptionSession(final Uint32 subscriptionId) {
-        final var currentState = subscriptionStateMap.get(subscriptionId);
-        // Check if subscription exist
-        if (currentState == null) {
-            return null;
-        }
-        return currentState.session();
+    public @Nullable TransportSession lookupSubscriptionSession(final @NonNull Uint32 subscriptionId) {
+        final var pair = subscriptionStateMap.get(subscriptionId);
+        return pair != null ? pair.session : null;
     }
-
-    /**
-     * Internal helper class that not meant to be used anywhere else. Used to increase readability of the code and allow
-     * us store subscription session and state in one map.
-     */
-    private record SessionStatePair(TransportSession session, SubscriptionState state) { }
 }
index 4026e1b19dcecda254769cdb0c6cb9ceeecc2dec..95646b84dc26f1dc7f60ecb6fc3520cd92c19071 100644 (file)
@@ -85,8 +85,8 @@ class DeleteSubscriptionRpcTest {
         doReturn(writeTx).when(dataBroker).newWriteOnlyTransaction();
         doReturn(CommitInfo.emptyFluentFuture()).when(writeTx).commit();
         doReturn(session).when(request).session();
-        doReturn(session).when(stateMachine).getSubscriptionSession(ID);
-        doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+        doReturn(session).when(stateMachine).lookupSubscriptionSession(ID);
+        doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(writeTx).delete(eq(LogicalDatastoreType.OPERATIONAL),
@@ -98,18 +98,17 @@ class DeleteSubscriptionRpcTest {
     void deleteSubscriptionWrongSessionTest() {
         doReturn(session).when(request).session();
         // return session different from request session
-        doReturn(null).when(stateMachine).getSubscriptionSession(ID);
-        doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+        doReturn(null).when(stateMachine).lookupSubscriptionSession(ID);
+        doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(request).completeWith(response.capture());
-        assertEquals("Subscription with given id does not exist on this session",
-            response.getValue().getMessage());
+        assertEquals("Subscription with given id does not exist on this session", response.getValue().getMessage());
     }
 
     @Test
     void deleteSubscriptionWrongIDTest() {
-        doReturn(null).when(stateMachine).getSubscriptionState(ID);
+        doReturn(null).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(request).completeWith(response.capture());
@@ -118,11 +117,10 @@ class DeleteSubscriptionRpcTest {
 
     @Test
     void deleteSubscriptionAlreadyEndedTest() {
-        doReturn(SubscriptionState.END).when(stateMachine).getSubscriptionState(ID);
+        doReturn(SubscriptionState.END).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(request).completeWith(response.capture());
-        assertEquals("There is no active or suspended subscription with given ID.",
-            response.getValue().getMessage());
+        assertEquals("There is no active or suspended subscription with given ID.", response.getValue().getMessage());
     }
 }
index b7d850c7b6d96b1938b96cfeec4bc457b0fae208..2740cb1ef82257a8c76266f4ff5a1f863ff614a8 100644 (file)
@@ -81,7 +81,7 @@ class KillSubscriptionRpcTest {
 
         doReturn(writeTx).when(dataBroker).newWriteOnlyTransaction();
         doReturn(CommitInfo.emptyFluentFuture()).when(writeTx).commit();
-        doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+        doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(writeTx).delete(eq(LogicalDatastoreType.OPERATIONAL),
@@ -91,7 +91,7 @@ class KillSubscriptionRpcTest {
 
     @Test
     void killSubscriptionWrongIDTest() {
-        doReturn(null).when(stateMachine).getSubscriptionState(ID);
+        doReturn(null).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(request).completeWith(response.capture());
@@ -100,7 +100,7 @@ class KillSubscriptionRpcTest {
 
     @Test
     void killSubscriptionAlreadyEndedTest() {
-        doReturn(SubscriptionState.END).when(stateMachine).getSubscriptionState(ID);
+        doReturn(SubscriptionState.END).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(request).completeWith(response.capture());
index f60442ec0132d011d0a2397424496dc4cd949e39..5addb90dd5096effaadda1ef2c3988e265987d52 100644 (file)
@@ -92,8 +92,8 @@ class ModifySubscriptionRpcTest {
         doReturn(writeTx).when(dataBroker).newWriteOnlyTransaction();
         doReturn(CommitInfo.emptyFluentFuture()).when(writeTx).commit();
         doReturn(session).when(request).session();
-        doReturn(session).when(stateMachine).getSubscriptionSession(ID);
-        doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+        doReturn(session).when(stateMachine).lookupSubscriptionSession(ID);
+        doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(writeTx).merge(eq(LogicalDatastoreType.OPERATIONAL),
@@ -106,8 +106,8 @@ class ModifySubscriptionRpcTest {
     void modifySubscriptionWrongSessionTest() {
         doReturn(session).when(request).session();
         // return session different from request session
-        doReturn(null).when(stateMachine).getSubscriptionSession(ID);
-        doReturn(SubscriptionState.ACTIVE).when(stateMachine).getSubscriptionState(ID);
+        doReturn(null).when(stateMachine).lookupSubscriptionSession(ID);
+        doReturn(SubscriptionState.ACTIVE).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(request).completeWith(responseCaptor.capture());
@@ -117,7 +117,7 @@ class ModifySubscriptionRpcTest {
 
     @Test
     void modifySubscriptionWrongIDTest() {
-        doReturn(null).when(stateMachine).getSubscriptionState(ID);
+        doReturn(null).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(request).completeWith(responseCaptor.capture());
@@ -126,7 +126,7 @@ class ModifySubscriptionRpcTest {
 
     @Test
     void modifySubscriptionAlreadyEndedTest() {
-        doReturn(SubscriptionState.END).when(stateMachine).getSubscriptionState(ID);
+        doReturn(SubscriptionState.END).when(stateMachine).lookupSubscriptionState(ID);
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, INPUT));
         verify(request).completeWith(responseCaptor.capture());
index 2449fe6915ab6441cbd3a927596b36efeb1045e2..d257131f8af444752aa3dd700abba7f56d29d5b6 100644 (file)
@@ -10,9 +10,13 @@ package org.opendaylight.restconf.subscription;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.List;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opendaylight.restconf.server.api.TransportSession;
@@ -23,29 +27,67 @@ class StateTransitionTest {
     @Mock
     private TransportSession session;
 
-    private SubscriptionStateMachine subscriptionStateMachine;
+    private final SubscriptionStateMachine stateMachine = new SubscriptionStateMachine();
 
     @BeforeEach
     void before() {
-        // initializing state machine
-        subscriptionStateMachine = new SubscriptionStateMachine();
-        subscriptionStateMachine.registerSubscription(session, Uint32.ONE);
-
         // Checking default stating state
-        assertEquals(SubscriptionState.START, subscriptionStateMachine.getSubscriptionState(Uint32.ONE));
+        stateMachine.registerSubscription(session, Uint32.ONE);
+        assertEquals(SubscriptionState.START, stateMachine.lookupSubscriptionState(Uint32.ONE));
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void moveFromStartValid(final SubscriptionState newState) {
+        assertMoveTo(newState);
+    }
+
+    private static List<Arguments> moveFromStartValid() {
+        return List.of(Arguments.of(SubscriptionState.ACTIVE), Arguments.of(SubscriptionState.END));
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void moveFromStartInvalid(final SubscriptionState newState) {
+        final var ex = assertThrows(IllegalStateException.class, () -> stateMachine.moveTo(Uint32.ONE, newState));
+        assertEquals("Subscription 1 cannot transition from START to " + newState, ex.getMessage());
+        assertEquals(SubscriptionState.START, stateMachine.lookupSubscriptionState(Uint32.ONE));
+    }
+
+    private static List<Arguments> moveFromStartInvalid() {
+        return List.of(Arguments.of(SubscriptionState.SUSPENDED), Arguments.of(SubscriptionState.SUSPENDED));
     }
 
     @Test
-    void transitionStateTest() {
-        // Transition state
-        subscriptionStateMachine.moveTo(Uint32.ONE, SubscriptionState.ACTIVE);
-        assertEquals(SubscriptionState.ACTIVE, subscriptionStateMachine.getSubscriptionState(Uint32.ONE));
+    void moveFromActiveToEnd() {
+        assertMoveTo(SubscriptionState.ACTIVE);
+        assertMoveTo(SubscriptionState.END);
     }
 
     @Test
-    void illegalTransitionStateTest() {
-        // Trying illegal state transition
-        assertThrows(IllegalStateException.class,
-            () -> subscriptionStateMachine.moveTo(Uint32.ONE, SubscriptionState.START));
+    void moveFromActiveToSuspededToEnd() {
+        assertMoveTo(SubscriptionState.ACTIVE);
+        assertMoveTo(SubscriptionState.SUSPENDED);
+        assertMoveTo(SubscriptionState.END);
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void moveToInvalid(final SubscriptionState first, final SubscriptionState second) {
+        assertMoveTo(first);
+        final var ex = assertThrows(IllegalStateException.class, () -> stateMachine.moveTo(Uint32.ONE, second));
+        assertEquals("Subscription 1 cannot transition from " + first + " to " + second, ex.getMessage());
+        assertEquals(first, stateMachine.lookupSubscriptionState(Uint32.ONE));
+    }
+
+    private static List<Arguments> moveToInvalid() {
+        return List.of(
+            Arguments.of(SubscriptionState.ACTIVE, SubscriptionState.START),
+            Arguments.of(SubscriptionState.END, SubscriptionState.SUSPENDED));
+    }
+
+    private void assertMoveTo(final SubscriptionState newState) {
+        stateMachine.moveTo(Uint32.ONE, newState);
+        assertEquals(newState, stateMachine.lookupSubscriptionState(Uint32.ONE));
     }
 }
index ce0a08e043c762e3a9e615218492eeb2438d929b..09c4be07525afdc983d953ff8bfaeb01f29dad46 100644 (file)
@@ -111,7 +111,7 @@ final class SubscriptionResourceInstance extends WebHostResourceInstance {
         if (!headers.contains(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM, false)) {
             return new EmptyResponse(HttpResponseStatus.NOT_ACCEPTABLE);
         }
-        final var subscriptionState = machine.getSubscriptionState(Uint32.valueOf(subscriptionId));
+        final var subscriptionState = machine.lookupSubscriptionState(Uint32.valueOf(subscriptionId));
         if (subscriptionState == null) {
             LOG.debug("Subscription for id {} not found", subscriptionId);
             return EmptyResponse.NOT_FOUND;