|
| 1 | +import collections.abc |
| 2 | +import functools |
| 3 | +import operator |
| 4 | +import sys |
| 5 | +import types |
| 6 | +import typing |
| 7 | +from typing import ForwardRef, List |
| 8 | +import typing_extensions |
| 9 | +from typing_extensions import ParamSpec |
| 10 | + |
| 11 | +_GenericAlias = type(List[int]) |
| 12 | +SpecialType = (_GenericAlias, types.GenericAlias) |
| 13 | + |
| 14 | +if sys.version_info >= (3, 10): |
| 15 | + SpecialType += (types.UnionType,) |
| 16 | + |
| 17 | +if sys.version_info >= (3, 11): |
| 18 | + eval_type = typing._eval_type # type: ignore |
| 19 | +else: |
| 20 | + def _is_param_expr(arg): |
| 21 | + return arg is ... or isinstance(arg, (tuple, list, ParamSpec, typing_extensions._ConcatenateGenericAlias)) # type: ignore |
| 22 | + |
| 23 | + |
| 24 | + def _should_unflatten_callable_args(typ, args): |
| 25 | + """Internal helper for munging collections.abc.Callable's __args__. |
| 26 | +
|
| 27 | + The canonical representation for a Callable's __args__ flattens the |
| 28 | + argument types, see https://github.com/python/cpython/issues/86361. |
| 29 | +
|
| 30 | + For example:: |
| 31 | +
|
| 32 | + >>> import collections.abc |
| 33 | + >>> P = ParamSpec('P') |
| 34 | + >>> collections.abc.Callable[[int, int], str].__args__ == (int, int, str) |
| 35 | + True |
| 36 | + >>> collections.abc.Callable[P, str].__args__ == (P, str) |
| 37 | + True |
| 38 | +
|
| 39 | + As a result, if we need to reconstruct the Callable from its __args__, |
| 40 | + we need to unflatten it. |
| 41 | + """ |
| 42 | + return ( |
| 43 | + typ.__origin__ is collections.abc.Callable |
| 44 | + and not (len(args) == 2 and _is_param_expr(args[0])) |
| 45 | + ) |
| 46 | + |
| 47 | + |
| 48 | + def eval_type(t, globalns, localns, recursive_guard=frozenset()): |
| 49 | + """Evaluate all forward references in the given type t. |
| 50 | +
|
| 51 | + For use of globalns and localns see the docstring for get_type_hints(). |
| 52 | + recursive_guard is used to prevent infinite recursion with a recursive |
| 53 | + ForwardRef. |
| 54 | + """ |
| 55 | + if isinstance(t, ForwardRef): |
| 56 | + return t._evaluate(globalns, localns, recursive_guard) |
| 57 | + if isinstance(t, SpecialType): |
| 58 | + if isinstance(t, types.GenericAlias): |
| 59 | + args = tuple( |
| 60 | + ForwardRef(arg) if isinstance(arg, str) else arg |
| 61 | + for arg in t.__args__ |
| 62 | + ) |
| 63 | + is_unpacked = getattr(t, "__unpacked__", False) |
| 64 | + if _should_unflatten_callable_args(t, args): |
| 65 | + t = t.__origin__[(args[:-1], args[-1])] # type: ignore |
| 66 | + else: |
| 67 | + t = t.__origin__[args] # type: ignore |
| 68 | + if is_unpacked: |
| 69 | + t = typing_extensions.Unpack[t] |
| 70 | + ev_args = tuple(eval_type(a, globalns, localns, recursive_guard) for a in t.__args__) # type: ignore |
| 71 | + if ev_args == t.__args__: # type: ignore |
| 72 | + return t |
| 73 | + if isinstance(t, types.GenericAlias): |
| 74 | + return types.GenericAlias(t.__origin__, ev_args) |
| 75 | + if hasattr(types, "UnionType") and isinstance(t, types.UnionType): # type: ignore |
| 76 | + return functools.reduce(operator.or_, ev_args) |
| 77 | + return t.copy_with(ev_args) # type: ignore |
| 78 | + return t |
0 commit comments