diff --git a/.ci/scripts/calculate_jobs.py b/.ci/scripts/calculate_jobs.py
index 15f2d94a810f56b261fabde5ad8345c7ffe0335c..ea278173db386eaac0d4d984d2b5f8517adb0778 100755
--- a/.ci/scripts/calculate_jobs.py
+++ b/.ci/scripts/calculate_jobs.py
@@ -36,11 +36,11 @@ IS_PR = os.environ["GITHUB_REF"].startswith("refs/pull/")
 # First calculate the various trial jobs.
 #
 # For PRs, we only run each type of test with the oldest Python version supported (which
-# is Python 3.8 right now)
+# is Python 3.9 right now)
 
 trial_sqlite_tests = [
     {
-        "python-version": "3.8",
+        "python-version": "3.9",
         "database": "sqlite",
         "extras": "all",
     }
@@ -53,12 +53,12 @@ if not IS_PR:
             "database": "sqlite",
             "extras": "all",
         }
-        for version in ("3.9", "3.10", "3.11", "3.12", "3.13")
+        for version in ("3.10", "3.11", "3.12", "3.13")
     )
 
 trial_postgres_tests = [
     {
-        "python-version": "3.8",
+        "python-version": "3.9",
         "database": "postgres",
         "postgres-version": "11",
         "extras": "all",
@@ -77,7 +77,7 @@ if not IS_PR:
 
 trial_no_extra_tests = [
     {
-        "python-version": "3.8",
+        "python-version": "3.9",
         "database": "sqlite",
         "extras": "",
     }
@@ -99,24 +99,24 @@ set_output("trial_test_matrix", test_matrix)
 
 # First calculate the various sytest jobs.
 #
-# For each type of test we only run on focal on PRs
+# For each type of test we only run on bullseye on PRs
 
 
 sytest_tests = [
     {
-        "sytest-tag": "focal",
+        "sytest-tag": "bullseye",
     },
     {
-        "sytest-tag": "focal",
+        "sytest-tag": "bullseye",
         "postgres": "postgres",
     },
     {
-        "sytest-tag": "focal",
+        "sytest-tag": "bullseye",
         "postgres": "multi-postgres",
         "workers": "workers",
     },
     {
-        "sytest-tag": "focal",
+        "sytest-tag": "bullseye",
         "postgres": "multi-postgres",
         "workers": "workers",
         "reactor": "asyncio",
@@ -127,11 +127,11 @@ if not IS_PR:
     sytest_tests.extend(
         [
             {
-                "sytest-tag": "focal",
+                "sytest-tag": "bullseye",
                 "reactor": "asyncio",
             },
             {
-                "sytest-tag": "focal",
+                "sytest-tag": "bullseye",
                 "postgres": "postgres",
                 "reactor": "asyncio",
             },
diff --git a/.ci/scripts/prepare_old_deps.sh b/.ci/scripts/prepare_old_deps.sh
index 580f87bbdfb2120f2a47cc48fa12d07d8b769b7b..3589be26f8cceaaa22ee8283622ee9420e5ebacc 100755
--- a/.ci/scripts/prepare_old_deps.sh
+++ b/.ci/scripts/prepare_old_deps.sh
@@ -1,5 +1,5 @@
 #!/usr/bin/env bash
-# this script is run by GitHub Actions in a plain `focal` container; it
+# this script is run by GitHub Actions in a plain `jammy` container; it
 # - installs the minimal system requirements, and poetry;
 # - patches the project definition file to refer to old versions only;
 # - creates a venv with these old versions using poetry; and finally
diff --git a/.github/workflows/latest_deps.yml b/.github/workflows/latest_deps.yml
index b9e9a401b9eb2b86d0998266748ee3fb917aa9a3..3884b6d402fe50170cfc95e2ddd47b082c4579c3 100644
--- a/.github/workflows/latest_deps.yml
+++ b/.github/workflows/latest_deps.yml
@@ -132,9 +132,9 @@ jobs:
       fail-fast: false
       matrix:
         include:
-          - sytest-tag: focal
+          - sytest-tag: bullseye
 
-          - sytest-tag: focal
+          - sytest-tag: bullseye
             postgres: postgres
             workers: workers
             redis: redis
diff --git a/.github/workflows/release-artifacts.yml b/.github/workflows/release-artifacts.yml
index 1e2513b2899115efcf4e4d291f05265ebaa0eb08..d77d7792f0f67c18df9895f351708f365db43a0b 100644
--- a/.github/workflows/release-artifacts.yml
+++ b/.github/workflows/release-artifacts.yml
@@ -102,7 +102,7 @@ jobs:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        os: [ubuntu-20.04, macos-12]
+        os: [ubuntu-22.04, macos-12]
         arch: [x86_64, aarch64]
         # is_pr is a flag used to exclude certain jobs from the matrix on PRs.
         # It is not read by the rest of the workflow.
@@ -144,7 +144,7 @@ jobs:
 
       - name: Only build a single wheel on PR
         if: startsWith(github.ref, 'refs/pull/')
-        run: echo "CIBW_BUILD="cp38-manylinux_${{ matrix.arch }}"" >> $GITHUB_ENV
+        run: echo "CIBW_BUILD="cp39-manylinux_${{ matrix.arch }}"" >> $GITHUB_ENV
 
       - name: Build wheels
         run: python -m cibuildwheel --output-dir wheelhouse
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 5586bd6d947202865916d44b18cfc8ed492f73fb..d91f9c291876644befaf0b1f32dea17b5b27367a 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -397,7 +397,7 @@ jobs:
     needs:
       - linting-done
       - changes
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
       - uses: actions/checkout@v4
 
@@ -409,12 +409,12 @@ jobs:
       # their build dependencies
       - run: |
           sudo apt-get -qq update
-          sudo apt-get -qq install build-essential libffi-dev python-dev \
+          sudo apt-get -qq install build-essential libffi-dev python3-dev \
             libxml2-dev libxslt-dev xmlsec1 zlib1g-dev libjpeg-dev libwebp-dev
 
       - uses: actions/setup-python@v5
         with:
-          python-version: '3.8'
+          python-version: '3.9'
 
       - name: Prepare old deps
         if: steps.cache-poetry-old-deps.outputs.cache-hit != 'true'
@@ -458,7 +458,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ["pypy-3.8"]
+        python-version: ["pypy-3.9"]
         extras: ["all"]
 
     steps:
@@ -580,11 +580,11 @@ jobs:
     strategy:
       matrix:
         include:
-          - python-version: "3.8"
+          - python-version: "3.9"
             postgres-version: "11"
 
-          - python-version: "3.11"
-            postgres-version: "15"
+          - python-version: "3.13"
+            postgres-version: "17"
 
     services:
       postgres:
diff --git a/.github/workflows/twisted_trunk.yml b/.github/workflows/twisted_trunk.yml
index 76609c2118756c742f29286e2800eb2351570f83..cdaa00ef90bfd34466c58c207ea97f2f67a1c074 100644
--- a/.github/workflows/twisted_trunk.yml
+++ b/.github/workflows/twisted_trunk.yml
@@ -99,11 +99,11 @@ jobs:
     if: needs.check_repo.outputs.should_run_workflow == 'true'
     runs-on: ubuntu-latest
     container:
-      # We're using ubuntu:focal because it uses Python 3.8 which is our minimum supported Python version.
+      # We're using debian:bullseye because it uses Python 3.9 which is our minimum supported Python version.
       # This job is a canary to warn us about unreleased twisted changes that would cause problems for us if
       # they were to be released immediately. For simplicity's sake (and to save CI runners) we use the oldest
       # version, assuming that any incompatibilities on newer versions would also be present on the oldest.
-      image: matrixdotorg/sytest-synapse:focal
+      image: matrixdotorg/sytest-synapse:bullseye
       volumes:
         - ${{ github.workspace }}:/src
 
diff --git a/changelog.d/17902.misc b/changelog.d/17902.misc
new file mode 100644
index 0000000000000000000000000000000000000000..f094f57c2fccf449318f536010e82a1d228b61aa
--- /dev/null
+++ b/changelog.d/17902.misc
@@ -0,0 +1 @@
+Update version constraint to allow the latest poetry-core 1.9.1.
diff --git a/changelog.d/17903.bugfix b/changelog.d/17903.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..a4d02fc98307faaf800b012aa7bb2a37734c7bc1
--- /dev/null
+++ b/changelog.d/17903.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug in Synapse which could cause one-time keys to be issued in the incorrect order, causing message decryption failures.
diff --git a/changelog.d/17906.bugfix b/changelog.d/17906.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..f38ce6a5903253a6e89714997306a4273a1f838c
--- /dev/null
+++ b/changelog.d/17906.bugfix
@@ -0,0 +1 @@
+Fix tests to run with latest Twisted.
diff --git a/changelog.d/17907.bugfix b/changelog.d/17907.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..f38ce6a5903253a6e89714997306a4273a1f838c
--- /dev/null
+++ b/changelog.d/17907.bugfix
@@ -0,0 +1 @@
+Fix tests to run with latest Twisted.
diff --git a/changelog.d/17908.misc b/changelog.d/17908.misc
new file mode 100644
index 0000000000000000000000000000000000000000..8f17729148653cb8148d4156fd2d34dc556e38c1
--- /dev/null
+++ b/changelog.d/17908.misc
@@ -0,0 +1 @@
+Remove support for python 3.8.
diff --git a/changelog.d/17909.misc b/changelog.d/17909.misc
new file mode 100644
index 0000000000000000000000000000000000000000..f826aa79488179886af62330ac4130ce38d6fa35
--- /dev/null
+++ b/changelog.d/17909.misc
@@ -0,0 +1 @@
+Update the portdb CI to use Python 3.13 and Postgres 17 as latest dependencies.
\ No newline at end of file
diff --git a/changelog.d/17911.bugfix b/changelog.d/17911.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..f38ce6a5903253a6e89714997306a4273a1f838c
--- /dev/null
+++ b/changelog.d/17911.bugfix
@@ -0,0 +1 @@
+Fix tests to run with latest Twisted.
diff --git a/changelog.d/17915.bugfix b/changelog.d/17915.bugfix
new file mode 100644
index 0000000000000000000000000000000000000000..a5d82e486db032e3810c1e13695bf8fac5c6f7d0
--- /dev/null
+++ b/changelog.d/17915.bugfix
@@ -0,0 +1 @@
+Fix experimental support for [MSC4222](https://github.com/matrix-org/matrix-spec-proposals/pull/4222) where we would return the full state on incremental syncs when using lazy loaded members and there were no new events in the timeline.
diff --git a/docs/development/contributing_guide.md b/docs/development/contributing_guide.md
index f079f61b484aea862389dd94604baea9b87112d7..d6efab96cfb11cb54e63127c5896e34869172043 100644
--- a/docs/development/contributing_guide.md
+++ b/docs/development/contributing_guide.md
@@ -322,7 +322,7 @@ The following command will let you run the integration test with the most common
 configuration:
 
 ```sh
-$ docker run --rm -it -v /path/where/you/have/cloned/the/repository\:/src:ro -v /path/to/where/you/want/logs\:/logs matrixdotorg/sytest-synapse:focal
+$ docker run --rm -it -v /path/where/you/have/cloned/the/repository\:/src:ro -v /path/to/where/you/want/logs\:/logs matrixdotorg/sytest-synapse:bullseye
 ```
 (Note that the paths must be full paths! You could also write `$(realpath relative/path)` if needed.)
 
diff --git a/docs/setup/installation.md b/docs/setup/installation.md
index 9cebb89b4d94207b68ea11174092d2bd136198d0..d717880aa5381c9376d213346bc734011ef2767c 100644
--- a/docs/setup/installation.md
+++ b/docs/setup/installation.md
@@ -208,7 +208,7 @@ When following this route please make sure that the [Platform-specific prerequis
 System requirements:
 
 - POSIX-compliant system (tested on Linux & OS X)
-- Python 3.8 or later, up to Python 3.11.
+- Python 3.9 or later, up to Python 3.13.
 - At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
 
 If building on an uncommon architecture for which pre-built wheels are
diff --git a/docs/upgrade.md b/docs/upgrade.md
index 52b1adbe904f0afc3a59e9fe706c123234a3b259..ea9824a5ee94c25151f62cd69ad4f75eef28bdc8 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -117,6 +117,17 @@ each upgrade are complete before moving on to the next upgrade, to avoid
 stacking them up. You can monitor the currently running background updates with
 [the Admin API](usage/administration/admin_api/background_updates.html#status).
 
+# Upgrading to v1.119.0
+
+## Minimum supported Python version
+
+The minimum supported Python version has been increased from v3.8 to v3.9.
+You will need Python 3.9+ to run Synapse v1.119.0 (due out Nov 7th, 2024).
+
+If you use current versions of the Matrix.org-distributed Docker images, no action is required.
+Please note that support for Ubuntu `focal` was dropped as well since it uses Python 3.8.
+
+
 # Upgrading to v1.111.0
 
 ## New worker endpoints for authenticated client and federation media
diff --git a/mypy.ini b/mypy.ini
index 3fca15c01beb0816adede30072ff313750e94d60..cf64248cc52f29993e682b240aacfa211da0c87e 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -26,7 +26,7 @@ strict_equality = True
 
 # Run mypy type checking with the minimum supported Python version to catch new usage
 # that isn't backwards-compatible (types, overloads, etc).
-python_version = 3.8
+python_version = 3.9
 
 files =
   docker/,
diff --git a/poetry.lock b/poetry.lock
index 6a5845fd1ec607baf762bf013246ddeec1718f7d..16b7dc504eb45811423b99c4b490fc8a5ab620c9 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand.
 
 [[package]]
 name = "annotated-types"
@@ -11,9 +11,6 @@ files = [
     {file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"},
 ]
 
-[package.dependencies]
-typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.9\""}
-
 [[package]]
 name = "attrs"
 version = "24.2.0"
@@ -874,9 +871,7 @@ files = [
 
 [package.dependencies]
 attrs = ">=22.2.0"
-importlib-resources = {version = ">=1.4.0", markers = "python_version < \"3.9\""}
 jsonschema-specifications = ">=2023.03.6"
-pkgutil-resolve-name = {version = ">=1.3.10", markers = "python_version < \"3.9\""}
 referencing = ">=0.28.4"
 rpds-py = ">=0.7.1"
 
@@ -896,7 +891,6 @@ files = [
 ]
 
 [package.dependencies]
-importlib-resources = {version = ">=1.4.0", markers = "python_version < \"3.9\""}
 referencing = ">=0.28.0"
 
 [[package]]
@@ -912,7 +906,6 @@ files = [
 
 [package.dependencies]
 importlib-metadata = {version = ">=4.11.4", markers = "python_version < \"3.12\""}
-importlib-resources = {version = "*", markers = "python_version < \"3.9\""}
 "jaraco.classes" = "*"
 jeepney = {version = ">=0.4.2", markers = "sys_platform == \"linux\""}
 pywin32-ctypes = {version = ">=0.2.0", markers = "sys_platform == \"win32\""}
@@ -1571,17 +1564,6 @@ files = [
 [package.extras]
 testing = ["pytest", "pytest-cov"]
 
-[[package]]
-name = "pkgutil-resolve-name"
-version = "1.3.10"
-description = "Resolve a name to an object."
-optional = false
-python-versions = ">=3.6"
-files = [
-    {file = "pkgutil_resolve_name-1.3.10-py3-none-any.whl", hash = "sha256:ca27cc078d25c5ad71a9de0a7a330146c4e014c2462d9af19c6b828280649c5e"},
-    {file = "pkgutil_resolve_name-1.3.10.tar.gz", hash = "sha256:357d6c9e6a755653cfd78893817c0853af365dd51ec97f3d358a819373bbd174"},
-]
-
 [[package]]
 name = "prometheus-client"
 version = "0.21.0"
@@ -1948,7 +1930,6 @@ files = [
 [package.dependencies]
 cryptography = ">=3.1"
 defusedxml = "*"
-importlib-resources = {version = "*", markers = "python_version < \"3.9\""}
 pyopenssl = "*"
 python-dateutil = "*"
 pytz = "*"
@@ -2164,7 +2145,6 @@ files = [
 [package.dependencies]
 markdown-it-py = ">=2.2.0,<3.0.0"
 pygments = ">=2.13.0,<3.0.0"
-typing-extensions = {version = ">=4.0.0,<5.0", markers = "python_version < \"3.9\""}
 
 [package.extras]
 jupyter = ["ipywidgets (>=7.5.1,<9)"]
@@ -3121,5 +3101,5 @@ user-search = ["pyicu"]
 
 [metadata]
 lock-version = "2.0"
-python-versions = "^3.8.0"
-content-hash = "eaded26b4770b9d19bfcee6dee8b96203df358ce51939d9b90fdbcf605e2f5fd"
+python-versions = "^3.9.0"
+content-hash = "0cd942a5193d01cbcef135a0bebd3fa0f12f7dbc63899d6f1c301e0649e9d902"
diff --git a/pyproject.toml b/pyproject.toml
index 33acff004da10c325c5aa3fb1dcce6edf35ccfa9..2cf4ffb548740df18c16ec8549ad8eef78596857 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,7 +36,7 @@
 
 [tool.ruff]
 line-length = 88
-target-version = "py38"
+target-version = "py39"
 
 [tool.ruff.lint]
 # See https://beta.ruff.rs/docs/rules/#error-e
@@ -155,7 +155,7 @@ synapse_review_recent_signups = "synapse._scripts.review_recent_signups:main"
 update_synapse_database = "synapse._scripts.update_synapse_database:main"
 
 [tool.poetry.dependencies]
-python = "^3.8.0"
+python = "^3.9.0"
 
 # Mandatory Dependencies
 # ----------------------
@@ -178,7 +178,7 @@ Twisted = {extras = ["tls"], version = ">=18.9.0"}
 treq = ">=15.1"
 # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
 pyOpenSSL = ">=16.0.0"
-PyYAML = ">=3.13"
+PyYAML = ">=5.3"
 pyasn1 = ">=0.1.9"
 pyasn1-modules = ">=0.0.7"
 bcrypt = ">=3.1.7"
@@ -241,7 +241,7 @@ authlib = { version = ">=0.15.1", optional = true }
 # `contrib/systemd/log_config.yaml`.
 # Note: systemd-python 231 appears to have been yanked from pypi
 systemd-python = { version = ">=231", optional = true }
-lxml = { version = ">=4.2.0", optional = true }
+lxml = { version = ">=4.5.2", optional = true }
 sentry-sdk = { version = ">=0.7.2", optional = true }
 opentracing = { version = ">=2.2.0", optional = true }
 jaeger-client = { version = ">=4.0.0", optional = true }
@@ -370,7 +370,7 @@ tomli = ">=1.2.3"
 # runtime errors caused by build system changes.
 # We are happy to raise these upper bounds upon request,
 # provided we check that it's safe to do so (i.e. that CI passes).
-requires = ["poetry-core>=1.1.0,<=1.9.0", "setuptools_rust>=1.3,<=1.8.1"]
+requires = ["poetry-core>=1.1.0,<=1.9.1", "setuptools_rust>=1.3,<=1.8.1"]
 build-backend = "poetry.core.masonry.api"
 
 
@@ -378,13 +378,13 @@ build-backend = "poetry.core.masonry.api"
 # Skip unsupported platforms (by us or by Rust).
 # See https://cibuildwheel.readthedocs.io/en/stable/options/#build-skip for the list of build targets.
 # We skip:
-#  - CPython 3.6 and 3.7: EOLed
-#  - PyPy 3.7: we only support Python 3.8+
+#  - CPython 3.6, 3.7 and 3.8: EOLed
+#  - PyPy 3.7 and 3.8: we only support Python 3.9+
 #  - musllinux i686: excluded to reduce number of wheels we build.
 #    c.f. https://github.com/matrix-org/synapse/pull/12595#discussion_r963107677
 #  - PyPy on Aarch64 and musllinux on aarch64: too slow to build.
 #    c.f. https://github.com/matrix-org/synapse/pull/14259
-skip = "cp36* cp37* pp37* *-musllinux_i686 pp*aarch64 *-musllinux_aarch64"
+skip = "cp36* cp37* cp38* pp37* pp38* *-musllinux_i686 pp*aarch64 *-musllinux_aarch64"
 
 # We need a rust compiler
 before-all =  "curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain stable -y --profile minimal"
diff --git a/scripts-dev/build_debian_packages.py b/scripts-dev/build_debian_packages.py
index 88c84194009cc21bed8a65469466e7474744d2fc..6ee695b2ba8f110ba51470fbab67783077e843bf 100755
--- a/scripts-dev/build_debian_packages.py
+++ b/scripts-dev/build_debian_packages.py
@@ -28,9 +28,8 @@ from typing import Collection, Optional, Sequence, Set
 # example)
 DISTS = (
     "debian:bullseye",  # (EOL ~2024-07) (our EOL forced by Python 3.9 is 2025-10-05)
-    "debian:bookworm",  # (EOL not specified yet) (our EOL forced by Python 3.11 is 2027-10-24)
-    "debian:sid",  # (EOL not specified yet) (our EOL forced by Python 3.11 is 2027-10-24)
-    "ubuntu:focal",  # 20.04 LTS (EOL 2025-04) (our EOL forced by Python 3.8 is 2024-10-14)
+    "debian:bookworm",  # (EOL 2026-06) (our EOL forced by Python 3.11 is 2027-10-24)
+    "debian:sid",  # (rolling distro, no EOL)
     "ubuntu:jammy",  # 22.04 LTS (EOL 2027-04) (our EOL forced by Python 3.10 is 2026-10-04)
     "ubuntu:noble",  # 24.04 LTS (EOL 2029-06)
     "ubuntu:oracular",  # 24.10 (EOL 2025-07)
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 73b92f12beaeda9f70240af0672f5597cf4f0656..e7784ac5d75370528e05fa84b25074b502961c3a 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -39,8 +39,8 @@ ImageFile.LOAD_TRUNCATED_IMAGES = True
 # Note that we use an (unneeded) variable here so that pyupgrade doesn't nuke the
 # if-statement completely.
 py_version = sys.version_info
-if py_version < (3, 8):
-    print("Synapse requires Python 3.8 or above.")
+if py_version < (3, 9):
+    print("Synapse requires Python 3.9 or above.")
     sys.exit(1)
 
 # Allow using the asyncio reactor via env var.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f78e66ad0a1322dab9a38795cd77d75a549b0dbe..315461fefb379a544fa506173e3a0897d12e65a8 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -615,7 +615,7 @@ class E2eKeysHandler:
         3. Attempt to fetch fallback keys from the database.
 
         Args:
-            local_query: An iterable of tuples of (user ID, device ID, algorithm).
+            local_query: An iterable of tuples of (user ID, device ID, algorithm, number of keys).
             always_include_fallback_keys: True to always include fallback keys.
 
         Returns:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 204965afeec9425c920da60213b91348a18a4d49..df3010ecf689c14e2c57e7b4c64e69ab688b3eac 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -196,7 +196,9 @@ class MessageHandler:
             AuthError (403) if the user doesn't have permission to view
             members of this room.
         """
-        state_filter = state_filter or StateFilter.all()
+        if state_filter is None:
+            state_filter = StateFilter.all()
+
         user_id = requester.user.to_string()
 
         if at_token:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index df9a088063fbfd24c370d3005c1519193e50d6ca..350c3fa09a190dcb845dd6ce9c6bf09de5a629b3 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1520,7 +1520,7 @@ class SyncHandler:
         if sync_config.use_state_after:
             delta_state_ids: MutableStateMap[str] = {}
 
-            if members_to_fetch is not None:
+            if members_to_fetch:
                 # We're lazy-loading, so the client might need some more member
                 # events to understand the events in this timeline. So we always
                 # fish out all the member events corresponding to the timeline
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index f047edee8e16c23170d7c9a1a33483346f31bfb1..ac34fa6525d567fc822c0c76b3a0d88140bbe0cf 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -39,7 +39,7 @@ from twisted.internet.endpoints import (
 )
 from twisted.internet.interfaces import (
     IPushProducer,
-    IReactorTCP,
+    IReactorTime,
     IStreamClientEndpoint,
 )
 from twisted.internet.protocol import Factory, Protocol
@@ -113,7 +113,7 @@ class RemoteHandler(logging.Handler):
         port: int,
         maximum_buffer: int = 1000,
         level: int = logging.NOTSET,
-        _reactor: Optional[IReactorTCP] = None,
+        _reactor: Optional[IReactorTime] = None,
     ):
         super().__init__(level=level)
         self.host = host
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index b50eb8868ec71f4edbab32b5fe84665243cb4d80..f28f5d7e0390a70e3a732ca6aa738d6a221e1854 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -234,8 +234,11 @@ class StateStorageController:
             RuntimeError if we don't have a state group for one or more of the events
                (ie they are outliers or unknown)
         """
+        if state_filter is None:
+            state_filter = StateFilter.all()
+
         await_full_state = True
-        if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
+        if not state_filter.must_await_full_state(self._is_mine_id):
             await_full_state = False
 
         event_to_groups = await self.get_state_group_for_events(
@@ -244,7 +247,7 @@ class StateStorageController:
 
         groups = set(event_to_groups.values())
         group_to_state = await self.stores.state._get_state_for_groups(
-            groups, state_filter or StateFilter.all()
+            groups, state_filter
         )
 
         state_event_map = await self.stores.main.get_events(
@@ -292,10 +295,11 @@ class StateStorageController:
             RuntimeError if we don't have a state group for one or more of the events
                 (ie they are outliers or unknown)
         """
-        if (
-            await_full_state
-            and state_filter
-            and not state_filter.must_await_full_state(self._is_mine_id)
+        if state_filter is None:
+            state_filter = StateFilter.all()
+
+        if await_full_state and not state_filter.must_await_full_state(
+            self._is_mine_id
         ):
             # Full state is not required if the state filter is restrictive enough.
             await_full_state = False
@@ -306,7 +310,7 @@ class StateStorageController:
 
         groups = set(event_to_groups.values())
         group_to_state = await self.stores.state._get_state_for_groups(
-            groups, state_filter or StateFilter.all()
+            groups, state_filter
         )
 
         event_to_state = {
@@ -335,9 +339,10 @@ class StateStorageController:
             RuntimeError if we don't have a state group for the event (ie it is an
                 outlier or is unknown)
         """
-        state_map = await self.get_state_for_events(
-            [event_id], state_filter or StateFilter.all()
-        )
+        if state_filter is None:
+            state_filter = StateFilter.all()
+
+        state_map = await self.get_state_for_events([event_id], state_filter)
         return state_map[event_id]
 
     @trace
@@ -365,9 +370,12 @@ class StateStorageController:
             RuntimeError if we don't have a state group for the event (ie it is an
                 outlier or is unknown)
         """
+        if state_filter is None:
+            state_filter = StateFilter.all()
+
         state_map = await self.get_state_ids_for_events(
             [event_id],
-            state_filter or StateFilter.all(),
+            state_filter,
             await_full_state=await_full_state,
         )
         return state_map[event_id]
@@ -388,9 +396,12 @@ class StateStorageController:
                 at the event and `state_filter` is not satisfied by partial state.
                 Defaults to `True`.
         """
+        if state_filter is None:
+            state_filter = StateFilter.all()
+
         state_ids = await self.get_state_ids_for_event(
             event_id,
-            state_filter=state_filter or StateFilter.all(),
+            state_filter=state_filter,
             await_full_state=await_full_state,
         )
 
@@ -426,6 +437,9 @@ class StateStorageController:
                 at the last event in the room before `stream_position` and
                 `state_filter` is not satisfied by partial state. Defaults to `True`.
         """
+        if state_filter is None:
+            state_filter = StateFilter.all()
+
         # FIXME: This gets the state at the latest event before the stream ordering,
         # which might not be the same as the "current state" of the room at the time
         # of the stream token if there were multiple forward extremities at the time.
@@ -442,7 +456,7 @@ class StateStorageController:
         if last_event_id:
             state = await self.get_state_after_event(
                 last_event_id,
-                state_filter=state_filter or StateFilter.all(),
+                state_filter=state_filter,
                 await_full_state=await_full_state,
             )
 
@@ -500,9 +514,10 @@ class StateStorageController:
         Returns:
             Dict of state group to state map.
         """
-        return await self.stores.state._get_state_for_groups(
-            groups, state_filter or StateFilter.all()
-        )
+        if state_filter is None:
+            state_filter = StateFilter.all()
+
+        return await self.stores.state._get_state_for_groups(groups, state_filter)
 
     @trace
     @tag_args
@@ -583,12 +598,13 @@ class StateStorageController:
         Returns:
             The current state of the room.
         """
-        if await_full_state and (
-            not state_filter or state_filter.must_await_full_state(self._is_mine_id)
-        ):
+        if state_filter is None:
+            state_filter = StateFilter.all()
+
+        if await_full_state and state_filter.must_await_full_state(self._is_mine_id):
             await self._partial_state_room_tracker.await_full_state(room_id)
 
-        if state_filter and not state_filter.is_full():
+        if state_filter is not None and not state_filter.is_full():
             return await self.stores.main.get_partial_filtered_current_state_ids(
                 room_id, state_filter
             )
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 575aaf498baf96488af1dc38b565a2ea04bb1e0c..1fbc49e7c5ac1d5311abb7f19721a98abb15b079 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -99,6 +99,13 @@ class EndToEndKeyBackgroundStore(SQLBaseStore):
             unique=True,
         )
 
+        self.db_pool.updates.register_background_index_update(
+            update_name="add_otk_ts_added_index",
+            index_name="e2e_one_time_keys_json_user_id_device_id_algorithm_ts_added_idx",
+            table="e2e_one_time_keys_json",
+            columns=("user_id", "device_id", "algorithm", "ts_added_ms"),
+        )
+
 
 class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorkerStore):
     def __init__(
@@ -1122,7 +1129,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
         """Take a list of one time keys out of the database.
 
         Args:
-            query_list: An iterable of tuples of (user ID, device ID, algorithm).
+            query_list: An iterable of tuples of (user ID, device ID, algorithm, number of keys).
 
         Returns:
             A tuple (results, missing) of:
@@ -1310,9 +1317,14 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
             OTK was found.
         """
 
+        # Return the oldest keys from this device (based on `ts_added_ms`).
+        # Doing so means that keys are issued in the same order they were uploaded,
+        # which reduces the chances of a client expiring its copy of a (private)
+        # key while the public key is still on the server, waiting to be issued.
         sql = """
             SELECT key_id, key_json FROM e2e_one_time_keys_json
             WHERE user_id = ? AND device_id = ? AND algorithm = ?
+            ORDER BY ts_added_ms
             LIMIT ?
         """
 
@@ -1354,13 +1366,22 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
             A list of tuples (user_id, device_id, algorithm, key_id, key_json)
             for each OTK claimed.
         """
+        # Find, delete, and return the oldest keys from each device (based on
+        # `ts_added_ms`).
+        #
+        # Doing so means that keys are issued in the same order they were uploaded,
+        # which reduces the chances of a client expiring its copy of a (private)
+        # key while the public key is still on the server, waiting to be issued.
         sql = """
             WITH claims(user_id, device_id, algorithm, claim_count) AS (
                 VALUES ?
             ), ranked_keys AS (
                 SELECT
                     user_id, device_id, algorithm, key_id, claim_count,
-                    ROW_NUMBER() OVER (PARTITION BY (user_id, device_id, algorithm)) AS r
+                    ROW_NUMBER() OVER (
+                        PARTITION BY (user_id, device_id, algorithm)
+                        ORDER BY ts_added_ms
+                    ) AS r
                 FROM e2e_one_time_keys_json
                     JOIN claims USING (user_id, device_id, algorithm)
             )
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 33569a4391e696dbbc35a71872ea037519dbddd2..cc3ce0951e7440344e447032d0999f95a00e9449 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -2550,7 +2550,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             still contains events with partial state.
         """
         try:
-            async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
+            async with (
+                self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id
+            ):
                 await self.db_pool.runInteraction(
                     "clear_partial_state_room",
                     self._clear_partial_state_room_txn,
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 60312d770d43d8eecf5af79d9a823c6a68bf5df5..788f7d1e325aefd784059ac87a93306190abb3f5 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -572,10 +572,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         Returns:
             Map from type/state_key to event ID.
         """
+        if state_filter is None:
+            state_filter = StateFilter.all()
 
-        where_clause, where_args = (
-            state_filter or StateFilter.all()
-        ).make_sql_filter_clause()
+        where_clause, where_args = (state_filter).make_sql_filter_clause()
 
         if not where_clause:
             # We delegate to the cached version
@@ -584,7 +584,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         def _get_filtered_current_state_ids_txn(
             txn: LoggingTransaction,
         ) -> StateMap[str]:
-            results = StateMapWrapper(state_filter=state_filter or StateFilter.all())
+            results = StateMapWrapper(state_filter=state_filter)
 
             sql = """
                 SELECT type, state_key, event_id FROM current_state_events
@@ -681,7 +681,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         context: EventContext,
     ) -> None:
         """Update the state group for a partial state event"""
-        async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
+        async with (
+            self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id
+        ):
             await self.db_pool.runInteraction(
                 "update_state_for_partial_state_event",
                 self._update_state_for_partial_state_event_txn,
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index ea7d8199a7d505a38723a9de945a9547a2021c75..f7824cba0f2139ec8af457c0052e7965ae6475d4 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -112,8 +112,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
         Returns:
             Map from state_group to a StateMap at that point.
         """
-
-        state_filter = state_filter or StateFilter.all()
+        if state_filter is None:
+            state_filter = StateFilter.all()
 
         results: Dict[int, MutableStateMap[str]] = {group: {} for group in groups}
 
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 875dba33496c27bf214c64e6a04b1c9289b56762..f7a59c8992db96abb8466d2df9c0cab70401f81e 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -284,7 +284,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         Returns:
             Dict of state group to state map.
         """
-        state_filter = state_filter or StateFilter.all()
+        if state_filter is None:
+            state_filter = StateFilter.all()
 
         member_filter, non_member_filter = state_filter.get_member_split()
 
diff --git a/synapse/storage/schema/main/delta/88/03_add_otk_ts_added_index.sql b/synapse/storage/schema/main/delta/88/03_add_otk_ts_added_index.sql
new file mode 100644
index 0000000000000000000000000000000000000000..7712ea68ad67d9b767dbcbff0cefcf27e6302bd8
--- /dev/null
+++ b/synapse/storage/schema/main/delta/88/03_add_otk_ts_added_index.sql
@@ -0,0 +1,18 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+
+-- Add an index on (user_id, device_id, algorithm, ts_added_ms) on e2e_one_time_keys_json, so that OTKs can
+-- efficiently be issued in the same order they were uploaded.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8803, 'add_otk_ts_added_index', '{}');
diff --git a/synapse/types/state.py b/synapse/types/state.py
index 67d1c3fe9722cc502e14bdf88e6513255543eddd..e641215f184037d219a653ea21d1a948330f27ae 100644
--- a/synapse/types/state.py
+++ b/synapse/types/state.py
@@ -68,15 +68,23 @@ class StateFilter:
     include_others: bool = False
 
     def __attrs_post_init__(self) -> None:
-        # If `include_others` is set we canonicalise the filter by removing
-        # wildcards from the types dictionary
         if self.include_others:
+            # If `include_others` is set we canonicalise the filter by removing
+            # wildcards from the types dictionary
+
             # this is needed to work around the fact that StateFilter is frozen
             object.__setattr__(
                 self,
                 "types",
                 immutabledict({k: v for k, v in self.types.items() if v is not None}),
             )
+        else:
+            # Otherwise we remove entries where the value is the empty set.
+            object.__setattr__(
+                self,
+                "types",
+                immutabledict({k: v for k, v in self.types.items() if v is None or v}),
+            )
 
     @staticmethod
     def all() -> "StateFilter":
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 8a3dfdcf75c10b5180f272a3ef091a149b590fed..bca314db83c816f4f2895a9771db3443b8915489 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -151,18 +151,30 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
     def test_claim_one_time_key(self) -> None:
         local_user = "@boris:" + self.hs.hostname
         device_id = "xyz"
-        keys = {"alg1:k1": "key1"}
-
         res = self.get_success(
             self.handler.upload_keys_for_user(
-                local_user, device_id, {"one_time_keys": keys}
+                local_user, device_id, {"one_time_keys": {"alg1:k1": "key1"}}
             )
         )
         self.assertDictEqual(
             res, {"one_time_key_counts": {"alg1": 1, "signed_curve25519": 0}}
         )
 
-        res2 = self.get_success(
+        # Keys should be returned in the order they were uploaded. To test, advance time
+        # a little, then upload a second key with an earlier key ID; it should get
+        # returned second.
+        self.reactor.advance(1)
+        res = self.get_success(
+            self.handler.upload_keys_for_user(
+                local_user, device_id, {"one_time_keys": {"alg1:k0": "key0"}}
+            )
+        )
+        self.assertDictEqual(
+            res, {"one_time_key_counts": {"alg1": 2, "signed_curve25519": 0}}
+        )
+
+        # now claim both keys back. They should be in the same order
+        res = self.get_success(
             self.handler.claim_one_time_keys(
                 {local_user: {device_id: {"alg1": 1}}},
                 self.requester,
@@ -171,12 +183,27 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
             )
         )
         self.assertEqual(
-            res2,
+            res,
             {
                 "failures": {},
                 "one_time_keys": {local_user: {device_id: {"alg1:k1": "key1"}}},
             },
         )
+        res = self.get_success(
+            self.handler.claim_one_time_keys(
+                {local_user: {device_id: {"alg1": 1}}},
+                self.requester,
+                timeout=None,
+                always_include_fallback_keys=False,
+            )
+        )
+        self.assertEqual(
+            res,
+            {
+                "failures": {},
+                "one_time_keys": {local_user: {device_id: {"alg1:k0": "key0"}}},
+            },
+        )
 
     def test_claim_one_time_key_bulk(self) -> None:
         """Like test_claim_one_time_key but claims multiple keys in one handler call."""
@@ -336,6 +363,47 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
                     counts_by_alg, expected_counts_by_alg, f"{user_id}:{device_id}"
                 )
 
+    def test_claim_one_time_key_bulk_ordering(self) -> None:
+        """Keys returned by the bulk claim call should be returned in the correct order"""
+
+        # Alice has lots of keys, uploaded in a specific order
+        alice = f"@alice:{self.hs.hostname}"
+        alice_dev = "alice_dev_1"
+
+        self.get_success(
+            self.handler.upload_keys_for_user(
+                alice,
+                alice_dev,
+                {"one_time_keys": {"alg1:k20": 20, "alg1:k21": 21, "alg1:k22": 22}},
+            )
+        )
+        # Advance time by 1s, to ensure that there is a difference in upload time.
+        self.reactor.advance(1)
+        self.get_success(
+            self.handler.upload_keys_for_user(
+                alice,
+                alice_dev,
+                {"one_time_keys": {"alg1:k10": 10, "alg1:k11": 11, "alg1:k12": 12}},
+            )
+        )
+
+        # Now claim some, and check we get the right ones.
+        claim_res = self.get_success(
+            self.handler.claim_one_time_keys(
+                {alice: {alice_dev: {"alg1": 2}}},
+                self.requester,
+                timeout=None,
+                always_include_fallback_keys=False,
+            )
+        )
+        # We should get the first-uploaded keys, even though they have later key ids.
+        # We should get a random set of two of k20, k21, k22.
+        self.assertEqual(claim_res["failures"], {})
+        claimed_keys = claim_res["one_time_keys"]["@alice:test"]["alice_dev_1"]
+        self.assertEqual(len(claimed_keys), 2)
+        for key_id in claimed_keys.keys():
+            self.assertIn(key_id, ["alg1:k20", "alg1:k21", "alg1:k22"])
+
     def test_fallback_key(self) -> None:
         local_user = "@boris:" + self.hs.hostname
         device_id = "xyz"
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 9847893fce7474cf9d2ce3bc816764730196ac00..b64a8a86a2b872cab7774501ec55759dcd8246d3 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -661,9 +661,12 @@ class PartialJoinTestCase(unittest.FederatingHomeserverTestCase):
             )
         )
 
-        with patch.object(
-            fed_client, "make_membership_event", mock_make_membership_event
-        ), patch.object(fed_client, "send_join", mock_send_join):
+        with (
+            patch.object(
+                fed_client, "make_membership_event", mock_make_membership_event
+            ),
+            patch.object(fed_client, "send_join", mock_send_join),
+        ):
             # Join and check that our join event is rejected
             # (The join event is rejected because it doesn't have any signatures)
             join_exc = self.get_failure(
@@ -708,9 +711,12 @@ class PartialJoinTestCase(unittest.FederatingHomeserverTestCase):
         fed_handler = self.hs.get_federation_handler()
         store = self.hs.get_datastores().main
 
-        with patch.object(
-            fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room
-        ), patch.object(store, "is_partial_state_room", mock_is_partial_state_room):
+        with (
+            patch.object(
+                fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room
+            ),
+            patch.object(store, "is_partial_state_room", mock_is_partial_state_room),
+        ):
             # Start the partial state sync.
             fed_handler._start_partial_state_room_sync("hs1", {"hs2"}, "room_id")
             self.assertEqual(mock_sync_partial_state_room.call_count, 1)
@@ -760,9 +766,12 @@ class PartialJoinTestCase(unittest.FederatingHomeserverTestCase):
         fed_handler = self.hs.get_federation_handler()
         store = self.hs.get_datastores().main
 
-        with patch.object(
-            fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room
-        ), patch.object(store, "is_partial_state_room", mock_is_partial_state_room):
+        with (
+            patch.object(
+                fed_handler, "_sync_partial_state_room", mock_sync_partial_state_room
+            ),
+            patch.object(store, "is_partial_state_room", mock_is_partial_state_room),
+        ):
             # Start the partial state sync.
             fed_handler._start_partial_state_room_sync("hs1", {"hs2"}, "room_id")
             self.assertEqual(mock_sync_partial_state_room.call_count, 1)
diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py
index ad77356ede7b115d5faa7b6619f2a77125aed6c1..f43ce664839fee0e67504c5b279538e0da3d7b51 100644
--- a/tests/handlers/test_room_member.py
+++ b/tests/handlers/test_room_member.py
@@ -172,20 +172,25 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
             )
         )
 
-        with patch.object(
-            self.handler.federation_handler.federation_client,
-            "make_membership_event",
-            mock_make_membership_event,
-        ), patch.object(
-            self.handler.federation_handler.federation_client,
-            "send_join",
-            mock_send_join,
-        ), patch(
-            "synapse.event_auth._is_membership_change_allowed",
-            return_value=None,
-        ), patch(
-            "synapse.handlers.federation_event.check_state_dependent_auth_rules",
-            return_value=None,
+        with (
+            patch.object(
+                self.handler.federation_handler.federation_client,
+                "make_membership_event",
+                mock_make_membership_event,
+            ),
+            patch.object(
+                self.handler.federation_handler.federation_client,
+                "send_join",
+                mock_send_join,
+            ),
+            patch(
+                "synapse.event_auth._is_membership_change_allowed",
+                return_value=None,
+            ),
+            patch(
+                "synapse.handlers.federation_event.check_state_dependent_auth_rules",
+                return_value=None,
+            ),
         ):
             self.get_success(
                 self.handler.update_membership(
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 1960d2f0e1083f83be334784f1113c89d7106568..9dd0e98971bf2581d3a573f137205fbc2cf8378d 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -1262,3 +1262,35 @@ class SyncStateAfterTestCase(tests.unittest.HomeserverTestCase):
             )
         )
         self.assertEqual(state[("m.test_event", "")], second_state["event_id"])
+
+    def test_incremental_sync_lazy_loaded_no_timeline(self) -> None:
+        """Test that lazy-loading with an empty timeline doesn't return the full
+        state.
+
+        There was a bug where an empty state filter would cause the DB to return
+        the full state, rather than an empty set.
+        """
+        user = self.register_user("user", "password")
+        tok = self.login("user", "password")
+
+        # Create a room as the user and set some custom state.
+        joined_room = self.helper.create_room_as(user, tok=tok)
+
+        since_token = self.hs.get_event_sources().get_current_token()
+        end_stream_token = self.hs.get_event_sources().get_current_token()
+
+        state = self.get_success(
+            self.sync_handler._compute_state_delta_for_incremental_sync(
+                room_id=joined_room,
+                sync_config=generate_sync_config(user, use_state_after=True),
+                batch=TimelineBatch(
+                    prev_batch=end_stream_token, events=[], limited=True
+                ),
+                since_token=since_token,
+                end_token=end_stream_token,
+                members_to_fetch=set(),
+                timeline_state={},
+            )
+        )
+
+        self.assertEqual(state, {})
diff --git a/tests/http/server/_base.py b/tests/http/server/_base.py
index 731b0c4e59a43ac42c7e848d3da6af66af7fa129..dff5a5d262d11216bb6b9560fbcb275a5a85081d 100644
--- a/tests/http/server/_base.py
+++ b/tests/http/server/_base.py
@@ -27,6 +27,7 @@ from typing import (
     Callable,
     ContextManager,
     Dict,
+    Generator,
     List,
     Optional,
     Set,
@@ -49,7 +50,10 @@ from synapse.http.server import (
     respond_with_json,
 )
 from synapse.http.site import SynapseRequest
-from synapse.logging.context import LoggingContext, make_deferred_yieldable
+from synapse.logging.context import (
+    LoggingContext,
+    make_deferred_yieldable,
+)
 from synapse.types import JsonDict
 
 from tests.server import FakeChannel, make_request
@@ -199,7 +203,7 @@ def make_request_with_cancellation_test(
     #
     # We would like to trigger a cancellation at the first `await`, re-run the
     # request and cancel at the second `await`, and so on. By patching
-    # `Deferred.__next__`, we can intercept `await`s, track which ones we have or
+    # `Deferred.__await__`, we can intercept `await`s, track which ones we have or
     # have not seen, and force them to block when they wouldn't have.
 
     # The set of previously seen `await`s.
@@ -211,7 +215,7 @@ def make_request_with_cancellation_test(
     )
 
     for request_number in itertools.count(1):
-        deferred_patch = Deferred__next__Patch(seen_awaits, request_number)
+        deferred_patch = Deferred__await__Patch(seen_awaits, request_number)
 
         try:
             with mock.patch(
@@ -250,6 +254,8 @@ def make_request_with_cancellation_test(
                             )
 
                 if respond_mock.called:
+                    _log_for_request(request_number, "--- response finished ---")
+
                     # The request ran to completion and we are done with testing it.
 
                     # `respond_with_json` writes the response asynchronously, so we
@@ -311,8 +317,8 @@ def make_request_with_cancellation_test(
     assert False, "unreachable"  # noqa: B011
 
 
-class Deferred__next__Patch:
-    """A `Deferred.__next__` patch that will intercept `await`s and force them
+class Deferred__await__Patch:
+    """A `Deferred.__await__` patch that will intercept `await`s and force them
     to block once it sees a new `await`.
 
     When done with the patch, `unblock_awaits()` must be called to clean up after any
@@ -322,7 +328,7 @@ class Deferred__next__Patch:
 
     Usage:
         seen_awaits = set()
-        deferred_patch = Deferred__next__Patch(seen_awaits, 1)
+        deferred_patch = Deferred__await__Patch(seen_awaits, 1)
         try:
             with deferred_patch.patch():
                 # do things
@@ -335,14 +341,14 @@ class Deferred__next__Patch:
         """
         Args:
             seen_awaits: The set of stack traces of `await`s that have been previously
-                seen. When the `Deferred.__next__` patch sees a new `await`, it will add
+                seen. When the `Deferred.__await__` patch sees a new `await`, it will add
                 it to the set.
             request_number: The request number to log against.
         """
         self._request_number = request_number
         self._seen_awaits = seen_awaits
 
-        self._original_Deferred___next__ = Deferred.__next__  # type: ignore[misc,unused-ignore]
+        self._original_Deferred__await__ = Deferred.__await__  # type: ignore[misc,unused-ignore]
 
         # The number of `await`s on `Deferred`s we have seen so far.
         self.awaits_seen = 0
@@ -350,8 +356,13 @@ class Deferred__next__Patch:
         # Whether we have seen a new `await` not in `seen_awaits`.
         self.new_await_seen = False
 
+        # Whether to block new await points we see. This gets set to False once
+        # we have cancelled the request to allow things to run after
+        # cancellation.
+        self._block_new_awaits = True
+
         # To force `await`s on resolved `Deferred`s to block, we make up a new
-        # unresolved `Deferred` and return it out of `Deferred.__next__` /
+        # unresolved `Deferred` and return it out of `Deferred.__await__` /
         # `coroutine.send()`. We have to resolve it later, in case the `await`ing
         # coroutine is part of some shared processing, such as `@cached`.
         self._to_unblock: Dict[Deferred, Union[object, Failure]] = {}
@@ -360,15 +371,15 @@ class Deferred__next__Patch:
         self._previous_stack: List[inspect.FrameInfo] = []
 
     def patch(self) -> ContextManager[Mock]:
-        """Returns a context manager which patches `Deferred.__next__`."""
+        """Returns a context manager which patches `Deferred.__await__`."""
 
-        def Deferred___next__(
-            deferred: "Deferred[T]", value: object = None
-        ) -> "Deferred[T]":
-            """Intercepts `await`s on `Deferred`s and rigs them to block once we have
-            seen enough of them.
+        def Deferred___await__(
+            deferred: "Deferred[T]",
+        ) -> Generator["Deferred[T]", None, T]:
+            """Intercepts calls to `__await__`, which returns a generator
+            yielding deferreds that we await on.
 
-            `Deferred.__next__` will normally:
+            The generator for `__await__` will normally:
                 * return `self` if the `Deferred` is unresolved, in which case
                    `coroutine.send()` will return the `Deferred`, and
                    `_defer.inlineCallbacks` will stop running the coroutine until the
@@ -376,9 +387,43 @@ class Deferred__next__Patch:
                 * raise a `StopIteration(result)`, containing the result of the `await`.
                 * raise another exception, which will come out of the `await`.
             """
+
+            # Get the original generator.
+            gen = self._original_Deferred__await__(deferred)
+
+            # Run the generator, handling each iteration to see if we need to
+            # block.
+            try:
+                while True:
+                    # We've hit a new await point (or the deferred has
+                    # completed), handle it.
+                    handle_next_iteration(deferred)
+
+                    # Continue on.
+                    yield gen.send(None)
+            except StopIteration as e:
+                # We need to convert `StopIteration` into a normal return.
+                return e.value
+
+        def handle_next_iteration(
+            deferred: "Deferred[T]",
+        ) -> None:
+            """Intercepts `await`s on `Deferred`s and rigs them to block once we have
+            seen enough of them.
+
+            Args:
+                deferred: The deferred that we've captured and are intercepting
+                    `await` calls within.
+            """
+            if not self._block_new_awaits:
+                # We're no longer blocking awaits points
+                return
+
             self.awaits_seen += 1
 
-            stack = _get_stack(skip_frames=1)
+            stack = _get_stack(
+                skip_frames=2  # Ignore this function and `Deferred___await__` in stack trace
+            )
             stack_hash = _hash_stack(stack)
 
             if stack_hash not in self._seen_awaits:
@@ -389,20 +434,29 @@ class Deferred__next__Patch:
             if not self.new_await_seen:
                 # This `await` isn't interesting. Let it proceed normally.
 
+                _log_await_stack(
+                    stack,
+                    self._previous_stack,
+                    self._request_number,
+                    "already seen",
+                )
+
                 # Don't log the stack. It's been seen before in a previous run.
                 self._previous_stack = stack
 
-                return self._original_Deferred___next__(deferred, value)
+                return
 
             # We want to block at the current `await`.
             if deferred.called and not deferred.paused:
-                # This `Deferred` already has a result.
-                # We return a new, unresolved, `Deferred` for `_inlineCallbacks` to wait
-                # on. This blocks the coroutine that did this `await`.
+                # This `Deferred` already has a result. We chain a new,
+                # unresolved, `Deferred` to the end of this Deferred that it
+                # will wait on. This blocks the coroutine that did this `await`.
                 # We queue it up for unblocking later.
                 new_deferred: "Deferred[T]" = Deferred()
                 self._to_unblock[new_deferred] = deferred.result
 
+                deferred.addBoth(lambda _: make_deferred_yieldable(new_deferred))
+
                 _log_await_stack(
                     stack,
                     self._previous_stack,
@@ -411,7 +465,9 @@ class Deferred__next__Patch:
                 )
                 self._previous_stack = stack
 
-                return make_deferred_yieldable(new_deferred)
+                # Continue iterating on the deferred now that we've blocked it
+                # again.
+                return
 
             # This `Deferred` does not have a result yet.
             # The `await` will block normally, so we don't have to do anything.
@@ -423,9 +479,9 @@ class Deferred__next__Patch:
             )
             self._previous_stack = stack
 
-            return self._original_Deferred___next__(deferred, value)
+            return
 
-        return mock.patch.object(Deferred, "__next__", new=Deferred___next__)
+        return mock.patch.object(Deferred, "__await__", new=Deferred___await__)
 
     def unblock_awaits(self) -> None:
         """Unblocks any shared processing that we forced to block.
@@ -433,6 +489,9 @@ class Deferred__next__Patch:
         Must be called when done, otherwise processing shared between multiple requests,
         such as database queries started by `@cached`, will become permanently stuck.
         """
+        # Also disable blocking at future await points
+        self._block_new_awaits = False
+
         to_unblock = self._to_unblock
         self._to_unblock = {}
         for deferred, result in to_unblock.items():
diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py
index fc73f3dc2ac0270628f8abe2affa2588f1a168c3..16c12928128a729c32739e38d5a6b3eec8721680 100644
--- a/tests/push/test_bulk_push_rule_evaluator.py
+++ b/tests/push/test_bulk_push_rule_evaluator.py
@@ -120,9 +120,11 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
         #
         # We have seen stringy and null values for "room" in the wild, so presumably
         # some of this validation was missing in the past.
-        with patch("synapse.events.validator.validate_canonicaljson"), patch(
-            "synapse.events.validator.jsonschema.validate"
-        ), patch("synapse.handlers.event_auth.check_state_dependent_auth_rules"):
+        with (
+            patch("synapse.events.validator.validate_canonicaljson"),
+            patch("synapse.events.validator.jsonschema.validate"),
+            patch("synapse.handlers.event_auth.check_state_dependent_auth_rules"),
+        ):
             pl_event_id = self.helper.send_state(
                 self.room_id,
                 "m.room.power_levels",
diff --git a/tests/server.py b/tests/server.py
index 95aff6f66c410cd2f0502e6c9333d05db39ab40a..23c81203a5a4c7b47790ba142233ee73a17e4b48 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -58,6 +58,7 @@ import twisted
 from twisted.enterprise import adbapi
 from twisted.internet import address, tcp, threads, udp
 from twisted.internet._resolver import SimpleResolverComplexifier
+from twisted.internet.address import IPv4Address, IPv6Address
 from twisted.internet.defer import Deferred, fail, maybeDeferred, succeed
 from twisted.internet.error import DNSLookupError
 from twisted.internet.interfaces import (
@@ -73,6 +74,7 @@ from twisted.internet.interfaces import (
     IReactorPluggableNameResolver,
     IReactorTime,
     IResolverSimple,
+    ITCPTransport,
     ITransport,
 )
 from twisted.internet.protocol import ClientFactory, DatagramProtocol, Factory
@@ -780,7 +782,7 @@ def get_clock() -> Tuple[ThreadedMemoryReactorClock, Clock]:
     return clock, hs_clock
 
 
-@implementer(ITransport)
+@implementer(ITCPTransport)
 @attr.s(cmp=False, auto_attribs=True)
 class FakeTransport:
     """
@@ -809,12 +811,12 @@ class FakeTransport:
     will get called back for connectionLost() notifications etc.
     """
 
-    _peer_address: IAddress = attr.Factory(
+    _peer_address: Union[IPv4Address, IPv6Address] = attr.Factory(
         lambda: address.IPv4Address("TCP", "127.0.0.1", 5678)
     )
     """The value to be returned by getPeer"""
 
-    _host_address: IAddress = attr.Factory(
+    _host_address: Union[IPv4Address, IPv6Address] = attr.Factory(
         lambda: address.IPv4Address("TCP", "127.0.0.1", 1234)
     )
     """The value to be returned by getHost"""
@@ -826,10 +828,10 @@ class FakeTransport:
     producer: Optional[IPushProducer] = None
     autoflush: bool = True
 
-    def getPeer(self) -> IAddress:
+    def getPeer(self) -> Union[IPv4Address, IPv6Address]:
         return self._peer_address
 
-    def getHost(self) -> IAddress:
+    def getHost(self) -> Union[IPv4Address, IPv6Address]:
         return self._host_address
 
     def loseConnection(self) -> None:
@@ -939,6 +941,51 @@ class FakeTransport:
             logger.info("FakeTransport: Buffer now empty, completing disconnect")
             self.disconnected = True
 
+    ## ITCPTransport methods. ##
+
+    def loseWriteConnection(self) -> None:
+        """
+        Half-close the write side of a TCP connection.
+
+        If the protocol instance this is attached to provides
+        IHalfCloseableProtocol, it will get notified when the operation is
+        done. When closing write connection, as with loseConnection this will
+        only happen when buffer has emptied and there is no registered
+        producer.
+        """
+        raise NotImplementedError()
+
+    def getTcpNoDelay(self) -> bool:
+        """
+        Return if C{TCP_NODELAY} is enabled.
+        """
+        return False
+
+    def setTcpNoDelay(self, enabled: bool) -> None:
+        """
+        Enable/disable C{TCP_NODELAY}.
+
+        Enabling C{TCP_NODELAY} turns off Nagle's algorithm. Small packets are
+        sent sooner, possibly at the expense of overall throughput.
+        """
+        # Ignore setting this.
+
+    def getTcpKeepAlive(self) -> bool:
+        """
+        Return if C{SO_KEEPALIVE} is enabled.
+        """
+        return False
+
+    def setTcpKeepAlive(self, enabled: bool) -> None:
+        """
+        Enable/disable C{SO_KEEPALIVE}.
+
+        Enabling C{SO_KEEPALIVE} sends packets periodically when the connection
+        is otherwise idle, usually once every two hours. They are intended
+        to allow detection of lost peers in a non-infinite amount of time.
+        """
+        # Ignore setting this.
+
 
 def connect_client(
     reactor: ThreadedMemoryReactorClock, client_id: int
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index ed5f286243936c0dac052d1dbb9c9b20767e80de..38a56419f373cf569ae54d1ccf8bfa329695046f 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -1465,20 +1465,25 @@ class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
             )
         )
 
-        with patch.object(
-            self.room_member_handler.federation_handler.federation_client,
-            "make_membership_event",
-            mock_make_membership_event,
-        ), patch.object(
-            self.room_member_handler.federation_handler.federation_client,
-            "send_join",
-            mock_send_join,
-        ), patch(
-            "synapse.event_auth._is_membership_change_allowed",
-            return_value=None,
-        ), patch(
-            "synapse.handlers.federation_event.check_state_dependent_auth_rules",
-            return_value=None,
+        with (
+            patch.object(
+                self.room_member_handler.federation_handler.federation_client,
+                "make_membership_event",
+                mock_make_membership_event,
+            ),
+            patch.object(
+                self.room_member_handler.federation_handler.federation_client,
+                "send_join",
+                mock_send_join,
+            ),
+            patch(
+                "synapse.event_auth._is_membership_change_allowed",
+                return_value=None,
+            ),
+            patch(
+                "synapse.handlers.federation_event.check_state_dependent_auth_rules",
+                return_value=None,
+            ),
         ):
             self.get_success(
                 self.room_member_handler.update_membership(
diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py
index 350a2b7c8cdd13e9c62b7a76fcf07f5077ef7586..cfd2882410e7e30b7b62eb5915566be613b1d756 100644
--- a/tests/util/test_async_helpers.py
+++ b/tests/util/test_async_helpers.py
@@ -320,12 +320,19 @@ class ConcurrentlyExecuteTest(TestCase):
                 await concurrently_execute(callback, [1], 2)
             except _TestException as e:
                 tb = traceback.extract_tb(e.__traceback__)
-                # we expect to see "caller", "concurrently_execute", "callback",
-                # and some magic from inside ensureDeferred that happens when .fail
-                # is called.
+
+                # Remove twisted internals from the stack, as we don't care
+                # about the precise details.
+                tb = traceback.StackSummary(
+                    t for t in tb if "/twisted/" not in t.filename
+                )
+
+                # we expect to see "caller", "concurrently_execute" at the top of the stack
                 self.assertEqual(tb[0].name, "caller")
                 self.assertEqual(tb[1].name, "concurrently_execute")
-                self.assertEqual(tb[-2].name, "callback")
+                # ... some stack frames from the implementation of `concurrently_execute` ...
+                # and at the bottom of the stack we expect to see "callback"
+                self.assertEqual(tb[-1].name, "callback")
             else:
                 self.fail("No exception thrown")
 
diff --git a/tests/util/test_check_dependencies.py b/tests/util/test_check_dependencies.py
index 13a4e6ddaa5af40449a604f7c5c58c86410a2ba4..c052ba2b75f8ee594f29e22b4788ca8dd8afe12b 100644
--- a/tests/util/test_check_dependencies.py
+++ b/tests/util/test_check_dependencies.py
@@ -109,10 +109,13 @@ class TestDependencyChecker(TestCase):
 
     def test_checks_ignore_dev_dependencies(self) -> None:
         """Both generic and per-extra checks should ignore dev dependencies."""
-        with patch(
-            "synapse.util.check_dependencies.metadata.requires",
-            return_value=["dummypkg >= 1; extra == 'mypy'"],
-        ), patch("synapse.util.check_dependencies.RUNTIME_EXTRAS", {"cool-extra"}):
+        with (
+            patch(
+                "synapse.util.check_dependencies.metadata.requires",
+                return_value=["dummypkg >= 1; extra == 'mypy'"],
+            ),
+            patch("synapse.util.check_dependencies.RUNTIME_EXTRAS", {"cool-extra"}),
+        ):
             # We're testing that none of these calls raise.
             with self.mock_installed_package(None):
                 check_requirements()
@@ -141,10 +144,13 @@ class TestDependencyChecker(TestCase):
 
     def test_check_for_extra_dependencies(self) -> None:
         """Complain if a package required for an extra is missing or old."""
-        with patch(
-            "synapse.util.check_dependencies.metadata.requires",
-            return_value=["dummypkg >= 1; extra == 'cool-extra'"],
-        ), patch("synapse.util.check_dependencies.RUNTIME_EXTRAS", {"cool-extra"}):
+        with (
+            patch(
+                "synapse.util.check_dependencies.metadata.requires",
+                return_value=["dummypkg >= 1; extra == 'cool-extra'"],
+            ),
+            patch("synapse.util.check_dependencies.RUNTIME_EXTRAS", {"cool-extra"}),
+        ):
             with self.mock_installed_package(None):
                 self.assertRaises(DependencyException, check_requirements, "cool-extra")
             with self.mock_installed_package(old):
diff --git a/tox.ini b/tox.ini
index 4cd9dfb966c754073d3f0280a4bc4456d0ec0a28..a506b5034d77589e9d00ebed6128d3cdb6654931 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
 [tox]
-envlist = py37, py38, py39, py310
+envlist = py39, py310, py311, py312, py313
 
 # we require tox>=2.3.2 for the fix to https://github.com/tox-dev/tox/issues/208
 minversion = 2.3.2