diff --git a/changelog.d/17934.feature b/changelog.d/17934.feature
new file mode 100644
index 0000000000000000000000000000000000000000..f0e138a30ff327971bff4ce3e1f1f05732c0e094
--- /dev/null
+++ b/changelog.d/17934.feature
@@ -0,0 +1 @@
+Add a one-off task to delete old one-time-keys, to guard against us having old OTKs in the database that the client has long forgotten about.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 315461fefb379a544fa506173e3a0897d12e65a8..540995e0627a9e10e81ebc60811ed1ab7e4246e7 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -39,6 +39,8 @@ from synapse.replication.http.devices import ReplicationUploadKeysForUserRestSer
 from synapse.types import (
     JsonDict,
     JsonMapping,
+    ScheduledTask,
+    TaskStatus,
     UserID,
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
@@ -70,6 +72,7 @@ class E2eKeysHandler:
         self.is_mine = hs.is_mine
         self.clock = hs.get_clock()
         self._worker_lock_handler = hs.get_worker_locks_handler()
+        self._task_scheduler = hs.get_task_scheduler()
 
         federation_registry = hs.get_federation_registry()
 
@@ -116,6 +119,10 @@ class E2eKeysHandler:
             hs.config.experimental.msc3984_appservice_key_query
         )
 
+        self._task_scheduler.register_action(
+            self._delete_old_one_time_keys_task, "delete_old_otks"
+        )
+
     @trace
     @cancellable
     async def query_devices(
@@ -1574,6 +1581,45 @@ class E2eKeysHandler:
                     return True
         return False
 
+    async def _delete_old_one_time_keys_task(
+        self, task: ScheduledTask
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        """Scheduler task to delete old one time keys.
+
+        Until Synapse 1.119, Synapse used to issue one-time-keys in a random order, leading to the possibility
+        that it could still have old OTKs that the client has dropped. This task is scheduled exactly once
+        by a database schema delta file, and it clears out old one-time-keys that look like they came from libolm.
+        """
+        last_user = task.result.get("from_user", "") if task.result else ""
+        while True:
+            # We process users in batches of 100
+            users, rowcount = await self.store.delete_old_otks_for_next_user_batch(
+                last_user, 100
+            )
+            if len(users) == 0:
+                # We're done!
+                return TaskStatus.COMPLETE, None, None
+
+            logger.debug(
+                "Deleted %i old one-time-keys for users '%s'..'%s'",
+                rowcount,
+                users[0],
+                users[-1],
+            )
+            last_user = users[-1]
+
+            # Store our progress
+            await self._task_scheduler.update_task(
+                task.id, result={"from_user": last_user}
+            )
+
+            # Sleep a little before doing the next user.
+            #
+            # matrix.org has about 15M users in the e2e_one_time_keys_json table
+            # (comprising 20M devices). We want this to take about a week, so we need
+            # to do about one batch of 100 users every 4 seconds.
+            await self.clock.sleep(4)
+
 
 def _check_cross_signing_key(
     key: JsonDict, user_id: str, key_type: str, signing_key: Optional[VerifyKey] = None
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 1fbc49e7c5ac1d5311abb7f19721a98abb15b079..3bb8fccb5e6b8e0c1b9460db24fda3574ed4b8aa 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1453,6 +1453,54 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
             impl,
         )
 
+    async def delete_old_otks_for_next_user_batch(
+        self, after_user_id: str, number_of_users: int
+    ) -> Tuple[List[str], int]:
+        """Deletes old OTKs belonging to the next batch of users
+
+        Returns:
+            `(users, rows)`, where:
+             * `users` is the user IDs of the users processed in this batch (an empty list if we are done).
+             * `rows` is the number of rows deleted in this batch.
+        """
+
+        def impl(txn: LoggingTransaction) -> Tuple[List[str], int]:
+            # Find a batch of users
+            txn.execute(
+                """
+                SELECT DISTINCT(user_id) FROM e2e_one_time_keys_json
+                    WHERE user_id > ?
+                    ORDER BY user_id
+                    LIMIT ?
+                """,
+                (after_user_id, number_of_users),
+            )
+            users = [row[0] for row in txn.fetchall()]
+            if len(users) == 0:
+                return users, 0
+
+            # Delete any old OTKs (those added more than a week ago) belonging to those users.
+            #
+            # We only actually consider OTKs whose key ID is 6 characters long. These
+            # keys were likely made by libolm rather than Vodozemac; libolm only kept
+            # 100 private OTKs, so was far more vulnerable than Vodozemac to throwing
+            # away keys prematurely.
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "user_id", users
+            )
+            sql = f"""
+                DELETE FROM e2e_one_time_keys_json
+                WHERE {clause} AND ts_added_ms < ? AND length(key_id) = 6
+                """
+            args.append(self._clock.time_msec() - (7 * 24 * 3600 * 1000))
+            txn.execute(sql, args)
+
+            return users, txn.rowcount
+
+        return await self.db_pool.runInteraction(
+            "delete_old_otks_for_next_user_batch", impl
+        )
+
 
 class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def __init__(
diff --git a/synapse/storage/schema/main/delta/88/05_drop_old_otks.sql.postgres b/synapse/storage/schema/main/delta/88/05_drop_old_otks.sql.postgres
new file mode 100644
index 0000000000000000000000000000000000000000..93a68836ee557becafdf69d7afd47cdacd20cf7d
--- /dev/null
+++ b/synapse/storage/schema/main/delta/88/05_drop_old_otks.sql.postgres
@@ -0,0 +1,19 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- Until Synapse 1.119, Synapse used to issue one-time-keys in a random order, leading to the possibility
+-- that it could still have old OTKs that the client has dropped.
+--
+-- We create a scheduled task which will drop old OTKs, to flush them out.
+INSERT INTO scheduled_tasks(id, action, status, timestamp)
+    VALUES ('delete_old_otks_task', 'delete_old_otks', 'scheduled', extract(epoch from current_timestamp) * 1000);
diff --git a/synapse/storage/schema/main/delta/88/05_drop_old_otks.sql.sqlite b/synapse/storage/schema/main/delta/88/05_drop_old_otks.sql.sqlite
new file mode 100644
index 0000000000000000000000000000000000000000..cdc2b5d211c9c1abc7b2b65b350697921fcf4b02
--- /dev/null
+++ b/synapse/storage/schema/main/delta/88/05_drop_old_otks.sql.sqlite
@@ -0,0 +1,19 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- Until Synapse 1.119, Synapse used to issue one-time-keys in a random order, leading to the possibility
+-- that it could still have old OTKs that the client has dropped.
+--
+-- We create a scheduled task which will drop old OTKs, to flush them out.
+INSERT INTO scheduled_tasks(id, action, status, timestamp)
+    VALUES ('delete_old_otks_task', 'delete_old_otks', 'scheduled', strftime('%s', 'now') * 1000);
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index bca314db83c816f4f2895a9771db3443b8915489..e67efcc17f4c46a9972b2d5c9dde915c86d55b09 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -19,6 +19,7 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
+import time
 from typing import Dict, Iterable
 from unittest import mock
 
@@ -1826,3 +1827,72 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
         )
         self.assertIs(exists, True)
         self.assertIs(replaceable_without_uia, False)
+
+    def test_delete_old_one_time_keys(self) -> None:
+        """Test the scheduled task (enqueued by a db schema delta) that clears out old OTKs"""
+
+        # We upload two sets of keys, one just over a week ago, and one just less than
+        # a week ago. Each batch contains some keys that match the deletion pattern
+        # (key IDs of 6 chars), and some that do not.
+        #
+        # Finally, set the scheduled task going, and check what gets deleted.
+
+        user_id = "@user000:" + self.hs.hostname
+        device_id = "xyz"
+
+        # The scheduled task should be for "now" in real, wallclock time, so
+        # set the test reactor to just over a week ago.
+        self.reactor.advance(time.time() - 7.5 * 24 * 3600)
+
+        # Upload some keys
+        self.get_success(
+            self.handler.upload_keys_for_user(
+                user_id,
+                device_id,
+                {
+                    "one_time_keys": {
+                        # some keys to delete
+                        "alg1:AAAAAA": "key1",
+                        "alg2:AAAAAB": {"key": "key2", "signatures": {"k1": "sig1"}},
+                        # A key to *not* delete
+                        "alg2:AAAAAAAAAA": {"key": "key3"},
+                    }
+                },
+            )
+        )
+
+        # A day passes
+        self.reactor.advance(24 * 3600)
+
+        # Upload some more keys
+        self.get_success(
+            self.handler.upload_keys_for_user(
+                user_id,
+                device_id,
+                {
+                    "one_time_keys": {
+                        # some keys which match the pattern
+                        "alg1:BAAAAA": "key1",
+                        "alg2:BAAAAB": {"key": "key2", "signatures": {"k1": "sig1"}},
+                        # A key to *not* delete
+                        "alg2:BAAAAAAAAA": {"key": "key3"},
+                    }
+                },
+            )
+        )
+
+        # The rest of the week passes, which should set the scheduled task going.
+        self.reactor.advance(6.5 * 24 * 3600)
+
+        # Check what we're left with in the database
+        remaining_key_ids = {
+            row[0]
+            for row in self.get_success(
+                self.handler.store.db_pool.simple_select_list(
+                    "e2e_one_time_keys_json", None, ["key_id"]
+                )
+            )
+        }
+        self.assertEqual(
+            remaining_key_ids, {"AAAAAAAAAA", "BAAAAA", "BAAAAB", "BAAAAAAAAA"}
+        )