From e1ed959a68f8039130be821c27e82e75b5d59e5f Mon Sep 17 00:00:00 2001
From: Eric Eastwood <eric.eastwood@beta.gouv.fr>
Date: Mon, 9 Sep 2024 10:41:25 -0500
Subject: [PATCH] Sliding Sync: Get `bump_stamp` from new sliding sync tables
 because it's faster (#17658)

Get `bump_stamp` from [new sliding sync
tables](https://github.com/element-hq/synapse/pull/17512) which should
be faster (performance) than flipping through the latest events in the
room.
---
 changelog.d/17658.misc                        |   1 +
 synapse/handlers/sliding_sync/__init__.py     |  74 ++++--
 synapse/storage/databases/main/events.py      |  61 ++---
 .../storage/databases/main/sliding_sync.py    |  40 ++++
 tests/storage/test_sliding_sync_tables.py     | 213 +++++++++++++++++-
 5 files changed, 333 insertions(+), 56 deletions(-)
 create mode 100644 changelog.d/17658.misc

diff --git a/changelog.d/17658.misc b/changelog.d/17658.misc
new file mode 100644
index 0000000000..0bdbc1140d
--- /dev/null
+++ b/changelog.d/17658.misc
@@ -0,0 +1 @@
+Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512) which should be faster.
diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py
index 444cc32f36..7340c6ec05 100644
--- a/synapse/handlers/sliding_sync/__init__.py
+++ b/synapse/handlers/sliding_sync/__init__.py
@@ -1040,29 +1040,67 @@ class SlidingSyncHandler:
                     )
                 )
 
-        # By default, just choose the membership event position
+        # Figure out the last bump event in the room
+        #
+        # By default, just choose the membership event position for any non-join membership
         bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
-
-        # Figure out the last bump event in the room if we're in the room.
+        # If we're joined to the room, we need to find the last bump event before the
+        # `to_token`
         if room_membership_for_user_at_to_token.membership == Membership.JOIN:
-            last_bump_event_result = (
-                await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                    room_id,
-                    to_token.room_key,
-                    event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
-                )
+            # We can quickly query for the latest bump event in the room using the
+            # sliding sync tables.
+            latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(
+                room_id
             )
 
-            # But if we found a bump event, use that instead
-            if last_bump_event_result is not None:
-                _, new_bump_event_pos = last_bump_event_result
+            min_to_token_position = to_token.room_key.stream
 
-                # If we've just joined a remote room, then the last bump event may
-                # have been backfilled (and so have a negative stream ordering).
-                # These negative stream orderings can't sensibly be compared, so
-                # instead we use the membership event position.
-                if new_bump_event_pos.stream > 0:
-                    bump_stamp = new_bump_event_pos.stream
+            # If we can rely on the new sliding sync tables and the `bump_stamp` is
+            # `None`, just fallback to the membership event position. This can happen
+            # when we've just joined a remote room and all the events are backfilled.
+            if (
+                # FIXME: The background job check can be removed once we bump
+                # `SCHEMA_COMPAT_VERSION` and run the foreground update for
+                # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`
+                # (tracked by https://github.com/element-hq/synapse/issues/17623)
+                await self.store.have_finished_sliding_sync_background_jobs()
+                and latest_room_bump_stamp is None
+            ):
+                pass
+
+            # The `bump_stamp` stored in the database might be ahead of our token. Since
+            # `bump_stamp` is only a `stream_ordering` position, we can't be 100% sure
+            # that's before the `to_token` in all scenarios. The only scenario we can be
+            # sure of is if the `bump_stamp` is totally before the minimum position from
+            # the token.
+            #
+            # We don't need to check if the background update has finished, as if the
+            # returned bump stamp is not None then it must be up to date.
+            elif (
+                latest_room_bump_stamp is not None
+                and latest_room_bump_stamp < min_to_token_position
+            ):
+                bump_stamp = latest_room_bump_stamp
+
+            # Otherwise, if it's within or after the `to_token`, we need to find the
+            # last bump event before the `to_token`.
+            else:
+                last_bump_event_result = (
+                    await self.store.get_last_event_pos_in_room_before_stream_ordering(
+                        room_id,
+                        to_token.room_key,
+                        event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
+                    )
+                )
+                if last_bump_event_result is not None:
+                    _, new_bump_event_pos = last_bump_event_result
+
+                    # If we've just joined a remote room, then the last bump event may
+                    # have been backfilled (and so have a negative stream ordering).
+                    # These negative stream orderings can't sensibly be compared, so
+                    # instead we use the membership event position.
+                    if new_bump_event_pos.stream > 0:
+                        bump_stamp = new_bump_event_pos.stream
 
         unstable_expanded_timeline = False
         prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index d423d80efa..e5f63019fd 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -327,6 +327,13 @@ class PersistEventsStore:
 
         async with stream_ordering_manager as stream_orderings:
             for (event, _), stream in zip(events_and_contexts, stream_orderings):
+                # XXX: We can't rely on `stream_ordering`/`instance_name` being correct
+                # at this point. We could be working with events that were previously
+                # persisted as an `outlier` with one `stream_ordering` but are now being
+                # persisted again and de-outliered and are being assigned a different
+                # `stream_ordering` here that won't end up being used.
+                # `_update_outliers_txn()` will fix this discrepancy (always use the
+                # `stream_ordering` from the first time it was persisted).
                 event.internal_metadata.stream_ordering = stream
                 event.internal_metadata.instance_name = self._instance_name
 
@@ -470,11 +477,11 @@ class PersistEventsStore:
                 membership_infos_to_insert_membership_snapshots.append(
                     # XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here
                     # because we're sourcing the event from `events_and_contexts`, we
-                    # can't rely on `stream_ordering`/`instance_name` being correct. We
-                    # could be working with events that were previously persisted as an
-                    # `outlier` with one `stream_ordering` but are now being persisted
-                    # again and de-outliered and assigned a different `stream_ordering`
-                    # that won't end up being used. Since we call
+                    # can't rely on `stream_ordering`/`instance_name` being correct at
+                    # this point. We could be working with events that were previously
+                    # persisted as an `outlier` with one `stream_ordering` but are now
+                    # being persisted again and de-outliered and assigned a different
+                    # `stream_ordering` that won't end up being used. Since we call
                     # `_calculate_sliding_sync_table_changes()` before
                     # `_update_outliers_txn()` which fixes this discrepancy (always use
                     # the `stream_ordering` from the first time it was persisted), we're
@@ -591,11 +598,17 @@ class PersistEventsStore:
                         event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
                     )
                 )
-                bump_stamp_to_fully_insert = (
-                    most_recent_bump_event_pos_results[1].stream
-                    if most_recent_bump_event_pos_results is not None
-                    else None
-                )
+                if most_recent_bump_event_pos_results is not None:
+                    _, new_bump_event_pos = most_recent_bump_event_pos_results
+
+                    # If we've just joined a remote room, then the last bump event may
+                    # have been backfilled (and so have a negative stream ordering).
+                    # These negative stream orderings can't sensibly be compared, so
+                    # instead just leave it as `None` in the table and we will use their
+                    # membership event position as the bump event position in the
+                    # Sliding Sync API.
+                    if new_bump_event_pos.stream > 0:
+                        bump_stamp_to_fully_insert = new_bump_event_pos.stream
 
                 current_state_ids_map = dict(
                     await self.store.get_partial_filtered_current_state_ids(
@@ -2123,31 +2136,26 @@ class PersistEventsStore:
         if len(events_and_contexts) == 0:
             return
 
-        # We only update the sliding sync tables for non-backfilled events.
-        #
-        # Check if the first event is a backfilled event (with a negative
-        # `stream_ordering`). If one event is backfilled, we assume this whole batch was
-        # backfilled.
-        first_event_stream_ordering = events_and_contexts[0][
-            0
-        ].internal_metadata.stream_ordering
-        # This should exist for persisted events
-        assert first_event_stream_ordering is not None
-        if first_event_stream_ordering < 0:
-            return
-
         # Since the list is sorted ascending by `stream_ordering`, the last event should
         # have the highest `stream_ordering`.
         max_stream_ordering = events_and_contexts[-1][
             0
         ].internal_metadata.stream_ordering
+        # `stream_ordering` should be assigned for persisted events
+        assert max_stream_ordering is not None
+        # Check if the event is a backfilled event (with a negative `stream_ordering`).
+        # If one event is backfilled, we assume this whole batch was backfilled.
+        if max_stream_ordering < 0:
+            # We only update the sliding sync tables for non-backfilled events.
+            return
+
         max_bump_stamp = None
         for event, _ in reversed(events_and_contexts):
             # Sanity check that all events belong to the same room
             assert event.room_id == room_id
 
             if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
-                # This should exist for persisted events
+                # `stream_ordering` should be assigned for persisted events
                 assert event.internal_metadata.stream_ordering is not None
 
                 max_bump_stamp = event.internal_metadata.stream_ordering
@@ -2156,11 +2164,6 @@ class PersistEventsStore:
                 # matching bump event which should have the highest `stream_ordering`.
                 break
 
-        # We should have exited earlier if there were no events
-        assert (
-            max_stream_ordering is not None
-        ), "Expected to have a stream_ordering if we have events"
-
         # Handle updating the `sliding_sync_joined_rooms` table.
         #
         txn.execute(
diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py
index dc747d7ac0..83939d10b0 100644
--- a/synapse/storage/databases/main/sliding_sync.py
+++ b/synapse/storage/databases/main/sliding_sync.py
@@ -41,6 +41,46 @@ logger = logging.getLogger(__name__)
 
 
 class SlidingSyncStore(SQLBaseStore):
+    async def get_latest_bump_stamp_for_room(
+        self,
+        room_id: str,
+    ) -> Optional[int]:
+        """
+        Get the `bump_stamp` for the room.
+
+        The `bump_stamp` is the `stream_ordering` of the last event according to the
+        `bump_event_types`. This helps clients sort more readily without them needing to
+        pull in a bunch of the timeline to determine the last activity.
+        `bump_event_types` is a thing because for example, we don't want display name
+        changes to mark the room as unread and bump it to the top. For encrypted rooms,
+        we just have to consider any activity as a bump because we can't see the content
+        and the client has to figure it out for themselves.
+
+        This should only be called where the server is participating
+        in the room (someone local is joined).
+
+        Returns:
+            The `bump_stamp` for the room (which can be `None`).
+        """
+
+        return cast(
+            Optional[int],
+            await self.db_pool.simple_select_one_onecol(
+                table="sliding_sync_joined_rooms",
+                keyvalues={"room_id": room_id},
+                retcol="bump_stamp",
+                # FIXME: This should be `False` once we bump `SCHEMA_COMPAT_VERSION` and run the
+                # foreground update for
+                # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked
+                # by https://github.com/element-hq/synapse/issues/17623)
+                #
+                # The should be `allow_none=False` in the future because event though
+                # `bump_stamp` itself can be `None`, we should have a row in the
+                # `sliding_sync_joined_rooms` table for any joined room.
+                allow_none=True,
+            ),
+        )
+
     async def persist_per_connection_state(
         self,
         user_id: str,
diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py
index 621f46fff8..de80ad53cd 100644
--- a/tests/storage/test_sliding_sync_tables.py
+++ b/tests/storage/test_sliding_sync_tables.py
@@ -106,6 +106,12 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
         assert persist_events_store is not None
         self.persist_events_store = persist_events_store
 
+        persist_controller = self.hs.get_storage_controllers().persistence
+        assert persist_controller is not None
+        self.persist_controller = persist_controller
+
+        self.state_handler = self.hs.get_state_handler()
+
     def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]:
         """
         Return the rows from the `sliding_sync_joined_rooms` table.
@@ -260,10 +266,8 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
             )
         )
         context = EventContext.for_outlier(self.hs.get_storage_controllers())
-        persist_controller = self.hs.get_storage_controllers().persistence
-        assert persist_controller is not None
         persisted_event, _, _ = self.get_success(
-            persist_controller.persist_event(invite_event, context)
+            self.persist_controller.persist_event(invite_event, context)
         )
 
         self._remote_invite_count += 1
@@ -316,10 +320,8 @@ class SlidingSyncTablesTestCaseBase(HomeserverTestCase):
             )
         )
         context = EventContext.for_outlier(self.hs.get_storage_controllers())
-        persist_controller = self.hs.get_storage_controllers().persistence
-        assert persist_controller is not None
         persisted_event, _, _ = self.get_success(
-            persist_controller.persist_event(kick_event, context)
+            self.persist_controller.persist_event(kick_event, context)
         )
 
         return persisted_event
@@ -926,6 +928,201 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
             user2_snapshot,
         )
 
+    def test_joined_room_bump_stamp_backfill(self) -> None:
+        """
+        Test that `bump_stamp` ignores backfilled events, i.e. events with a
+        negative stream ordering.
+        """
+        user1_id = self.register_user("user1", "pass")
+        _user1_tok = self.login(user1_id, "pass")
+
+        # Create a remote room
+        creator = "@user:other"
+        room_id = "!foo:other"
+        room_version = RoomVersions.V10
+        shared_kwargs = {
+            "room_id": room_id,
+            "room_version": room_version.identifier,
+        }
+
+        create_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[],
+                type=EventTypes.Create,
+                state_key="",
+                content={
+                    # The `ROOM_CREATOR` field could be removed if we used a room
+                    # version > 10 (in favor of relying on `sender`)
+                    EventContentFields.ROOM_CREATOR: creator,
+                    EventContentFields.ROOM_VERSION: room_version.identifier,
+                },
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+        creator_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[create_tuple[0].event_id],
+                auth_event_ids=[create_tuple[0].event_id],
+                type=EventTypes.Member,
+                state_key=creator,
+                content={"membership": Membership.JOIN},
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+        room_name_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[creator_tuple[0].event_id],
+                auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
+                type=EventTypes.Name,
+                state_key="",
+                content={
+                    EventContentFields.ROOM_NAME: "my super duper room",
+                },
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+        # We add a message event as a valid "bump type"
+        msg_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[room_name_tuple[0].event_id],
+                auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
+                type=EventTypes.Message,
+                content={"body": "foo", "msgtype": "m.text"},
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+        invite_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[msg_tuple[0].event_id],
+                auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
+                type=EventTypes.Member,
+                state_key=user1_id,
+                content={"membership": Membership.INVITE},
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+
+        remote_events_and_contexts = [
+            create_tuple,
+            creator_tuple,
+            room_name_tuple,
+            msg_tuple,
+            invite_tuple,
+        ]
+
+        # Ensure the local HS knows the room version
+        self.get_success(self.store.store_room(room_id, creator, False, room_version))
+
+        # Persist these events as backfilled events.
+        for event, context in remote_events_and_contexts:
+            self.get_success(
+                self.persist_controller.persist_event(event, context, backfilled=True)
+            )
+
+        # Now we join the local user to the room. We want to make this feel as close to
+        # the real `process_remote_join()` as possible but we'd like to avoid some of
+        # the auth checks that would be done in the real code.
+        #
+        # FIXME: The test was originally written using this less-real
+        # `persist_event(...)` shortcut but it would be nice to use the real remote join
+        # process in a `FederatingHomeserverTestCase`.
+        flawed_join_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[invite_tuple[0].event_id],
+                # This doesn't work correctly to create an `EventContext` that includes
+                # both of these state events. I assume it's because we're working on our
+                # local homeserver which has the remote state set as `outlier`. We have
+                # to create our own EventContext below to get this right.
+                auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
+                type=EventTypes.Member,
+                state_key=user1_id,
+                content={"membership": Membership.JOIN},
+                sender=user1_id,
+                **shared_kwargs,
+            )
+        )
+        # We have to create our own context to get the state set correctly. If we use
+        # the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
+        # table will only have the join event in it which should never happen in our
+        # real server.
+        join_event = flawed_join_tuple[0]
+        join_context = self.get_success(
+            self.state_handler.compute_event_context(
+                join_event,
+                state_ids_before_event={
+                    (e.type, e.state_key): e.event_id
+                    for e in [create_tuple[0], invite_tuple[0], room_name_tuple[0]]
+                },
+                partial_state=False,
+            )
+        )
+        join_event, _join_event_pos, _room_token = self.get_success(
+            self.persist_controller.persist_event(join_event, join_context)
+        )
+
+        # Make sure the tables are populated correctly
+        sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
+        self.assertIncludes(
+            set(sliding_sync_joined_rooms_results.keys()),
+            {room_id},
+            exact=True,
+        )
+        self.assertEqual(
+            sliding_sync_joined_rooms_results[room_id],
+            _SlidingSyncJoinedRoomResult(
+                room_id=room_id,
+                # This should be the last event in the room (the join membership)
+                event_stream_ordering=join_event.internal_metadata.stream_ordering,
+                # Since all of the bump events are backfilled, the `bump_stamp` should
+                # still be `None`. (and we will fallback to the users membership event
+                # position in the Sliding Sync API)
+                bump_stamp=None,
+                room_type=None,
+                # We still pick up state of the room even if it's backfilled
+                room_name="my super duper room",
+                is_encrypted=False,
+                tombstone_successor_room_id=None,
+            ),
+        )
+
+        sliding_sync_membership_snapshots_results = (
+            self._get_sliding_sync_membership_snapshots()
+        )
+        self.assertIncludes(
+            set(sliding_sync_membership_snapshots_results.keys()),
+            {
+                (room_id, user1_id),
+            },
+            exact=True,
+        )
+        self.assertEqual(
+            sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
+            _SlidingSyncMembershipSnapshotResult(
+                room_id=room_id,
+                user_id=user1_id,
+                sender=user1_id,
+                membership_event_id=join_event.event_id,
+                membership=Membership.JOIN,
+                event_stream_ordering=join_event.internal_metadata.stream_ordering,
+                has_known_state=True,
+                room_type=None,
+                room_name="my super duper room",
+                is_encrypted=False,
+                tombstone_successor_room_id=None,
+            ),
+        )
+
     @parameterized.expand(
         # Test both an insert an upsert into the
         # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` to exercise
@@ -1036,11 +1233,9 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
             context = self.get_success(unpersisted_context.persist(event))
             events_to_persist.append((event, context))
 
-            persist_controller = self.hs.get_storage_controllers().persistence
-            assert persist_controller is not None
             for event, context in events_to_persist:
                 self.get_success(
-                    persist_controller.persist_event(
+                    self.persist_controller.persist_event(
                         event,
                         context,
                     )
-- 
GitLab