diff --git a/.github/workflows/create-wheels.yaml b/.github/workflows/create-wheels.yaml index ce81123d7c8..5ca98ea257c 100644 --- a/.github/workflows/create-wheels.yaml +++ b/.github/workflows/create-wheels.yaml @@ -22,6 +22,7 @@ jobs: python: - "cp310-* cp311-*" - "cp312-* cp313-*" + - "cp313t-*" wheel_mode: - compiled os: @@ -81,10 +82,11 @@ jobs: - name: Build compiled wheels if: ${{ matrix.wheel_mode == 'compiled' }} - uses: pypa/cibuildwheel@v2.22.0 + uses: pypa/cibuildwheel@v3.2.0 env: CIBW_ARCHS_LINUX: ${{ matrix.linux_archs }} CIBW_BUILD: ${{ matrix.python }} + CIBW_ENABLE: ${{ matrix.python == 'cp313t-*' && 'cpython-freethreading' || '' }} # setting it here does not work on linux # PYTHONNOUSERSITE: "1" diff --git a/.github/workflows/run-on-pr.yaml b/.github/workflows/run-on-pr.yaml index 00cacf48d68..90440b5919e 100644 --- a/.github/workflows/run-on-pr.yaml +++ b/.github/workflows/run-on-pr.yaml @@ -27,6 +27,7 @@ jobs: python-version: - "3.13" build-type: + - "cext-greenlet" - "cext" - "nocext" architecture: @@ -52,7 +53,7 @@ jobs: pip list - name: Run tests - run: tox -e github-${{ matrix.build-type }} -- -q --nomemory --notimingintensive ${{ matrix.pytest-args }} + run: tox -e github-${{ matrix.build-type }} -- ${{ matrix.pytest-args }} run-tox: name: ${{ matrix.tox-env }}-${{ matrix.python-version }} diff --git a/.github/workflows/run-test.yaml b/.github/workflows/run-test.yaml index 9818625603c..854248d0901 100644 --- a/.github/workflows/run-test.yaml +++ b/.github/workflows/run-test.yaml @@ -37,9 +37,16 @@ jobs: - "3.11" - "3.12" - "3.13" + - "3.13t" - "3.14.0-alpha - 3.14" + - "3.14t-dev" - "pypy-3.10" build-type: + # builds greenlet, runs asyncio tests. includes aiosqlite driver + - "cext-greenlet" + + # these do not install greenlet at all and skip asyncio tests. + # does not include aiosqlite driver - "cext" - "nocext" architecture: @@ -51,13 +58,34 @@ jobs: # autocommit tests fail on the ci for some reason - python-version: "pypy-3.10" pytest-args: "-k 'not test_autocommit_on and not test_turn_autocommit_off_via_default_iso_level and not test_autocommit_isolation_level'" - - os: "ubuntu-22.04" - pytest-args: "--dbdriver pysqlite --dbdriver aiosqlite" - - os: "ubuntu-22.04-arm" - pytest-args: "--dbdriver pysqlite --dbdriver aiosqlite" - exclude: + # cext-greenlet only runs on ubuntu x64 and arm64 + - build-type: "cext-greenlet" + os: "windows-latest" + - build-type: "cext-greenlet" + os: "windows-11-arm" + - build-type: "cext-greenlet" + os: "macos-latest" + - build-type: "cext-greenlet" + os: "macos-13" + + # the threaded pythons are not stable under greenlet. 
Even
+          # though we can run individual tests, when you run the whole suite
+          # with xdist and the greenlet wrapper, the workers keep crashing
+          # and getting replaced
+          - build-type: "cext-greenlet"
+            python-version: "3.13t"
+
+          - build-type: "cext-greenlet"
+            python-version: "3.14t-dev"
+
+          # skip py 3.14 on x64, because greenlet builds are not on
+          # pypi and these don't compile yet
+          - architecture: x64
+            python-version:
+              - "3.14.0-alpha - 3.14"
+              - "3.14t-dev"

          # linux do not have x86 / arm64 python
          - os: "ubuntu-22.04"
            architecture: x86
@@ -112,12 +140,6 @@ jobs:
          python-version: ${{ matrix.python-version }}
          architecture: ${{ matrix.architecture }}

-      - name: Remove greenlet
-        if: ${{ matrix.no-greenlet == 'true' }}
-        shell: pwsh
-        run: |
-          (cat setup.cfg) | %{$_ -replace "^\s*greenlet.+",""} | set-content setup.cfg
-
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
@@ -125,7 +147,10 @@ jobs:
          pip list

      - name: Run tests
-        run: tox -e github-${{ matrix.build-type }} -- -q --nomemory --notimingintensive ${{ matrix.pytest-args }}
+        run: tox -e github-${{ matrix.build-type }} -- ${{ matrix.pytest-args }}
+        env:
+          # under free threading, make sure to disable GIL
+          PYTHON_GIL: ${{ contains(matrix.python-version, 't') && '0' || '' }}
        continue-on-error: ${{ matrix.python-version == 'pypy-3.10' }}

  run-tox:
@@ -137,8 +162,6 @@ jobs:
        os:
          - "ubuntu-22.04"
        python-version:
-          - "3.10"
-          - "3.11"
          - "3.12"
          - "3.13"
        tox-env:
diff --git a/doc/build/changelog/unreleased_20/12881.rst b/doc/build/changelog/unreleased_20/12881.rst
new file mode 100644
index 00000000000..5b9a5fe9084
--- /dev/null
+++ b/doc/build/changelog/unreleased_20/12881.rst
@@ -0,0 +1,14 @@
+.. change::
+    :tags: bug, engine
+    :tickets: 12881
+
+    Implemented initial support for free-threaded Python by adding new tests
+    and reworking the test harness and GitHub Actions to include Python 3.13t
+    and Python 3.14t in test runs. Two concurrency issues were identified and
+    fixed: the first involves initialization of the ``.c`` collection on a
+    ``FromClause``, a continuation of :ticket:`12302`, where an optional
+    mutex, taken only under free-threading, is added; the second involves
+    synchronization of the pool "first_connect" event, which first received
+    thread synchronization in :ticket:`2964`; under free-threading, the
+    creation of that mutex itself now runs under the same free-threading-only
+    mutex. Initial pull request and test suite courtesy Lysandros Nikolaou.
diff --git a/lib/sqlalchemy/event/attr.py b/lib/sqlalchemy/event/attr.py
index 0e11df7d464..67c724a5ff8 100644
--- a/lib/sqlalchemy/event/attr.py
+++ b/lib/sqlalchemy/event/attr.py
@@ -343,11 +343,20 @@ def for_modify(
         obj = cast("_Dispatch[_ET]", obj)
         assert obj._instance_cls is not None

-        result = _ListenerCollection(self.parent, obj._instance_cls)
-        if getattr(obj, self.name) is self:
-            setattr(obj, self.name, result)
-        else:
-            assert isinstance(getattr(obj, self.name), _JoinedListener)
+        existing = getattr(obj, self.name)
+
+        with util.mini_gil:
+            if existing is self or isinstance(existing, _JoinedListener):
+                result = _ListenerCollection(self.parent, obj._instance_cls)
+            else:
+                # this codepath is an extremely rare race condition
+                # that has been observed in test_pool.py->test_timeout_race
+                # with freethreaded.
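+                # i.e. another thread completed for_modify() between our
+                # getattr() above and our acquiring mini_gil, and has
+                # already installed the _ListenerCollection; return it
+                # rather than replacing it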
+ assert isinstance(existing, _ListenerCollection) + return existing + + if existing is self: + setattr(obj, self.name, result) return result def _needs_modify(self, *args: Any, **kw: Any) -> NoReturn: @@ -409,7 +418,7 @@ class _CompoundListener(_InstanceLevelDispatch[_ET]): "_is_asyncio", ) - _exec_once_mutex: _MutexProtocol + _exec_once_mutex: Optional[_MutexProtocol] parent_listeners: Collection[_ListenerFnType] listeners: Collection[_ListenerFnType] _exec_once: bool @@ -422,16 +431,23 @@ def __init__(self, *arg: Any, **kw: Any): def _set_asyncio(self) -> None: self._is_asyncio = True - def _memoized_attr__exec_once_mutex(self) -> _MutexProtocol: - if self._is_asyncio: - return AsyncAdaptedLock() - else: - return threading.Lock() + def _get_exec_once_mutex(self) -> _MutexProtocol: + with util.mini_gil: + if self._exec_once_mutex is not None: + return self._exec_once_mutex + + if self._is_asyncio: + mutex = AsyncAdaptedLock() + else: + mutex = threading.Lock() # type: ignore[assignment] + self._exec_once_mutex = mutex + + return mutex def _exec_once_impl( self, retry_on_exception: bool, *args: Any, **kw: Any ) -> None: - with self._exec_once_mutex: + with self._get_exec_once_mutex(): if not self._exec_once: try: self(*args, **kw) @@ -470,13 +486,15 @@ def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None: raised an exception. If _exec_w_sync_on_first_run was already called and didn't raise an - exception, then a mutex is not used. + exception, then a mutex is not used. It's not guaranteed + the mutex won't be used more than once in the case of very rare + race conditions. .. versionadded:: 1.4.11 """ if not self._exec_w_sync_once: - with self._exec_once_mutex: + with self._get_exec_once_mutex(): try: self(*args, **kw) except: @@ -538,6 +556,7 @@ def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]): parent.update_subclass(target_cls) self._exec_once = False self._exec_w_sync_once = False + self._exec_once_mutex = None self.parent_listeners = parent._clslevel[target_cls] self.parent = parent self.name = parent.name @@ -615,6 +634,8 @@ def __init__( local: _EmptyListener[_ET], ): self._exec_once = False + self._exec_w_sync_once = False + self._exec_once_mutex = None self.parent_dispatch = parent_dispatch self.name = name self.local = local diff --git a/lib/sqlalchemy/sql/selectable.py b/lib/sqlalchemy/sql/selectable.py index 4d72377c2bb..347ba0e144b 100644 --- a/lib/sqlalchemy/sql/selectable.py +++ b/lib/sqlalchemy/sql/selectable.py @@ -912,25 +912,28 @@ def c(self) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]: return self._columns.as_readonly() def _setup_collections(self) -> None: - assert "_columns" not in self.__dict__ - assert "primary_key" not in self.__dict__ - assert "foreign_keys" not in self.__dict__ - - _columns: ColumnCollection[Any, Any] = ColumnCollection() - primary_key = ColumnSet() - foreign_keys: Set[KeyedColumnElement[Any]] = set() - - self._populate_column_collection( - columns=_columns, - primary_key=primary_key, - foreign_keys=foreign_keys, - ) + with util.mini_gil: + # detect another thread that raced ahead + if "_columns" in self.__dict__: + assert "primary_key" in self.__dict__ + assert "foreign_keys" in self.__dict__ + return + + _columns: ColumnCollection[Any, Any] = ColumnCollection() + primary_key = ColumnSet() + foreign_keys: Set[KeyedColumnElement[Any]] = set() + + self._populate_column_collection( + columns=_columns, + primary_key=primary_key, + foreign_keys=foreign_keys, + ) - # assigning these three collections 
separately is not itself atomic, - # but greatly reduces the surface for problems - self._columns = _columns - self.primary_key = primary_key # type: ignore - self.foreign_keys = foreign_keys # type: ignore + # assigning these three collections separately is not itself + # atomic, but greatly reduces the surface for problems + self._columns = _columns + self.primary_key = primary_key # type: ignore + self.foreign_keys = foreign_keys # type: ignore @util.ro_non_memoized_property def entity_namespace(self) -> _EntityNamespace: diff --git a/lib/sqlalchemy/testing/profiling.py b/lib/sqlalchemy/testing/profiling.py index 0d90947e444..66b6b00996b 100644 --- a/lib/sqlalchemy/testing/profiling.py +++ b/lib/sqlalchemy/testing/profiling.py @@ -26,6 +26,7 @@ from . import config from .util import gc_collect +from ..util import freethreading from ..util import has_compiled_ext @@ -99,6 +100,8 @@ def platform_key(self): # keep it at 2.7, 3.1, 3.2, etc. for now. py_version = ".".join([str(v) for v in sys.version_info[0:2]]) + if freethreading: + py_version += "t" platform_tokens = [ platform.machine(), diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index 09c93e5c641..580579a2d96 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -1646,6 +1646,12 @@ def cpython(self): lambda: util.cpython, "cPython interpreter needed" ) + @property + def gil_enabled(self): + return exclusions.only_if( + lambda: not util.freethreading, "GIL-enabled build needed" + ) + @property def is64bit(self): return exclusions.only_if(lambda: util.is64bit, "64bit required") @@ -1668,7 +1674,7 @@ def predictable_gc(self): gc.collect() is called, as well as clean out unreferenced subclasses. """ - return self.cpython + self.only_linux + return self.cpython + self.gil_enabled @property def no_coverage(self): diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py index aa1a4e90a84..e5e0111601b 100644 --- a/lib/sqlalchemy/testing/suite/test_reflection.py +++ b/lib/sqlalchemy/testing/suite/test_reflection.py @@ -2699,6 +2699,25 @@ def test_check_constraint_mixed(self, metadata, inspect_for_table): ], ) + def test_index_column_order(self, metadata, inspect_for_table): + """test for #12894""" + with inspect_for_table("sa_multi_index") as (schema, inspector): + test_table = Table( + "sa_multi_index", + metadata, + Column("Column1", Integer, primary_key=True), + Column("Column2", Integer), + Column("Column3", Integer), + ) + Index( + "Index_Example", + test_table.c.Column3, + test_table.c.Column1, + test_table.c.Column2, + ) + indexes = inspector.get_indexes("sa_multi_index") + eq_(indexes[0]["column_names"], ["Column3", "Column1", "Column2"]) + @testing.requires.indexes_with_expressions def test_reflect_expression_based_indexes(self, metadata, connection): t = Table( diff --git a/lib/sqlalchemy/util/__init__.py b/lib/sqlalchemy/util/__init__.py index 167d135adbf..a61d980378c 100644 --- a/lib/sqlalchemy/util/__init__.py +++ b/lib/sqlalchemy/util/__init__.py @@ -55,10 +55,12 @@ from .compat import dataclass_fields as dataclass_fields from .compat import decode_backslashreplace as decode_backslashreplace from .compat import dottedgetter as dottedgetter +from .compat import freethreading as freethreading from .compat import has_refcount_gc as has_refcount_gc from .compat import inspect_getfullargspec as inspect_getfullargspec from .compat import is64bit as is64bit from .compat import 
local_dataclass_fields as local_dataclass_fields +from .compat import mini_gil as mini_gil from .compat import osx as osx from .compat import py311 as py311 from .compat import py312 as py312 diff --git a/lib/sqlalchemy/util/compat.py b/lib/sqlalchemy/util/compat.py index a62bb6a2574..26e2210d640 100644 --- a/lib/sqlalchemy/util/compat.py +++ b/lib/sqlalchemy/util/compat.py @@ -18,6 +18,7 @@ import operator import platform import sys +import sysconfig import typing from typing import Any from typing import Callable @@ -38,6 +39,7 @@ py311 = sys.version_info >= (3, 11) pypy = platform.python_implementation() == "PyPy" cpython = platform.python_implementation() == "CPython" +freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED")) win32 = sys.platform.startswith("win") osx = sys.platform.startswith("darwin") @@ -250,3 +252,14 @@ def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: return [f for f in dataclasses.fields(cls) if f not in super_fields] else: return [] + + +if freethreading: + import threading + + mini_gil = threading.RLock() + """provide a threading.RLock() under python freethreading only""" +else: + import contextlib + + mini_gil = contextlib.nullcontext() # type: ignore[assignment] diff --git a/lib/sqlalchemy/util/concurrency.py b/lib/sqlalchemy/util/concurrency.py index d5250f7bbbf..a7ea2c5ecb6 100644 --- a/lib/sqlalchemy/util/concurrency.py +++ b/lib/sqlalchemy/util/concurrency.py @@ -6,6 +6,8 @@ # the MIT License: https://www.opensource.org/licenses/mit-license.php # mypy: allow-untyped-defs, allow-untyped-calls +"""asyncio-related concurrency functions.""" + from __future__ import annotations import asyncio diff --git a/lib/sqlalchemy/util/langhelpers.py b/lib/sqlalchemy/util/langhelpers.py index 528723494c6..53d6cb1e8fe 100644 --- a/lib/sqlalchemy/util/langhelpers.py +++ b/lib/sqlalchemy/util/langhelpers.py @@ -1324,6 +1324,9 @@ class MemoizedSlots: This allows the functionality of memoized_property and memoized_instancemethod to be available to a class using __slots__. + The memoized get is not threadsafe under freethreading and the + creator method may in extremely rare cases be called more than once. + """ __slots__ = () @@ -1344,20 +1347,20 @@ def __getattr__(self, key: str) -> Any: setattr(self, key, value) return value elif hasattr(self.__class__, f"_memoized_method_{key}"): - fn = getattr(self, f"_memoized_method_{key}") + meth = getattr(self, f"_memoized_method_{key}") def oneshot(*args, **kw): - result = fn(*args, **kw) + result = meth(*args, **kw) def memo(*a, **kw): return result - memo.__name__ = fn.__name__ - memo.__doc__ = fn.__doc__ + memo.__name__ = meth.__name__ + memo.__doc__ = meth.__doc__ setattr(self, key, memo) return result - oneshot.__doc__ = fn.__doc__ + oneshot.__doc__ = meth.__doc__ return oneshot else: return self._fallback_getattr(key) diff --git a/setup.py b/setup.py index e0971fa30de..1c89be07257 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,10 @@ assert _cy_Extension is not None assert _cy_build_ext is not None - cython_directives = {"language_level": "3"} + cython_directives = { + "language_level": "3", + "freethreading_compatible": True, + } module_prefix = "sqlalchemy." 
source_prefix = "lib/sqlalchemy/" diff --git a/test/aaa_profiling/test_memusage.py b/test/aaa_profiling/test_memusage.py index d3e7dfb7c0e..f2809172380 100644 --- a/test/aaa_profiling/test_memusage.py +++ b/test/aaa_profiling/test_memusage.py @@ -1781,83 +1781,103 @@ def test_gced_delete_on_rollback(self, user_fixture): class WeakIdentityMapTest(_fixtures.FixtureTest): run_inserts = None + def run_up_to_n_times(self, fn, times): + error = None + for _ in range(times): + try: + fn() + except Exception as err: + error = err + continue + else: + break + else: + if error: + raise error + @testing.requires.predictable_gc def test_weakref(self): """test the weak-referencing identity map, which strongly- references modified items.""" users, User = self.tables.users, self.classes.User - - s = fixture_session() self.mapper_registry.map_imperatively(User, users) - gc_collect() - s.add(User(name="ed")) - s.flush() - assert not s.dirty + def go(): + with Session(testing.db) as s: + gc_collect() - user = s.query(User).one() + s.add(User(name="ed")) + s.flush() + assert not s.dirty - # heisenberg the GC a little bit, since #7823 caused a lot more - # GC when mappings are set up, larger test suite started failing - # on this being gc'ed - user_is = user._sa_instance_state - del user - gc_collect() - gc_collect() - gc_collect() - assert user_is.obj() is None + user = s.query(User).one() - assert len(s.identity_map) == 0 + # heisenberg the GC a little bit, since #7823 caused a lot more + # GC when mappings are set up, larger test suite started + # failing on this being gc'ed + user_is = user._sa_instance_state + del user + gc_collect() + gc_collect() + gc_collect() + assert user_is.obj() is None - user = s.query(User).one() - user.name = "fred" - del user - gc_collect() - assert len(s.identity_map) == 1 - assert len(s.dirty) == 1 - assert None not in s.dirty - s.flush() - gc_collect() - assert not s.dirty - assert not s.identity_map + assert len(s.identity_map) == 0 - user = s.query(User).one() - assert user.name == "fred" - assert s.identity_map + user = s.query(User).one() + user.name = "fred" + del user + gc_collect() + assert len(s.identity_map) == 1 + assert len(s.dirty) == 1 + assert None not in s.dirty + s.flush() + gc_collect() + assert not s.dirty + assert not s.identity_map + + user = s.query(User).one() + assert user.name == "fred" + assert s.identity_map + + self.run_up_to_n_times(go, 10) @testing.requires.predictable_gc def test_weakref_pickled(self): users, User = self.tables.users, pickleable.User - - s = fixture_session() self.mapper_registry.map_imperatively(User, users) - gc_collect() - s.add(User(name="ed")) - s.flush() - assert not s.dirty + def go(): + with Session(testing.db) as s: + gc_collect() - user = s.query(User).one() - user.name = "fred" - s.expunge(user) + s.add(User(name="ed")) + s.flush() + assert not s.dirty - u2 = pickle.loads(pickle.dumps(user)) + user = s.query(User).one() + user.name = "fred" + s.expunge(user) - del user - s.add(u2) + u2 = pickle.loads(pickle.dumps(user)) - del u2 - gc_collect() + del user + s.add(u2) - assert len(s.identity_map) == 1 - assert len(s.dirty) == 1 - assert None not in s.dirty - s.flush() - gc_collect() - assert not s.dirty + del u2 + gc_collect() - assert not s.identity_map + assert len(s.identity_map) == 1 + assert len(s.dirty) == 1 + assert None not in s.dirty + s.flush() + gc_collect() + assert not s.dirty + + assert not s.identity_map + + self.run_up_to_n_times(go, 10) @testing.requires.predictable_gc def 
test_weakref_with_cycles_o2m(self): @@ -1868,7 +1888,6 @@ def test_weakref_with_cycles_o2m(self): self.classes.User, ) - s = fixture_session() self.mapper_registry.map_imperatively( User, users, @@ -1877,27 +1896,39 @@ def test_weakref_with_cycles_o2m(self): self.mapper_registry.map_imperatively(Address, addresses) gc_collect() - s.add(User(name="ed", addresses=[Address(email_address="ed1")])) - s.commit() + def go(): + with Session(testing.db) as s: + s.add( + User(name="ed", addresses=[Address(email_address="ed1")]) + ) + s.commit() - user = s.query(User).options(joinedload(User.addresses)).one() - user.addresses[0].user # lazyload - eq_(user, User(name="ed", addresses=[Address(email_address="ed1")])) + user = s.query(User).options(joinedload(User.addresses)).one() + user.addresses[0].user # lazyload + eq_( + user, + User(name="ed", addresses=[Address(email_address="ed1")]), + ) - del user - gc_collect() - assert len(s.identity_map) == 0 + del user + gc_collect() + assert len(s.identity_map) == 0 - user = s.query(User).options(joinedload(User.addresses)).one() - user.addresses[0].email_address = "ed2" - user.addresses[0].user # lazyload - del user - gc_collect() - assert len(s.identity_map) == 2 + user = s.query(User).options(joinedload(User.addresses)).one() + user.addresses[0].email_address = "ed2" + user.addresses[0].user # lazyload + del user + gc_collect() + assert len(s.identity_map) == 2 - s.commit() - user = s.query(User).options(joinedload(User.addresses)).one() - eq_(user, User(name="ed", addresses=[Address(email_address="ed2")])) + s.commit() + user = s.query(User).options(joinedload(User.addresses)).one() + eq_( + user, + User(name="ed", addresses=[Address(email_address="ed2")]), + ) + + self.run_up_to_n_times(go, 10) @testing.requires.predictable_gc def test_weakref_with_cycles_o2o(self): diff --git a/test/aaa_profiling/test_threading.py b/test/aaa_profiling/test_threading.py new file mode 100644 index 00000000000..9aecdfdd465 --- /dev/null +++ b/test/aaa_profiling/test_threading.py @@ -0,0 +1,291 @@ +import random +import threading +import time + +import sqlalchemy as sa +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import testing +from sqlalchemy.orm import scoped_session +from sqlalchemy.orm import sessionmaker +from sqlalchemy.testing import eq_ +from sqlalchemy.testing import fixtures +from sqlalchemy.testing.schema import Column +from sqlalchemy.testing.schema import Table + +NUM_THREADS = 10 +ITERATIONS = 10 + + +class _ThreadTest: + def run_threaded( + self, + func, + *thread_args, + nthreads=NUM_THREADS, + use_barrier=False, + **thread_kwargs, + ): + barrier = threading.Barrier(nthreads) + results = [] + errors = [] + + def thread_func(*args, **kwargs): + thread_name = threading.current_thread().name + if use_barrier: + barrier.wait() + + local_result = [] + try: + func(local_result, thread_name, *args, **kwargs) + results.append(tuple(local_result)) + except Exception as e: + # raise + errors.append((thread_name, repr(e))) + + threads = [ + threading.Thread( + name=f"thread-{i}", + target=thread_func, + args=thread_args, + kwargs=thread_kwargs, + ) + for i in range(nthreads) + ] + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + return results, errors + + @testing.fixture + def num_threads_engine(self, testing_engine): + return testing_engine(options=dict(pool_size=NUM_THREADS)) + + +@testing.add_to_marker.timing_intensive +class 
EngineThreadSafetyTest(_ThreadTest, fixtures.TablesTest):
+    run_dispose_bind = "once"
+
+    __requires__ = ("multithreading_support",)
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "test_table",
+            metadata,
+            Column(
+                "id", Integer, primary_key=True, test_needs_autoincrement=True
+            ),
+            Column("thread_id", Integer),
+            Column("data", String(50)),
+        )
+
+    @testing.combinations(
+        (NUM_THREADS, 0),
+        (3, 5),
+        (3, 0),
+        (7, 0),
+        argnames="pool_size, max_overflow",
+    )
+    def test_engine_thread_safe(self, testing_engine, pool_size, max_overflow):
+        """Test that a single Engine can be safely shared across threads."""
+        test_table = self.tables.test_table
+
+        engine = testing_engine(
+            options=dict(pool_size=pool_size, max_overflow=max_overflow)
+        )
+
+        def worker(results, thread_name):
+            for _ in range(ITERATIONS):
+                with engine.connect() as conn:
+                    conn.execute(
+                        test_table.insert(),
+                        {"data": thread_name},
+                    )
+                    conn.commit()
+
+                    result = conn.execute(
+                        sa.select(test_table.c.data).where(
+                            test_table.c.data == thread_name
+                        )
+                    ).scalar()
+                    results.append(result)
+
+        results, errors = self.run_threaded(worker)
+
+        eq_(errors, [])
+        eq_(
+            set(results),
+            {
+                tuple([f"thread-{i}" for j in range(ITERATIONS)])
+                for i in range(NUM_THREADS)
+            },
+        )
+
+    def test_metadata_thread_safe(self, num_threads_engine):
+        """Test that MetaData objects are thread-safe for reads."""
+        metadata = sa.MetaData()
+
+        for thread_id in range(NUM_THREADS):
+            Table(
+                f"thread-{thread_id}",
+                metadata,
+                Column("id", Integer, primary_key=True),
+                Column("data", String(50)),
+            )
+
+        metadata.create_all(testing.db)
+
+        def worker(results, thread_name):
+            table_key = thread_name
+            assert table_key in metadata.tables, f"{table_key} does not exist"
+            with num_threads_engine.connect() as conn:
+                # will raise if it cannot connect, so errors will be populated
+                conn.execute(sa.select(metadata.tables[table_key]))
+
+        _, errors = self.run_threaded(worker)
+        eq_(errors, [])
+
+
+@testing.add_to_marker.timing_intensive
+class SessionThreadingTest(_ThreadTest, fixtures.MappedTest):
+    run_dispose_bind = "once"
+
+    __requires__ = ("multithreading_support",)
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "users",
+            metadata,
+            Column(
+                "id", Integer, primary_key=True, test_needs_autoincrement=True
+            ),
+            Column("name", String(50)),
+            Column("thread_id", String(50)),
+        )
+
+    @classmethod
+    def setup_classes(cls):
+        class User(cls.Comparable):
+            pass
+
+    def test_sessionmaker_thread_safe(self, num_threads_engine):
+        """Test that sessionmaker factory is thread-safe."""
+        users, User = self.tables.users, self.classes.User
+        self.mapper_registry.map_imperatively(User, users)
+
+        # Single sessionmaker shared across threads
+        SessionFactory = sessionmaker(num_threads_engine)
+
+        def worker(results, thread_name):
+            thread_id = thread_name
+
+            for _ in range(ITERATIONS):
+                with SessionFactory() as session:
+                    for i in range(3):
+                        user = User(
+                            name=f"user_{thread_id}_{i}", thread_id=thread_id
+                        )
+                        session.add(user)
+                    session.commit()
+
+                    count = (
+                        session.query(User)
+                        .filter_by(thread_id=thread_id)
+                        .count()
+                    )
+                    results.append(count)
+
+        results, errors = self.run_threaded(worker)
+
+        eq_(errors, [])
+        eq_(
+            results,
+            [
+                tuple(range(3, 3 * ITERATIONS + 3, 3))
+                for _ in range(NUM_THREADS)
+            ],
+        )
+
+    def test_scoped_session_thread_local(self, num_threads_engine):
+        """Test that scoped_session provides thread-local sessions."""
+        users, User = self.tables.users, self.classes.User
+
self.mapper_registry.map_imperatively(User, users) + + # Create scoped session + Session = scoped_session(sessionmaker(num_threads_engine)) + + session_ids = {} + + def worker(results, thread_name): + thread_id = thread_name + + session = Session() + session_ids[thread_id] = id(session) + session.close() + + for _ in range(ITERATIONS): + user = User( + name=f"scoped_user_{thread_id}", thread_id=thread_id + ) + Session.add(user) + Session.commit() + + session2 = Session() + assert id(session2) == session_ids[thread_id] + session2.close() + + count = ( + Session.query(User).filter_by(thread_id=thread_id).count() + ) + results.append(count) + Session.remove() + + results, errors = self.run_threaded(worker) + + eq_(errors, []) + unique_sessions = set(session_ids.values()) + eq_(len(unique_sessions), NUM_THREADS) + eq_( + results, + [tuple(range(1, ITERATIONS + 1)) for _ in range(NUM_THREADS)], + ) + + +@testing.add_to_marker.timing_intensive +class FromClauseConcurrencyTest(_ThreadTest, fixtures.TestBase): + """test for issue #12302""" + + @testing.variation("collection", ["c", "primary_key", "foreign_keys"]) + def test_c_collection(self, collection): + dictionary_meta = MetaData() + all_indexes_table = Table( + "all_indexes", + dictionary_meta, + *[Column(f"col{i}", Integer) for i in range(50)], + ) + + def use_table(results, errors): + for i in range(3): + time.sleep(random.random() * 0.0001) + if collection.c: + all_indexes.c.col35 + elif collection.primary_key: + all_indexes.primary_key + elif collection.foreign_keys: + all_indexes.foreign_keys + + for j in range(1000): + all_indexes = all_indexes_table.alias("a_indexes") + + results, errors = self.run_threaded( + use_table, use_barrier=False, nthreads=5 + ) + + eq_(errors, []) + eq_(len(results), 5) diff --git a/test/base/test_events.py b/test/base/test_events.py index ccb53f2bb37..cc21708b778 100644 --- a/test/base/test_events.py +++ b/test/base/test_events.py @@ -854,6 +854,7 @@ class Target: self.Target = Target + @testing.requires.predictable_gc def test_subclass(self): class SubTarget(self.Target): pass diff --git a/test/base/test_tutorials.py b/test/base/test_tutorials.py index 672606dfeac..616da4d5817 100644 --- a/test/base/test_tutorials.py +++ b/test/base/test_tutorials.py @@ -177,6 +177,9 @@ def test_orm_queryguide_columns(self): def test_orm_quickstart(self): self._run_doctest("orm/quickstart.rst") + # this crashes on 3.13t but passes on 3.14t. + # just requiring non-freethreaded for now + @requires.gil_enabled @requires.greenlet def test_asyncio(self): try: diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 33abfed4d27..151b7633db1 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -965,6 +965,7 @@ def on_connect(dbapi_con, rec): evt.connect() def checkout(): + barrier.wait() for j in range(2): c1 = pool.connect() time.sleep(0.02) @@ -979,6 +980,7 @@ def checkout(): # any of the connections get returned. so first_connect() # sleeps for one second, then pings the mock. the threads should # not have made it to the "checkout() event for that one second. 
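+        # the barrier (paired with the barrier.wait() added at the top of
+        # checkout()) releases all threads at once, so the one-second timing
+        # assumption above holds regardless of thread startup latency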
+ barrier = threading.Barrier(5) for i in range(5): th = threading.Thread(target=checkout) th.start() @@ -1110,6 +1112,7 @@ def test_timeout_race(self): timeouts = [] def checkout(): + barrier.wait() for x in range(1): now = time.time() try: @@ -1120,6 +1123,7 @@ def checkout(): time.sleep(4) c1.close() + barrier = threading.Barrier(10) threads = [] for i in range(10): th = threading.Thread(target=checkout) diff --git a/test/profiles.txt b/test/profiles.txt index 512c700249c..641fae49233 100644 --- a/test/profiles.txt +++ b/test/profiles.txt @@ -1,15 +1,15 @@ # /home/classic/dev/sqlalchemy/test/profiles.txt # This file is written out on a per-environment basis. -# For each test in aaa_profiling, the corresponding function and +# For each test in aaa_profiling, the corresponding function and # environment is located within this file. If it doesn't exist, # the test is skipped. -# If a callcount does exist, it is compared to what we received. +# If a callcount does exist, it is compared to what we received. # assertions are raised if the counts do not match. -# -# To add a new callcount test, apply the function_call_count -# decorator and re-run the tests using the --write-profiles +# +# To add a new callcount test, apply the function_call_count +# decorator and re-run the tests using the --write-profiles # option - this file will be rewritten including the new count. -# +# # TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert @@ -436,10 +436,10 @@ test.aaa_profiling.test_orm.WithExpresionLoaderOptTest.test_from_opt_no_cache x8 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 78 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 78 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 79 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 79 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 72 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 72 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 73 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 73 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect diff --git a/test/requirements.py b/test/requirements.py index d69bacaca53..29051536d3e 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -408,7 +408,13 @@ def predictable_gc(self): gc.collect() is called, as well as clean out unreferenced subclasses. 
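+
+        Free-threaded builds are also excluded here, as gc behavior appears
+        less deterministic without the GIL (hence the ``gil_enabled``
+        requirement).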
""" - return self.cpython + self.only_linux + skip_if("+aiosqlite") + return self.cpython + self.gil_enabled + skip_if("+aiosqlite") + + @property + def multithreading_support(self): + """target platform allows use of threads without any problem""" + + return skip_if("+aiosqlite") + skip_if(self.sqlite_memory) @property def memory_process_intensive(self): diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 91eacd55bba..7451de6747b 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1,9 +1,6 @@ """Test various algorithmic properties of selectables.""" from itertools import zip_longest -import random -import threading -import time from sqlalchemy import and_ from sqlalchemy import bindparam @@ -4061,39 +4058,3 @@ def test_copy_internals_multiple_nesting(self): a3 = a2._clone() a3._copy_internals() is_(a1.corresponding_column(a3.c.c), a1.c.c) - - -class FromClauseConcurrencyTest(fixtures.TestBase): - """test for issue 12302""" - - @testing.requires.timing_intensive - def test_c_collection(self): - dictionary_meta = MetaData() - all_indexes_table = Table( - "all_indexes", - dictionary_meta, - *[Column(f"col{i}", Integer) for i in range(50)], - ) - - fails = 0 - - def use_table(): - nonlocal fails - try: - for i in range(3): - time.sleep(random.random() * 0.0001) - all_indexes.c.col35 - except: - fails += 1 - raise - - for j in range(1000): - all_indexes = all_indexes_table.alias("a_indexes") - - threads = [threading.Thread(target=use_table) for i in range(5)] - for t in threads: - t.start() - for t in threads: - t.join() - - assert not fails, "one or more runs failed" diff --git a/tox.ini b/tox.ini index 383edfc4e2d..ea428ca909a 100644 --- a/tox.ini +++ b/tox.ini @@ -7,10 +7,15 @@ extras= asyncio sqlite: aiosqlite sqlite_file: aiosqlite - postgresql: postgresql_asyncpg + + # asyncpg doesnt build on free threading backends + py{38,39,310,311,312,313,314}-postgresql: postgresql_asyncpg + mysql: asyncmy mysql: aiomysql - mssql: aioodbc + + # aioodbc builds on free threading but segfaults once you use it + py{38,39,310,311,312,313,314}-mssql: aioodbc [testenv] cov_args=--cov=sqlalchemy --cov-report term --cov-append --cov-report xml --exclude-tag memory-intensive --exclude-tag timing-intensive -k "not aaa_profiling" @@ -27,7 +32,9 @@ extras= # this can be limited to specific python versions IF there is no # greenlet available for the most recent python. otherwise # keep this present in all cases - py{38,39,310,311,312,313,314}: {[greenletextras]extras} + # py{38,39,310,311,312,313,314,313t,314t}: {[greenletextras]extras} + + {[greenletextras]extras} postgresql: postgresql postgresql: postgresql_pg8000 @@ -111,6 +118,8 @@ setenv= WORKERS={env:TOX_WORKERS:-n4 --max-worker-restart=5} + {py313t,py314t}: PYTHON_GIL=0 # when running with 313t, 314t, go for broke + # affect setup.py to skip or build the cython extensions. 
    nocext: DISABLE_SQLALCHEMY_CEXT=1
    cext: REQUIRE_SQLALCHEMY_CEXT=1
@@ -126,28 +135,30 @@ setenv=
    oracle: WORKERS={env:TOX_WORKERS:-n2 --max-worker-restart=5}
    oracle: ORACLE={env:TOX_ORACLE:--db oracle}
    oracle: EXTRA_ORACLE_DRIVERS={env:EXTRA_ORACLE_DRIVERS:--dbdriver cx_oracle --dbdriver oracledb --dbdriver oracledb_async}
+    oracle-nogreenlet: EXTRA_ORACLE_DRIVERS={env:EXTRA_ORACLE_DRIVERS:--dbdriver cx_oracle --dbdriver oracledb}

    sqlite: SQLITE={env:TOX_SQLITE:--db sqlite}
-    sqlite-py{38,39,310,311,312,313}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric --dbdriver aiosqlite}
-    sqlite-{py314,nogreenlet}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric}
+    sqlite-py{38,39,310,311,312,313,314}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric --dbdriver aiosqlite}
+    sqlite-nogreenlet: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric}
    sqlite_file: SQLITE={env:TOX_SQLITE_FILE:--db sqlite_file}
    sqlite_file: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver aiosqlite}
-    sqlite_file-{py314,nogreenlet}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite}
+    sqlite_file-nogreenlet: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite}

    postgresql: POSTGRESQL={env:TOX_POSTGRESQL:--db postgresql}
    postgresql: EXTRA_PG_DRIVERS={env:EXTRA_PG_DRIVERS:--dbdriver psycopg2 --dbdriver asyncpg --dbdriver pg8000 --dbdriver psycopg --dbdriver psycopg_async}
    postgresql-nogreenlet: EXTRA_PG_DRIVERS={env:EXTRA_PG_DRIVERS:--dbdriver psycopg2 --dbdriver pg8000 --dbdriver psycopg}
+    postgresql-{py313t,py314t}: EXTRA_PG_DRIVERS={env:EXTRA_PG_DRIVERS:--dbdriver psycopg2 --dbdriver pg8000 --dbdriver psycopg --dbdriver psycopg_async}

    mysql: MYSQL={env:TOX_MYSQL:--db mysql}
    mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver asyncmy --dbdriver aiomysql --dbdriver mariadbconnector}
    mysql-nogreenlet: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector}

+    # for mssql, aioodbc frequently segfaults under free-threaded builds
    mssql: MSSQL={env:TOX_MSSQL:--db mssql}
    mssql: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver aioodbc --dbdriver pymssql}
    mssql-py314: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver aioodbc}
-    mssql-nogreenlet: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver pymssql}
-    mssql-py314-nogreenlet: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc}
+    mssql-{py313t,py314t,nogreenlet}: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver pymssql}

    oracle,mssql,sqlite_file: IDENTS=--write-idents db_idents.txt
@@ -172,6 +183,7 @@ passenv=
    EXTRA_PG_DRIVERS
    EXTRA_MYSQL_DRIVERS
    EXTRA_ORACLE_DRIVERS
+    PYTHON_GIL

commands=
@@ -287,23 +299,54 @@
extras = {[testenv:lint]extras}

# command run in the github action when cext are active.
-[testenv:github-cext]
+# unfortunately it seems impossible to use generative tags with envs
+# that are not the default.
so in the interim, build out three separate +# envs with explicit names + +[testenv:githubbase] extras= - {[greenletextras]extras} +setenv= + PYTHONNOUSERSITE=1 + + WORKERS={env:TOX_WORKERS:-n4 --max-worker-restart=5} + PYTEST_COLOR={tty:--color=yes} + PYTEST_EXCLUDES=-m "not memory_intensive and not mypy and not timing_intensive" + SQLITE=--db sqlite -deps = {[testenv]deps} - .[aiosqlite] commands= - python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:IDENTS:} {env:PYTEST_EXCLUDES:} {env:COVERAGE:} {posargs} + python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs} oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt -# command run in the github action when cext are not active. -[testenv:github-nocext] +[testenv:github-cext-greenlet] extras= - {[greenletextras]extras} + asyncio + aiosqlite +setenv= + {[testenv:githubbase]setenv} + REQUIRE_SQLALCHEMY_CEXT=1 + SQLITE=--db sqlite --db aiosqlite +commands= + python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs} + oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt + +[testenv:github-cext] +extras= +setenv= + {[testenv:githubbase]setenv} + REQUIRE_SQLALCHEMY_CEXT=1 +deps = {[testenv:githubbase]deps} +commands= + python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs} + oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt -deps = {[testenv]deps} - .[aiosqlite] + +[testenv:github-nocext] +extras= +setenv= + {[testenv:githubbase]setenv} + DISABLE_SQLALCHEMY_CEXT=1 +deps = {[testenv:githubbase]deps} commands= - python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:IDENTS:} {env:PYTEST_EXCLUDES:} {env:COVERAGE:} {posargs} + python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs} oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt +