diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py index 177e2ed2ca..c0d4c8f43b 100644 --- a/Lib/test/pickletester.py +++ b/Lib/test/pickletester.py @@ -26,7 +26,7 @@ from test import support from test.support import os_helper from test.support import ( - TestFailed, run_with_locale, no_tracing, + TestFailed, run_with_locales, no_tracing, _2G, _4G, bigmemtest ) from test.support.import_helper import forget @@ -144,6 +144,14 @@ class E(C): def __getinitargs__(self): return () +import __main__ +__main__.C = C +C.__module__ = "__main__" +__main__.D = D +D.__module__ = "__main__" +__main__.E = E +E.__module__ = "__main__" + # Simple mutable object. class Object: pass @@ -157,14 +165,6 @@ def __reduce__(self): # Shouldn't support the recursion itself return K, (self.value,) -import __main__ -__main__.C = C -C.__module__ = "__main__" -__main__.D = D -D.__module__ = "__main__" -__main__.E = E -E.__module__ = "__main__" - class myint(int): def __init__(self, x): self.str = str(x) @@ -1080,6 +1080,11 @@ def test_large_32b_binunicode8(self): self.check_unpickling_error((pickle.UnpicklingError, OverflowError), dumped) + def test_large_binstring(self): + errmsg = 'BINSTRING pickle has negative byte count' + with self.assertRaisesRegex(pickle.UnpicklingError, errmsg): + self.loads(b'T\0\0\0\x80') + def test_get(self): pickled = b'((lp100000\ng100000\nt.' unpickled = self.loads(pickled) @@ -1179,6 +1184,153 @@ def test_compat_unpickle(self): self.assertIs(type(unpickled), collections.UserDict) self.assertEqual(unpickled, collections.UserDict({1: 2})) + def test_load_global(self): + self.assertIs(self.loads(b'cbuiltins\nstr\n.'), str) + self.assertIs(self.loads(b'cmath\nlog\n.'), math.log) + self.assertIs(self.loads(b'cos.path\njoin\n.'), os.path.join) + self.assertIs(self.loads(b'\x80\x04cbuiltins\nstr.upper\n.'), str.upper) + with support.swap_item(sys.modules, 'mödule', types.SimpleNamespace(glöbal=42)): + self.assertEqual(self.loads(b'\x80\x04cm\xc3\xb6dule\ngl\xc3\xb6bal\n.'), 42) + + self.assertRaises(UnicodeDecodeError, self.loads, b'c\xff\nlog\n.') + self.assertRaises(UnicodeDecodeError, self.loads, b'cmath\n\xff\n.') + self.assertRaises(self.truncated_errors, self.loads, b'c\nlog\n.') + self.assertRaises(self.truncated_errors, self.loads, b'cmath\n\n.') + self.assertRaises(self.truncated_errors, self.loads, b'\x80\x04cmath\n\n.') + + def test_load_stack_global(self): + self.assertIs(self.loads(b'\x8c\x08builtins\x8c\x03str\x93.'), str) + self.assertIs(self.loads(b'\x8c\x04math\x8c\x03log\x93.'), math.log) + self.assertIs(self.loads(b'\x8c\x07os.path\x8c\x04join\x93.'), + os.path.join) + self.assertIs(self.loads(b'\x80\x04\x8c\x08builtins\x8c\x09str.upper\x93.'), + str.upper) + with support.swap_item(sys.modules, 'mödule', types.SimpleNamespace(glöbal=42)): + self.assertEqual(self.loads(b'\x80\x04\x8c\x07m\xc3\xb6dule\x8c\x07gl\xc3\xb6bal\x93.'), 42) + + self.assertRaises(UnicodeDecodeError, self.loads, b'\x8c\x01\xff\x8c\x03log\x93.') + self.assertRaises(UnicodeDecodeError, self.loads, b'\x8c\x04math\x8c\x01\xff\x93.') + self.assertRaises(ValueError, self.loads, b'\x8c\x00\x8c\x03log\x93.') + self.assertRaises(AttributeError, self.loads, b'\x8c\x04math\x8c\x00\x93.') + self.assertRaises(AttributeError, self.loads, b'\x80\x04\x8c\x04math\x8c\x00\x93.') + + self.assertRaises(pickle.UnpicklingError, self.loads, b'N\x8c\x03log\x93.') + self.assertRaises(pickle.UnpicklingError, self.loads, b'\x8c\x04mathN\x93.') + self.assertRaises(pickle.UnpicklingError, self.loads, 
b'\x80\x04\x8c\x04mathN\x93.') + + def test_find_class(self): + unpickler = self.unpickler(io.BytesIO()) + unpickler_nofix = self.unpickler(io.BytesIO(), fix_imports=False) + unpickler4 = self.unpickler(io.BytesIO(b'\x80\x04N.')) + unpickler4.load() + + self.assertIs(unpickler.find_class('__builtin__', 'str'), str) + self.assertRaises(ModuleNotFoundError, + unpickler_nofix.find_class, '__builtin__', 'str') + self.assertIs(unpickler.find_class('builtins', 'str'), str) + self.assertIs(unpickler_nofix.find_class('builtins', 'str'), str) + self.assertIs(unpickler.find_class('math', 'log'), math.log) + self.assertIs(unpickler.find_class('os.path', 'join'), os.path.join) + self.assertIs(unpickler.find_class('os.path', 'join'), os.path.join) + + self.assertIs(unpickler4.find_class('builtins', 'str.upper'), str.upper) + with self.assertRaises(AttributeError): + unpickler.find_class('builtins', 'str.upper') + + with self.assertRaises(AttributeError): + unpickler.find_class('math', 'spam') + with self.assertRaises(AttributeError): + unpickler4.find_class('math', 'spam') + with self.assertRaises(AttributeError): + unpickler.find_class('math', 'log.spam') + with self.assertRaises(AttributeError): + unpickler4.find_class('math', 'log.spam') + with self.assertRaises(AttributeError): + unpickler.find_class('math', 'log..spam') + with self.assertRaises(AttributeError): + unpickler4.find_class('math', 'log..spam') + with self.assertRaises(AttributeError): + unpickler.find_class('math', '') + with self.assertRaises(AttributeError): + unpickler4.find_class('math', '') + self.assertRaises(ModuleNotFoundError, unpickler.find_class, 'spam', 'log') + self.assertRaises(ValueError, unpickler.find_class, '', 'log') + + self.assertRaises(TypeError, unpickler.find_class, None, 'log') + self.assertRaises(TypeError, unpickler.find_class, 'math', None) + self.assertRaises((TypeError, AttributeError), unpickler4.find_class, 'math', None) + + def test_custom_find_class(self): + def loads(data): + class Unpickler(self.unpickler): + def find_class(self, module_name, global_name): + return (module_name, global_name) + return Unpickler(io.BytesIO(data)).load() + + self.assertEqual(loads(b'cmath\nlog\n.'), ('math', 'log')) + self.assertEqual(loads(b'\x8c\x04math\x8c\x03log\x93.'), ('math', 'log')) + + def loads(data): + class Unpickler(self.unpickler): + @staticmethod + def find_class(module_name, global_name): + return (module_name, global_name) + return Unpickler(io.BytesIO(data)).load() + + self.assertEqual(loads(b'cmath\nlog\n.'), ('math', 'log')) + self.assertEqual(loads(b'\x8c\x04math\x8c\x03log\x93.'), ('math', 'log')) + + def loads(data): + class Unpickler(self.unpickler): + @classmethod + def find_class(cls, module_name, global_name): + return (module_name, global_name) + return Unpickler(io.BytesIO(data)).load() + + self.assertEqual(loads(b'cmath\nlog\n.'), ('math', 'log')) + self.assertEqual(loads(b'\x8c\x04math\x8c\x03log\x93.'), ('math', 'log')) + + def loads(data): + class Unpickler(self.unpickler): + pass + def find_class(module_name, global_name): + return (module_name, global_name) + unpickler = Unpickler(io.BytesIO(data)) + unpickler.find_class = find_class + return unpickler.load() + + self.assertEqual(loads(b'cmath\nlog\n.'), ('math', 'log')) + self.assertEqual(loads(b'\x8c\x04math\x8c\x03log\x93.'), ('math', 'log')) + + def test_bad_ext_code(self): + # unregistered extension code + self.check_unpickling_error(ValueError, b'\x82\x01.') + self.check_unpickling_error(ValueError, b'\x82\xff.') + 
self.check_unpickling_error(ValueError, b'\x83\x01\x00.') + self.check_unpickling_error(ValueError, b'\x83\xff\xff.') + self.check_unpickling_error(ValueError, b'\x84\x01\x00\x00\x00.') + self.check_unpickling_error(ValueError, b'\x84\xff\xff\xff\x7f.') + # EXT specifies code <= 0 + self.check_unpickling_error(pickle.UnpicklingError, b'\x82\x00.') + self.check_unpickling_error(pickle.UnpicklingError, b'\x83\x00\x00.') + self.check_unpickling_error(pickle.UnpicklingError, b'\x84\x00\x00\x00\x00.') + self.check_unpickling_error(pickle.UnpicklingError, b'\x84\x00\x00\x00\x80.') + self.check_unpickling_error(pickle.UnpicklingError, b'\x84\xff\xff\xff\xff.') + + @support.cpython_only + def test_bad_ext_inverted_registry(self): + code = 1 + def check(key, exc): + with support.swap_item(copyreg._inverted_registry, code, key): + with self.assertRaises(exc): + self.loads(b'\x82\x01.') + check(None, ValueError) + check((), ValueError) + check((__name__,), (TypeError, ValueError)) + check((__name__, "MyList", "x"), (TypeError, ValueError)) + check((__name__, None), (TypeError, ValueError)) + check((None, "MyList"), (TypeError, ValueError)) + def test_bad_reduce(self): self.assertEqual(self.loads(b'cbuiltins\nint\n)R.'), 0) self.check_unpickling_error(TypeError, b'N)R.') @@ -1197,6 +1349,41 @@ def test_bad_newobj_ex(self): self.check_unpickling_error(error, b'cbuiltins\nint\nN}\x92.') self.check_unpickling_error(error, b'cbuiltins\nint\n)N\x92.') + def test_bad_state(self): + c = C() + c.x = None + base = b'c__main__\nC\n)\x81' + self.assertEqual(self.loads(base + b'}X\x01\x00\x00\x00xNsb.'), c) + self.assertEqual(self.loads(base + b'N}X\x01\x00\x00\x00xNs\x86b.'), c) + # non-hashable dict key + self.check_unpickling_error(TypeError, base + b'}]Nsb.') + # state = list + error = (pickle.UnpicklingError, AttributeError) + self.check_unpickling_error(error, base + b'](}}eb.') + # state = 1-tuple + self.check_unpickling_error(error, base + b'}\x85b.') + # state = 3-tuple + self.check_unpickling_error(error, base + b'}}}\x87b.') + # non-hashable slot name + self.check_unpickling_error(TypeError, base + b'}}]Ns\x86b.') + # non-string slot name + self.check_unpickling_error(TypeError, base + b'}}NNs\x86b.') + # dict = True + self.check_unpickling_error(error, base + b'\x88}\x86b.') + # slots dict = True + self.check_unpickling_error(error, base + b'}\x88\x86b.') + + class BadKey1: + count = 1 + def __hash__(self): + if not self.count: + raise CustomError + self.count -= 1 + return 42 + __main__.BadKey1 = BadKey1 + # bad hashable dict key + self.check_unpickling_error(CustomError, base + b'}c__main__\nBadKey1\n)\x81Nsb.') + def test_bad_stack(self): badpickles = [ b'.', # STOP @@ -1443,6 +1630,502 @@ def t(): [ToBeUnpickled] * 2) +class AbstractPicklingErrorTests: + # Subclass must define self.dumps, self.pickler. 
+ + def test_bad_reduce_result(self): + obj = REX([print, ()]) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + obj = REX((print,)) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + obj = REX((print, (), None, None, None, None, None)) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_bad_reconstructor(self): + obj = REX((42, ())) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_unpickleable_reconstructor(self): + obj = REX((UnpickleableCallable(), ())) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_bad_reconstructor_args(self): + obj = REX((print, [])) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_unpickleable_reconstructor_args(self): + obj = REX((print, (1, 2, UNPICKLEABLE))) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_bad_newobj_args(self): + obj = REX((copyreg.__newobj__, ())) + for proto in protocols[2:]: + with self.subTest(proto=proto): + with self.assertRaises((IndexError, pickle.PicklingError)) as cm: + self.dumps(obj, proto) + + obj = REX((copyreg.__newobj__, [REX])) + for proto in protocols[2:]: + with self.subTest(proto=proto): + with self.assertRaises((IndexError, pickle.PicklingError)): + self.dumps(obj, proto) + + def test_bad_newobj_class(self): + obj = REX((copyreg.__newobj__, (NoNew(),))) + for proto in protocols[2:]: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_wrong_newobj_class(self): + obj = REX((copyreg.__newobj__, (str,))) + for proto in protocols[2:]: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_unpickleable_newobj_class(self): + class LocalREX(REX): pass + obj = LocalREX((copyreg.__newobj__, (LocalREX,))) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((pickle.PicklingError, AttributeError)): + self.dumps(obj, proto) + + def test_unpickleable_newobj_args(self): + obj = REX((copyreg.__newobj__, (REX, 1, 2, UNPICKLEABLE))) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_bad_newobj_ex_args(self): + obj = REX((copyreg.__newobj_ex__, ())) + for proto in protocols[2:]: + with self.subTest(proto=proto): + with self.assertRaises((ValueError, pickle.PicklingError)): + self.dumps(obj, proto) + + obj = REX((copyreg.__newobj_ex__, 42)) + for proto in protocols[2:]: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + obj = REX((copyreg.__newobj_ex__, (REX, 42, {}))) + is_py = self.pickler is pickle._Pickler + for proto in protocols[2:4] if is_py else protocols[2:]: + with self.subTest(proto=proto): + with self.assertRaises((TypeError, pickle.PicklingError)): + self.dumps(obj, proto) + + obj = REX((copyreg.__newobj_ex__, (REX, (), []))) + for proto in protocols[2:4] if is_py else protocols[2:]: + with 
self.subTest(proto=proto): + with self.assertRaises((TypeError, pickle.PicklingError)): + self.dumps(obj, proto) + + def test_bad_newobj_ex__class(self): + obj = REX((copyreg.__newobj_ex__, (NoNew(), (), {}))) + for proto in protocols[2:]: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_wrong_newobj_ex_class(self): + if self.pickler is not pickle._Pickler: + self.skipTest('only verified in the Python implementation') + obj = REX((copyreg.__newobj_ex__, (str, (), {}))) + for proto in protocols[2:]: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_unpickleable_newobj_ex_class(self): + class LocalREX(REX): pass + obj = LocalREX((copyreg.__newobj_ex__, (LocalREX, (), {}))) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((pickle.PicklingError, AttributeError)): + self.dumps(obj, proto) + + def test_unpickleable_newobj_ex_args(self): + obj = REX((copyreg.__newobj_ex__, (REX, (1, 2, UNPICKLEABLE), {}))) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_unpickleable_newobj_ex_kwargs(self): + obj = REX((copyreg.__newobj_ex__, (REX, (), {'a': UNPICKLEABLE}))) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_unpickleable_state(self): + obj = REX_state(UNPICKLEABLE) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_bad_state_setter(self): + if self.pickler is pickle._Pickler: + self.skipTest('only verified in the C implementation') + obj = REX((print, (), 'state', None, None, 42)) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_unpickleable_state_setter(self): + obj = REX((print, (), 'state', None, None, UnpickleableCallable())) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_unpickleable_state_with_state_setter(self): + obj = REX((print, (), UNPICKLEABLE, None, None, print)) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_bad_object_list_items(self): + # Issue4176: crash when 4th and 5th items of __reduce__() + # are not iterators + obj = REX((list, (), None, 42)) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((TypeError, pickle.PicklingError)): + self.dumps(obj, proto) + + if self.pickler is not pickle._Pickler: + # Python implementation is less strict and also accepts iterables. 
+ obj = REX((list, (), None, [])) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((TypeError, pickle.PicklingError)): + self.dumps(obj, proto) + + def test_unpickleable_object_list_items(self): + obj = REX_six([1, 2, UNPICKLEABLE]) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_bad_object_dict_items(self): + # Issue4176: crash when 4th and 5th items of __reduce__() + # are not iterators + obj = REX((dict, (), None, None, 42)) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((TypeError, pickle.PicklingError)): + self.dumps(obj, proto) + + for proto in protocols: + obj = REX((dict, (), None, None, iter([('a',)]))) + with self.subTest(proto=proto): + with self.assertRaises((ValueError, TypeError)): + self.dumps(obj, proto) + + if self.pickler is not pickle._Pickler: + # Python implementation is less strict and also accepts iterables. + obj = REX((dict, (), None, None, [])) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((TypeError, pickle.PicklingError)): + self.dumps(obj, proto) + + def test_unpickleable_object_dict_items(self): + obj = REX_seven({'a': UNPICKLEABLE}) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_unpickleable_list_items(self): + obj = [1, [2, 3, UNPICKLEABLE]] + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + for n in [0, 1, 1000, 1005]: + obj = [*range(n), UNPICKLEABLE] + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_unpickleable_tuple_items(self): + obj = (1, (2, 3, UNPICKLEABLE)) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + obj = (*range(10), UNPICKLEABLE) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_unpickleable_dict_items(self): + obj = {'a': {'b': UNPICKLEABLE}} + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + for n in [0, 1, 1000, 1005]: + obj = dict.fromkeys(range(n)) + obj['a'] = UNPICKLEABLE + for proto in protocols: + with self.subTest(proto=proto, n=n): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_unpickleable_set_items(self): + obj = {UNPICKLEABLE} + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_unpickleable_frozenset_items(self): + obj = frozenset({frozenset({UNPICKLEABLE})}) + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(CustomError): + self.dumps(obj, proto) + + def test_global_lookup_error(self): + # Global name does not exist + obj = REX('spam') + obj.__module__ = __name__ + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + obj.__module__ = 'nonexisting' + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + obj.__module__ = '' + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((ValueError, 
pickle.PicklingError)): + self.dumps(obj, proto) + + obj.__module__ = None + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_nonencodable_global_name_error(self): + for proto in protocols[:4]: + with self.subTest(proto=proto): + name = 'nonascii\xff' if proto < 3 else 'nonencodable\udbff' + obj = REX(name) + obj.__module__ = __name__ + with support.swap_item(globals(), name, obj): + with self.assertRaises((UnicodeEncodeError, pickle.PicklingError)): + self.dumps(obj, proto) + + def test_nonencodable_module_name_error(self): + for proto in protocols[:4]: + with self.subTest(proto=proto): + name = 'nonascii\xff' if proto < 3 else 'nonencodable\udbff' + obj = REX('test') + obj.__module__ = name + mod = types.SimpleNamespace(test=obj) + with support.swap_item(sys.modules, name, mod): + with self.assertRaises((UnicodeEncodeError, pickle.PicklingError)): + self.dumps(obj, proto) + + def test_nested_lookup_error(self): + # Nested name does not exist + global TestGlobal + class TestGlobal: + class A: + pass + obj = REX('TestGlobal.A.B.C') + obj.__module__ = __name__ + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + obj.__module__ = None + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_wrong_object_lookup_error(self): + # Name is bound to different object + global TestGlobal + class TestGlobal: + pass + obj = REX('TestGlobal') + obj.__module__ = __name__ + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + obj.__module__ = None + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises(pickle.PicklingError): + self.dumps(obj, proto) + + def test_local_lookup_error(self): + # Test that whichmodule() errors out cleanly when looking up + # an assumed globally-reachable object fails. + def f(): + pass + # Since the function is local, lookup will fail + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((AttributeError, pickle.PicklingError)): + self.dumps(f, proto) + # Same without a __module__ attribute (exercises a different path + # in _pickle.c). + del f.__module__ + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((AttributeError, pickle.PicklingError)): + self.dumps(f, proto) + # Yet a different path. 
+ f.__name__ = f.__qualname__ + for proto in protocols: + with self.subTest(proto=proto): + with self.assertRaises((AttributeError, pickle.PicklingError)): + self.dumps(f, proto) + + def test_reduce_ex_None(self): + c = REX_None() + with self.assertRaises(TypeError): + self.dumps(c) + + def test_reduce_None(self): + c = R_None() + with self.assertRaises(TypeError): + self.dumps(c) + + @no_tracing + def test_bad_getattr(self): + # Issue #3514: crash when there is an infinite loop in __getattr__ + x = BadGetattr() + for proto in range(2): + with support.infinite_recursion(25): + self.assertRaises(RuntimeError, self.dumps, x, proto) + for proto in range(2, pickle.HIGHEST_PROTOCOL + 1): + s = self.dumps(x, proto) + + def test_picklebuffer_error(self): + # PickleBuffer forbidden with protocol < 5 + pb = pickle.PickleBuffer(b"foobar") + for proto in range(0, 5): + with self.subTest(proto=proto): + with self.assertRaises(pickle.PickleError) as cm: + self.dumps(pb, proto) + self.assertEqual(str(cm.exception), + 'PickleBuffer can only be pickled with protocol >= 5') + + def test_non_continuous_buffer(self): + for proto in protocols[5:]: + with self.subTest(proto=proto): + pb = pickle.PickleBuffer(memoryview(b"foobar")[::2]) + with self.assertRaises((pickle.PicklingError, BufferError)): + self.dumps(pb, proto) + + def test_buffer_callback_error(self): + def buffer_callback(buffers): + raise CustomError + pb = pickle.PickleBuffer(b"foobar") + with self.assertRaises(CustomError): + self.dumps(pb, 5, buffer_callback=buffer_callback) + + def test_evil_pickler_mutating_collection(self): + # https://github.com/python/cpython/issues/92930 + global Clearer + class Clearer: + pass + + def check(collection): + class EvilPickler(self.pickler): + def persistent_id(self, obj): + if isinstance(obj, Clearer): + collection.clear() + return None + pickler = EvilPickler(io.BytesIO(), proto) + try: + pickler.dump(collection) + except RuntimeError as e: + expected = "changed size during iteration" + self.assertIn(expected, str(e)) + + for proto in protocols: + check([Clearer()]) + check([Clearer(), Clearer()]) + check({Clearer()}) + check({Clearer(), Clearer()}) + check({Clearer(): 1}) + check({Clearer(): 1, Clearer(): 2}) + check({1: Clearer(), 2: Clearer()}) + + @support.cpython_only + def test_bad_ext_code(self): + # This should never happen in normal circumstances, because the type + # and the value of the extension code is checked in copyreg.add_extension(). + key = (__name__, 'MyList') + def check(code, exc): + assert key not in copyreg._extension_registry + assert code not in copyreg._inverted_registry + with (support.swap_item(copyreg._extension_registry, key, code), + support.swap_item(copyreg._inverted_registry, code, key)): + for proto in protocols[2:]: + with self.assertRaises(exc): + self.dumps(MyList, proto) + + check(object(), TypeError) + check(None, TypeError) + check(-1, (RuntimeError, struct.error)) + check(0, RuntimeError) + check(2**31, (RuntimeError, OverflowError, struct.error)) + check(2**1000, (OverflowError, struct.error)) + check(-2**1000, (OverflowError, struct.error)) + class AbstractPickleTests: # Subclass must define self.dumps, self.loads. 
@@ -1845,6 +2528,25 @@ def test_bytes(self): p = self.dumps(s, proto) self.assert_is_copy(s, self.loads(p)) + def test_bytes_memoization(self): + for proto in protocols: + for array_type in [bytes, ZeroCopyBytes]: + for s in b'', b'xyz', b'xyz'*100: + with self.subTest(proto=proto, array_type=array_type, s=s, independent=False): + b = array_type(s) + p = self.dumps((b, b), proto) + x, y = self.loads(p) + self.assertIs(x, y) + self.assert_is_copy((b, b), (x, y)) + + with self.subTest(proto=proto, array_type=array_type, s=s, independent=True): + b1, b2 = array_type(s), array_type(s) + p = self.dumps((b1, b2), proto) + # Note that (b1, b2) = self.loads(p) might have identical + # components, i.e., b1 is b2, but this is not always the + # case if the content is large (equality still holds). + self.assert_is_copy((b1, b2), self.loads(p)) + def test_bytearray(self): for proto in protocols: for s in b'', b'xyz', b'xyz'*100: @@ -1864,13 +2566,31 @@ def test_bytearray(self): self.assertNotIn(b'bytearray', p) self.assertTrue(opcode_in_pickle(pickle.BYTEARRAY8, p)) - def test_bytearray_memoization_bug(self): + def test_bytearray_memoization(self): for proto in protocols: - for s in b'', b'xyz', b'xyz'*100: - b = bytearray(s) - p = self.dumps((b, b), proto) - b1, b2 = self.loads(p) - self.assertIs(b1, b2) + for array_type in [bytearray, ZeroCopyBytearray]: + for s in b'', b'xyz', b'xyz'*100: + with self.subTest(proto=proto, array_type=array_type, s=s, independent=False): + b = array_type(s) + p = self.dumps((b, b), proto) + b1, b2 = self.loads(p) + self.assertIs(b1, b2) + + with self.subTest(proto=proto, array_type=array_type, s=s, independent=True): + b1a, b2a = array_type(s), array_type(s) + # Unlike bytes, equal but independent bytearray objects are + # never identical. 
+ self.assertIsNot(b1a, b2a) + + p = self.dumps((b1a, b2a), proto) + b1b, b2b = self.loads(p) + self.assertIsNot(b1b, b2b) + + self.assertIsNot(b1a, b1b) + self.assert_is_copy(b1a, b1b) + + self.assertIsNot(b2a, b2b) + self.assert_is_copy(b2a, b2b) def test_ints(self): for proto in protocols: @@ -1915,7 +2635,7 @@ def test_float(self): got = self.loads(pickle) self.assert_is_copy(value, got) - @run_with_locale('LC_ALL', 'de_DE', 'fr_FR') + @run_with_locales('LC_ALL', 'de_DE', 'fr_FR', '') def test_float_format(self): # make sure that floats are formatted locale independent with proto 0 self.assertEqual(self.dumps(1.2, 0)[0:3], b'F1.') @@ -2010,8 +2730,6 @@ def test_builtin_exceptions(self): else: self.assertIs(u, t) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_builtin_functions(self): for t in builtins.__dict__.values(): if isinstance(t, types.BuiltinFunctionType): @@ -2418,40 +3136,11 @@ def test_reduce_calls_base(self): y = self.loads(s) self.assertEqual(y._reduce_called, 1) - # TODO: RUSTPYTHON - @unittest.expectedFailure - @no_tracing - def test_bad_getattr(self): - # Issue #3514: crash when there is an infinite loop in __getattr__ - x = BadGetattr() - for proto in range(2): - with support.infinite_recursion(): - self.assertRaises(RuntimeError, self.dumps, x, proto) - for proto in range(2, pickle.HIGHEST_PROTOCOL + 1): - s = self.dumps(x, proto) - - def test_reduce_bad_iterator(self): - # Issue4176: crash when 4th and 5th items of __reduce__() - # are not iterators - class C(object): - def __reduce__(self): - # 4th item is not an iterator - return list, (), None, [], None - class D(object): - def __reduce__(self): - # 5th item is not an iterator - return dict, (), None, None, [] - - # Python implementation is less strict and also accepts iterables. 
- for proto in protocols: - try: - self.dumps(C(), proto) - except pickle.PicklingError: - pass - try: - self.dumps(D(), proto) - except pickle.PicklingError: - pass + def test_pickle_setstate_None(self): + c = C_None_setstate() + p = self.dumps(c) + with self.assertRaises(TypeError): + self.loads(p) def test_many_puts_and_gets(self): # Test that internal data structures correctly deal with lots of @@ -2769,6 +3458,18 @@ class Recursive: self.assertIs(unpickled, Recursive) del Recursive.mod # break reference loop + def test_recursive_nested_names2(self): + global Recursive + class Recursive: + pass + Recursive.ref = Recursive + Recursive.__qualname__ = 'Recursive.ref' + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(proto=proto): + unpickled = self.loads(self.dumps(Recursive, proto)) + self.assertIs(unpickled, Recursive) + del Recursive.ref # break reference loop + def test_py_methods(self): global PyMethodsTest class PyMethodsTest: @@ -2818,6 +3519,15 @@ def pie(self): unpickled = self.loads(self.dumps(method, proto)) self.assertEqual(method(obj), unpickled(obj)) + descriptors = ( + PyMethodsTest.__dict__['cheese'], # static method descriptor + PyMethodsTest.__dict__['wine'], # class method descriptor + ) + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + for descr in descriptors: + with self.subTest(proto=proto, descr=descr): + self.assertRaises(TypeError, self.dumps, descr, proto) + def test_c_methods(self): global Subclass class Subclass(tuple): @@ -2853,6 +3563,15 @@ class Nested(str): unpickled = self.loads(self.dumps(method, proto)) self.assertEqual(method(*args), unpickled(*args)) + descriptors = ( + bytearray.__dict__['maketrans'], # built-in static method descriptor + dict.__dict__['fromkeys'], # built-in class method descriptor + ) + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + for descr in descriptors: + with self.subTest(proto=proto, descr=descr): + self.assertRaises(TypeError, self.dumps, descr, proto) + def test_compat_pickle(self): tests = [ (range(1, 7), '__builtin__', 'xrange'), @@ -2871,27 +3590,6 @@ def test_compat_pickle(self): self.assertIn(('c%s\n%s' % (mod, name)).encode(), pickled) self.assertIs(type(self.loads(pickled)), type(val)) - def test_local_lookup_error(self): - # Test that whichmodule() errors out cleanly when looking up - # an assumed globally-reachable object fails. - def f(): - pass - # Since the function is local, lookup will fail - for proto in range(0, pickle.HIGHEST_PROTOCOL + 1): - with self.assertRaises((AttributeError, pickle.PicklingError)): - pickletools.dis(self.dumps(f, proto)) - # Same without a __module__ attribute (exercises a different path - # in _pickle.c). - del f.__module__ - for proto in range(0, pickle.HIGHEST_PROTOCOL + 1): - with self.assertRaises((AttributeError, pickle.PicklingError)): - pickletools.dis(self.dumps(f, proto)) - # Yet a different path. 
- f.__name__ = f.__qualname__ - for proto in range(0, pickle.HIGHEST_PROTOCOL + 1): - with self.assertRaises((AttributeError, pickle.PicklingError)): - pickletools.dis(self.dumps(f, proto)) - # # PEP 574 tests below # @@ -3002,20 +3700,6 @@ def test_oob_buffers_writable_to_readonly(self): self.assertIs(type(new), type(obj)) self.assertEqual(new, obj) - def test_picklebuffer_error(self): - # PickleBuffer forbidden with protocol < 5 - pb = pickle.PickleBuffer(b"foobar") - for proto in range(0, 5): - with self.assertRaises(pickle.PickleError): - self.dumps(pb, proto) - - def test_buffer_callback_error(self): - def buffer_callback(buffers): - 1/0 - pb = pickle.PickleBuffer(b"foobar") - with self.assertRaises(ZeroDivisionError): - self.dumps(pb, 5, buffer_callback=buffer_callback) - def test_buffers_error(self): pb = pickle.PickleBuffer(b"foobar") for proto in range(5, pickle.HIGHEST_PROTOCOL + 1): @@ -3107,37 +3791,6 @@ def __reduce__(self): expected = "changed size during iteration" self.assertIn(expected, str(e)) - def test_evil_pickler_mutating_collection(self): - # https://github.com/python/cpython/issues/92930 - if not hasattr(self, "pickler"): - raise self.skipTest(f"{type(self)} has no associated pickler type") - - global Clearer - class Clearer: - pass - - def check(collection): - class EvilPickler(self.pickler): - def persistent_id(self, obj): - if isinstance(obj, Clearer): - collection.clear() - return None - pickler = EvilPickler(io.BytesIO(), proto) - try: - pickler.dump(collection) - except RuntimeError as e: - expected = "changed size during iteration" - self.assertIn(expected, str(e)) - - for proto in protocols: - check([Clearer()]) - check([Clearer(), Clearer()]) - check({Clearer()}) - check({Clearer(), Clearer()}) - check({Clearer(): 1}) - check({Clearer(): 1, Clearer(): 2}) - check({1: Clearer(), 2: Clearer()}) - class BigmemPickleTests: @@ -3268,6 +3921,18 @@ def test_huge_str_64b(self, size): # Test classes for reduce_ex +class R: + def __init__(self, reduce=None): + self.reduce = reduce + def __reduce__(self, proto): + return self.reduce + +class REX: + def __init__(self, reduce_ex=None): + self.reduce_ex = reduce_ex + def __reduce_ex__(self, proto): + return self.reduce_ex + class REX_one(object): """No __reduce_ex__ here, but inheriting it from object""" _reduce_called = 0 @@ -3343,6 +4008,34 @@ def __setstate__(self, state): def __reduce__(self): return type(self), (), self.state +class REX_None: + """ Setting __reduce_ex__ to None should fail """ + __reduce_ex__ = None + +class R_None: + """ Setting __reduce__ to None should fail """ + __reduce__ = None + +class C_None_setstate: + """ Setting __setstate__ to None should fail """ + def __getstate__(self): + return 1 + + __setstate__ = None + +class CustomError(Exception): + pass + +class Unpickleable: + def __reduce__(self): + raise CustomError + +UNPICKLEABLE = Unpickleable() + +class UnpickleableCallable(Unpickleable): + def __call__(self, *args, **kwargs): + pass + # Test classes for newobj @@ -3392,7 +4085,9 @@ class MyIntWithNew2(MyIntWithNew): class SlotList(MyList): __slots__ = ["foo"] -class SimpleNewObj(int): +# Ruff "redefined while unused" false positive here due to `global` variables +# being assigned (and then restored) from within test methods earlier in the file +class SimpleNewObj(int): # noqa: F811 def __init__(self, *args, **kwargs): # raise an error, to make sure this isn't called raise TypeError("SimpleNewObj.__init__() didn't expect to get called") @@ -3411,6 +4106,12 @@ class BadGetattr: def 
__getattr__(self, key): self.foo +class NoNew: + def __getattribute__(self, name): + if name == '__new__': + raise AttributeError + return super().__getattribute__(name) + class AbstractPickleModuleTests: @@ -3483,7 +4184,7 @@ def raises_oserror(self, *args, **kwargs): raise OSError @property def bad_property(self): - 1/0 + raise CustomError # File without read and readline class F: @@ -3504,23 +4205,23 @@ class F: class F: read = bad_property readline = raises_oserror - self.assertRaises(ZeroDivisionError, self.Unpickler, F()) + self.assertRaises(CustomError, self.Unpickler, F()) # File with bad readline class F: readline = bad_property read = raises_oserror - self.assertRaises(ZeroDivisionError, self.Unpickler, F()) + self.assertRaises(CustomError, self.Unpickler, F()) # File with bad readline, no read class F: readline = bad_property - self.assertRaises(ZeroDivisionError, self.Unpickler, F()) + self.assertRaises(CustomError, self.Unpickler, F()) # File with bad read, no readline class F: read = bad_property - self.assertRaises((AttributeError, ZeroDivisionError), self.Unpickler, F()) + self.assertRaises((AttributeError, CustomError), self.Unpickler, F()) # File with bad peek class F: @@ -3529,7 +4230,7 @@ class F: readline = raises_oserror try: self.Unpickler(F()) - except ZeroDivisionError: + except CustomError: pass # File with bad readinto @@ -3539,7 +4240,7 @@ class F: readline = raises_oserror try: self.Unpickler(F()) - except ZeroDivisionError: + except CustomError: pass def test_pickler_bad_file(self): @@ -3552,8 +4253,8 @@ class F: class F: @property def write(self): - 1/0 - self.assertRaises(ZeroDivisionError, self.Pickler, F()) + raise CustomError + self.assertRaises(CustomError, self.Pickler, F()) def check_dumps_loads_oob_buffers(self, dumps, loads): # No need to do the full gamut of tests here, just enough to @@ -3661,9 +4362,15 @@ def test_return_correct_type(self): def test_protocol0_is_ascii_only(self): non_ascii_str = "\N{EMPTY SET}" - self.assertRaises(pickle.PicklingError, self.dumps, non_ascii_str, 0) + with self.assertRaises(pickle.PicklingError) as cm: + self.dumps(non_ascii_str, 0) + self.assertEqual(str(cm.exception), + 'persistent IDs in protocol 0 must be ASCII strings') pickled = pickle.PERSID + non_ascii_str.encode('utf-8') + b'\n.' 
- self.assertRaises(pickle.UnpicklingError, self.loads, pickled) + with self.assertRaises(pickle.UnpicklingError) as cm: + self.loads(pickled) + self.assertEqual(str(cm.exception), + 'persistent IDs in protocol 0 must be ASCII strings') class AbstractPicklerUnpicklerObjectTests: @@ -3824,6 +4531,25 @@ def test_unpickling_buffering_readline(self): unpickler = self.unpickler_class(f) self.assertEqual(unpickler.load(), data) + def test_pickle_invalid_reducer_override(self): + # gh-103035 + obj = object() + + f = io.BytesIO() + class MyPickler(self.pickler_class): + pass + pickler = MyPickler(f) + pickler.dump(obj) + + pickler.clear_memo() + pickler.reducer_override = None + with self.assertRaises(TypeError): + pickler.dump(obj) + + pickler.clear_memo() + pickler.reducer_override = 10 + with self.assertRaises(TypeError): + pickler.dump(obj) # Tests for dispatch_table attribute @@ -3986,6 +4712,15 @@ def dumps(obj, protocol=None): self._test_dispatch_table(dumps, dt) + def test_dispatch_table_None_item(self): + # gh-93627 + obj = object() + f = io.BytesIO() + pickler = self.pickler_class(f) + pickler.dispatch_table = {type(obj): None} + with self.assertRaises(TypeError): + pickler.dump(obj) + def _test_dispatch_table(self, dumps, dispatch_table): def custom_load_dump(obj): return pickle.loads(dumps(obj, 0)) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 26a8b16724..b69278f990 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -6,8 +6,8 @@ import contextlib import dataclasses import functools -import getpass -import opcode +import logging +# import _opcode # TODO: RUSTPYTHON import os import re import stat @@ -19,8 +19,6 @@ import unittest import warnings -from .testresult import get_test_runner - __all__ = [ # globals @@ -29,12 +27,11 @@ "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", # io "record_original_stdout", "get_original_stdout", "captured_stdout", - "captured_stdin", "captured_stderr", + "captured_stdin", "captured_stderr", "captured_output", # unittest "is_resource_enabled", "requires", "requires_freebsd_version", - "requires_linux_version", "requires_mac_ver", + "requires_gil_enabled", "requires_linux_version", "requires_mac_ver", "check_syntax_error", - "run_unittest", "run_doctest", "requires_gzip", "requires_bz2", "requires_lzma", "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", "requires_IEEE_754", "requires_zlib", @@ -46,8 +43,8 @@ "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer", "requires_limited_api", "requires_specialization", # sys - "is_jython", "is_android", "is_emscripten", "is_wasi", - "check_impl_detail", "unix_shell", "setswitchinterval", + "MS_WINDOWS", "is_jython", "is_android", "is_emscripten", "is_wasi", + "is_apple_mobile", "check_impl_detail", "unix_shell", "setswitchinterval", # os "get_pagesize", # network @@ -60,8 +57,13 @@ "run_with_tz", "PGO", "missing_compiler_executable", "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST", "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT", - "Py_DEBUG", "EXCEEDS_RECURSION_LIMIT", "C_RECURSION_LIMIT", + "Py_DEBUG", "exceeds_recursion_limit", "get_c_recursion_limit", "skip_on_s390x", + "without_optimizer", + "force_not_colorized", + "force_not_colorized_test_class", + "make_clean_env", + "BrokenIter", ] @@ -107,6 +109,7 @@ STDLIB_DIR = os.path.dirname(TEST_HOME_DIR) REPO_ROOT = os.path.dirname(STDLIB_DIR) + class Error(Exception): """Base class for regression test exceptions.""" @@ -384,7 +387,7 
@@ def skip_if_buildbot(reason=None): try: isbuildbot = getpass.getuser().lower() == 'buildbot' except (KeyError, OSError) as err: - warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning) + logging.getLogger(__name__).warning('getpass.getuser() failed %s.', err, exc_info=err) isbuildbot = False return unittest.skipIf(isbuildbot, reason) @@ -807,28 +810,47 @@ def gc_collect(): longer than expected. This function tries its best to force all garbage objects to disappear. """ - # TODO: RUSTPYTHON (comment out before) - # import gc - # gc.collect() - # if is_jython: - # time.sleep(0.1) - # gc.collect() - # gc.collect() - pass + return # TODO: RUSTPYTHON + + import gc + gc.collect() + gc.collect() + gc.collect() @contextlib.contextmanager def disable_gc(): - # TODO: RUSTPYTHON (comment out before) - # import gc - # have_gc = gc.isenabled() - # gc.disable() - # try: - # yield - # finally: - # if have_gc: - # gc.enable() - yield + # TODO: RUSTPYTHON; GC is not supported yet + try: + yield + finally: + pass + return + + import gc + have_gc = gc.isenabled() + gc.disable() + try: + yield + finally: + if have_gc: + gc.enable() + +@contextlib.contextmanager +def gc_threshold(*args): + # TODO: RUSTPYTHON; GC is not supported yet + try: + yield + finally: + pass + return + import gc + old_threshold = gc.get_threshold() + gc.set_threshold(*args) + try: + yield + finally: + gc.set_threshold(*old_threshold) def python_is_optimized(): """Find if Python was built with optimizations.""" @@ -837,25 +859,52 @@ def python_is_optimized(): for opt in cflags.split(): if opt.startswith('-O'): final_opt = opt - return final_opt not in ('', '-O0', '-Og') + if sysconfig.get_config_var("CC") == "gcc": + non_opts = ('', '-O0', '-Og') + else: + non_opts = ('', '-O0') + return final_opt not in non_opts + + +def check_cflags_pgo(): + # Check if Python was built with ./configure --enable-optimizations: + # with Profile Guided Optimization (PGO). + cflags_nodist = sysconfig.get_config_var('PY_CFLAGS_NODIST') or '' + pgo_options = [ + # GCC + '-fprofile-use', + # clang: -fprofile-instr-use=code.profclangd + '-fprofile-instr-use', + # ICC + "-prof-use", + ] + PGO_PROF_USE_FLAG = sysconfig.get_config_var('PGO_PROF_USE_FLAG') + if PGO_PROF_USE_FLAG: + pgo_options.append(PGO_PROF_USE_FLAG) + return any(option in cflags_nodist for option in pgo_options) + + +def check_bolt_optimized(): + # Always return false, if the platform is WASI, + # because BOLT optimization does not support WASM binary. + if is_wasi: + return False + config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' + return '--enable-bolt' in config_args -# From CPython 3.13.5 Py_GIL_DISABLED = bool(sysconfig.get_config_var('Py_GIL_DISABLED')) -# From CPython 3.13.5 def requires_gil_enabled(msg="needs the GIL enabled"): """Decorator for skipping tests on the free-threaded build.""" return unittest.skipIf(Py_GIL_DISABLED, msg) -# From CPython 3.13.5 def expected_failure_if_gil_disabled(): """Expect test failure if the GIL is disabled.""" if Py_GIL_DISABLED: return unittest.expectedFailure return lambda test_case: test_case -# From CPython 3.13.5 if Py_GIL_DISABLED: _header = 'PHBBInP' else: @@ -889,9 +938,34 @@ def check_sizeof(test, o, size): % (type(o), result, size) test.assertEqual(result, size, msg) +def subTests(arg_names, arg_values, /, *, _do_cleanups=False): + """Run multiple subtests with different parameters. 
+ """ + single_param = False + if isinstance(arg_names, str): + arg_names = arg_names.replace(',',' ').split() + if len(arg_names) == 1: + single_param = True + arg_values = tuple(arg_values) + def decorator(func): + if isinstance(func, type): + raise TypeError('subTests() can only decorate methods, not classes') + @functools.wraps(func) + def wrapper(self, /, *args, **kwargs): + for values in arg_values: + if single_param: + values = (values,) + subtest_kwargs = dict(zip(arg_names, values)) + with self.subTest(**subtest_kwargs): + func(self, *args, **kwargs, **subtest_kwargs) + if _do_cleanups: + self.doCleanups() + return wrapper + return decorator + #======================================================================= -# Decorator for running a function in a different locale, correctly resetting -# it afterwards. +# Decorator/context manager for running a code in a different locale, +# correctly resetting it afterwards. @contextlib.contextmanager def run_with_locale(catstr, *locales): @@ -902,16 +976,21 @@ def run_with_locale(catstr, *locales): except AttributeError: # if the test author gives us an invalid category string raise - except: + except Exception: # cannot retrieve original locale, so do nothing locale = orig_locale = None + if '' not in locales: + raise unittest.SkipTest('no locales') else: for loc in locales: try: locale.setlocale(category, loc) break - except: + except locale.Error: pass + else: + if '' not in locales: + raise unittest.SkipTest(f'no locales {locales}') try: yield @@ -919,6 +998,46 @@ def run_with_locale(catstr, *locales): if locale and orig_locale: locale.setlocale(category, orig_locale) +#======================================================================= +# Decorator for running a function in multiple locales (if they are +# availasble) and resetting the original locale afterwards. + +def run_with_locales(catstr, *locales): + def deco(func): + @functools.wraps(func) + def wrapper(self, /, *args, **kwargs): + dry_run = '' in locales + try: + import locale + category = getattr(locale, catstr) + orig_locale = locale.setlocale(category) + except AttributeError: + # if the test author gives us an invalid category string + raise + except Exception: + # cannot retrieve original locale, so do nothing + pass + else: + try: + for loc in locales: + with self.subTest(locale=loc): + try: + locale.setlocale(category, loc) + except locale.Error: + self.skipTest(f'no locale {loc!r}') + else: + dry_run = False + func(self, *args, **kwargs) + finally: + locale.setlocale(category, orig_locale) + if dry_run: + # no locales available, so just run the test + # with the current locale + with self.subTest(locale=None): + func(self, *args, **kwargs) + return wrapper + return deco + #======================================================================= # Decorator for running a function in a specific timezone, correctly # resetting it afterwards. @@ -965,27 +1084,31 @@ def inner(*args, **kwds): MAX_Py_ssize_t = sys.maxsize -def set_memlimit(limit): - global max_memuse - global real_max_memuse +def _parse_memlimit(limit: str) -> int: sizes = { 'k': 1024, 'm': _1M, 'g': _1G, 't': 1024*_1G, } - m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + m = re.match(r'(\d+(?:\.\d+)?) 
(K|M|G|T)b?$', limit, re.IGNORECASE | re.VERBOSE) if m is None: - raise ValueError('Invalid memory limit %r' % (limit,)) - memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) - real_max_memuse = memlimit - if memlimit > MAX_Py_ssize_t: - memlimit = MAX_Py_ssize_t + raise ValueError(f'Invalid memory limit: {limit!r}') + return int(float(m.group(1)) * sizes[m.group(2).lower()]) + +def set_memlimit(limit: str) -> None: + global max_memuse + global real_max_memuse + memlimit = _parse_memlimit(limit) if memlimit < _2G - 1: - raise ValueError('Memory limit %r too low to be useful' % (limit,)) + raise ValueError(f'Memory limit {limit!r} too low to be useful') + + real_max_memuse = memlimit + memlimit = min(memlimit, MAX_Py_ssize_t) max_memuse = memlimit + class _MemoryWatchdog: """An object which periodically watches the process' memory consumption and prints it out. @@ -1000,8 +1123,7 @@ def start(self): try: f = open(self.procfile, 'r') except OSError as e: - warnings.warn('/proc not available for stats: {}'.format(e), - RuntimeWarning) + logging.getLogger(__name__).warning('/proc not available for stats: %s', e, exc_info=e) sys.stderr.flush() return @@ -1138,18 +1260,30 @@ def check_impl_detail(**guards): def no_tracing(func): """Decorator to temporarily turn off tracing for the duration of a test.""" - if not hasattr(sys, 'gettrace'): - return func - else: + trace_wrapper = func + if hasattr(sys, 'gettrace'): @functools.wraps(func) - def wrapper(*args, **kwargs): + def trace_wrapper(*args, **kwargs): original_trace = sys.gettrace() try: sys.settrace(None) return func(*args, **kwargs) finally: sys.settrace(original_trace) - return wrapper + + coverage_wrapper = trace_wrapper + if 'test.cov' in sys.modules: # -Xpresite=test.cov used + cov = sys.monitoring.COVERAGE_ID + @functools.wraps(func) + def coverage_wrapper(*args, **kwargs): + original_events = sys.monitoring.get_events(cov) + try: + sys.monitoring.set_events(cov, 0) + return trace_wrapper(*args, **kwargs) + finally: + sys.monitoring.set_events(cov, original_events) + + return coverage_wrapper def refcount_test(test): @@ -1166,332 +1300,136 @@ def refcount_test(test): def requires_limited_api(test): try: import _testcapi + import _testlimitedcapi except ImportError: - return unittest.skip('needs _testcapi module')(test) - return unittest.skipUnless( - _testcapi.LIMITED_API_AVAILABLE, 'needs Limited API support')(test) + return unittest.skip('needs _testcapi and _testlimitedcapi modules')(test) + return test + + +# Windows build doesn't support --disable-test-modules feature, so there's no +# 'TEST_MODULES' var in config +TEST_MODULES_ENABLED = (sysconfig.get_config_var('TEST_MODULES') or 'yes') == 'yes' def requires_specialization(test): return unittest.skipUnless( - opcode.ENABLE_SPECIALIZATION, "requires specialization")(test) + _opcode.ENABLE_SPECIALIZATION, "requires specialization")(test) -def _filter_suite(suite, pred): - """Recursively filter test cases in a suite based on a predicate.""" - newtests = [] - for test in suite._tests: - if isinstance(test, unittest.TestSuite): - _filter_suite(test, pred) - newtests.append(test) - else: - if pred(test): - newtests.append(test) - suite._tests = newtests -@dataclasses.dataclass(slots=True) -class TestStats: - tests_run: int = 0 - failures: int = 0 - skipped: int = 0 +#======================================================================= +# Check for the presence of docstrings. 
- @staticmethod - def from_unittest(result): - return TestStats(result.testsRun, - len(result.failures), - len(result.skipped)) +# Rather than trying to enumerate all the cases where docstrings may be +# disabled, we just check for that directly - @staticmethod - def from_doctest(results): - return TestStats(results.attempted, - results.failed) +def _check_docstrings(): + """Just used to check if docstrings are enabled""" - def accumulate(self, stats): - self.tests_run += stats.tests_run - self.failures += stats.failures - self.skipped += stats.skipped +MISSING_C_DOCSTRINGS = (check_impl_detail() and + sys.platform != 'win32' and + not sysconfig.get_config_var('WITH_DOC_STRINGS')) +HAVE_PY_DOCSTRINGS = _check_docstrings.__doc__ is not None +HAVE_DOCSTRINGS = (HAVE_PY_DOCSTRINGS and not MISSING_C_DOCSTRINGS) -def _run_suite(suite): - """Run tests from a unittest.TestSuite-derived class.""" - runner = get_test_runner(sys.stdout, - verbosity=verbose, - capture_output=(junit_xml_list is not None)) +requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, + "test requires docstrings") - result = runner.run(suite) - if junit_xml_list is not None: - junit_xml_list.append(result.get_xml_element()) +#======================================================================= +# Support for saving and restoring the imported modules. - if not result.testsRun and not result.skipped and not result.errors: - raise TestDidNotRun - if not result.wasSuccessful(): - stats = TestStats.from_unittest(result) - if len(result.errors) == 1 and not result.failures: - err = result.errors[0][1] - elif len(result.failures) == 1 and not result.errors: - err = result.failures[0][1] - else: - err = "multiple errors occurred" - if not verbose: err += "; run in verbose mode for details" - errors = [(str(tc), exc_str) for tc, exc_str in result.errors] - failures = [(str(tc), exc_str) for tc, exc_str in result.failures] - raise TestFailedWithDetails(err, errors, failures, stats=stats) - return result +def flush_std_streams(): + if sys.stdout is not None: + sys.stdout.flush() + if sys.stderr is not None: + sys.stderr.flush() -# By default, don't filter tests -_match_test_func = None +def print_warning(msg): + # bpo-45410: Explicitly flush stdout to keep logs in order + flush_std_streams() + stream = print_warning.orig_stderr + for line in msg.splitlines(): + print(f"Warning -- {line}", file=stream) + stream.flush() -_accept_test_patterns = None -_ignore_test_patterns = None +# bpo-39983: Store the original sys.stderr at Python startup to be able to +# log warnings even if sys.stderr is captured temporarily by a test. +print_warning.orig_stderr = sys.stderr -def match_test(test): - # Function used by support.run_unittest() and regrtest --list-cases - if _match_test_func is None: - return True - else: - return _match_test_func(test.id()) +# Flag used by saved_test_environment of test.libregrtest.save_env, +# to check if a test modified the environment. The flag should be set to False +# before running a new test. +# +# For example, threading_helper.threading_cleanup() sets the flag is the function fails +# to cleanup threads. +environment_altered = False +def reap_children(): + """Use this function at the end of test_main() whenever sub-processes + are started. This will help ensure that no extra children (zombies) + stick around to hog resources and create problems when looking + for refleaks. 
+ """ + global environment_altered -def _is_full_match_test(pattern): - # If a pattern contains at least one dot, it's considered - # as a full test identifier. - # Example: 'test.test_os.FileTests.test_access'. - # - # ignore patterns which contain fnmatch patterns: '*', '?', '[...]' - # or '[!...]'. For example, ignore 'test_access*'. - return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern)) + # Need os.waitpid(-1, os.WNOHANG): Windows is not supported + if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): + return + elif not has_subprocess_support: + return + # Reap all our dead child processes so we don't leave zombies around. + # These hog resources and might be causing some of the buildbots to die. + while True: + try: + # Read the exit status of any child process which already completed + pid, status = os.waitpid(-1, os.WNOHANG) + except OSError: + break -def set_match_tests(accept_patterns=None, ignore_patterns=None): - global _match_test_func, _accept_test_patterns, _ignore_test_patterns + if pid == 0: + break - if accept_patterns is None: - accept_patterns = () - if ignore_patterns is None: - ignore_patterns = () + print_warning(f"reap_children() reaped child process {pid}") + environment_altered = True - accept_func = ignore_func = None - if accept_patterns != _accept_test_patterns: - accept_patterns, accept_func = _compile_match_function(accept_patterns) - if ignore_patterns != _ignore_test_patterns: - ignore_patterns, ignore_func = _compile_match_function(ignore_patterns) +@contextlib.contextmanager +def swap_attr(obj, attr, new_val): + """Temporary swap out an attribute with a new object. - # Create a copy since patterns can be mutable and so modified later - _accept_test_patterns = tuple(accept_patterns) - _ignore_test_patterns = tuple(ignore_patterns) + Usage: + with swap_attr(obj, "attr", 5): + ... - if accept_func is not None or ignore_func is not None: - def match_function(test_id): - accept = True - ignore = False - if accept_func: - accept = accept_func(test_id) - if ignore_func: - ignore = ignore_func(test_id) - return accept and not ignore + This will set obj.attr to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `attr` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. - _match_test_func = match_function + The old value (or None if it doesn't exist) will be assigned to the + target of the "as" clause, if there is one. + """ + if hasattr(obj, attr): + real_val = getattr(obj, attr) + setattr(obj, attr, new_val) + try: + yield real_val + finally: + setattr(obj, attr, real_val) + else: + setattr(obj, attr, new_val) + try: + yield + finally: + if hasattr(obj, attr): + delattr(obj, attr) - -def _compile_match_function(patterns): - if not patterns: - func = None - # set_match_tests(None) behaves as set_match_tests(()) - patterns = () - elif all(map(_is_full_match_test, patterns)): - # Simple case: all patterns are full test identifier. - # The test.bisect_cmd utility only uses such full test identifiers. - func = set(patterns).__contains__ - else: - import fnmatch - regex = '|'.join(map(fnmatch.translate, patterns)) - # The search *is* case sensitive on purpose: - # don't use flags=re.IGNORECASE - regex_match = re.compile(regex).match - - def match_test_regex(test_id): - if regex_match(test_id): - # The regex matches the whole identifier, for example - # 'test.test_os.FileTests.test_access'. - return True - else: - # Try to match parts of the test identifier. 
- # For example, split 'test.test_os.FileTests.test_access' - # into: 'test', 'test_os', 'FileTests' and 'test_access'. - return any(map(regex_match, test_id.split("."))) - - func = match_test_regex - - return patterns, func - - -def run_unittest(*classes): - """Run tests from unittest.TestCase-derived classes.""" - valid_types = (unittest.TestSuite, unittest.TestCase) - loader = unittest.TestLoader() - suite = unittest.TestSuite() - for cls in classes: - if isinstance(cls, str): - if cls in sys.modules: - suite.addTest(loader.loadTestsFromModule(sys.modules[cls])) - else: - raise ValueError("str arguments must be keys in sys.modules") - elif isinstance(cls, valid_types): - suite.addTest(cls) - else: - suite.addTest(loader.loadTestsFromTestCase(cls)) - _filter_suite(suite, match_test) - return _run_suite(suite) - -#======================================================================= -# Check for the presence of docstrings. - -# Rather than trying to enumerate all the cases where docstrings may be -# disabled, we just check for that directly - -def _check_docstrings(): - """Just used to check if docstrings are enabled""" - -MISSING_C_DOCSTRINGS = (check_impl_detail() and - sys.platform != 'win32' and - not sysconfig.get_config_var('WITH_DOC_STRINGS')) - -HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and - not MISSING_C_DOCSTRINGS) - -requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, - "test requires docstrings") - - -#======================================================================= -# doctest driver. - -def run_doctest(module, verbosity=None, optionflags=0): - """Run doctest on the given module. Return (#failures, #tests). - - If optional argument verbosity is not specified (or is None), pass - support's belief about verbosity on to doctest. Else doctest's - usual behavior is used (it searches sys.argv for -v). - """ - - import doctest - - if verbosity is None: - verbosity = verbose - else: - verbosity = None - - results = doctest.testmod(module, - verbose=verbosity, - optionflags=optionflags) - if results.failed: - stats = TestStats.from_doctest(results) - raise TestFailed(f"{results.failed} of {results.attempted} " - f"doctests failed", - stats=stats) - if verbose: - print('doctest (%s) ... %d tests with zero failures' % - (module.__name__, results.attempted)) - return results - - -#======================================================================= -# Support for saving and restoring the imported modules. - -def flush_std_streams(): - if sys.stdout is not None: - sys.stdout.flush() - if sys.stderr is not None: - sys.stderr.flush() - - -def print_warning(msg): - # bpo-45410: Explicitly flush stdout to keep logs in order - flush_std_streams() - stream = print_warning.orig_stderr - for line in msg.splitlines(): - print(f"Warning -- {line}", file=stream) - stream.flush() - -# bpo-39983: Store the original sys.stderr at Python startup to be able to -# log warnings even if sys.stderr is captured temporarily by a test. -print_warning.orig_stderr = sys.stderr - - -# Flag used by saved_test_environment of test.libregrtest.save_env, -# to check if a test modified the environment. The flag should be set to False -# before running a new test. -# -# For example, threading_helper.threading_cleanup() sets the flag is the function fails -# to cleanup threads. -environment_altered = False - -def reap_children(): - """Use this function at the end of test_main() whenever sub-processes - are started. 
This will help ensure that no extra children (zombies) - stick around to hog resources and create problems when looking - for refleaks. - """ - global environment_altered - - # Need os.waitpid(-1, os.WNOHANG): Windows is not supported - if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): - return - elif not has_subprocess_support: - return - - # Reap all our dead child processes so we don't leave zombies around. - # These hog resources and might be causing some of the buildbots to die. - while True: - try: - # Read the exit status of any child process which already completed - pid, status = os.waitpid(-1, os.WNOHANG) - except OSError: - break - - if pid == 0: - break - - print_warning(f"reap_children() reaped child process {pid}") - environment_altered = True - - -@contextlib.contextmanager -def swap_attr(obj, attr, new_val): - """Temporary swap out an attribute with a new object. - - Usage: - with swap_attr(obj, "attr", 5): - ... - - This will set obj.attr to 5 for the duration of the with: block, - restoring the old value at the end of the block. If `attr` doesn't - exist on `obj`, it will be created and then deleted at the end of the - block. - - The old value (or None if it doesn't exist) will be assigned to the - target of the "as" clause, if there is one. - """ - if hasattr(obj, attr): - real_val = getattr(obj, attr) - setattr(obj, attr, new_val) - try: - yield real_val - finally: - setattr(obj, attr, real_val) - else: - setattr(obj, attr, new_val) - try: - yield - finally: - if hasattr(obj, attr): - delattr(obj, attr) - -@contextlib.contextmanager -def swap_item(obj, item, new_val): - """Temporary swap out an item with a new object. +@contextlib.contextmanager +def swap_item(obj, item, new_val): + """Temporary swap out an item with a new object. Usage: with swap_item(obj, "item", 5): @@ -1919,7 +1857,10 @@ def run_in_subinterp(code): module is enabled. """ _check_tracemalloc() - import _testcapi + try: + import _testcapi + except ImportError: + raise unittest.SkipTest("requires _testcapi") return _testcapi.run_in_subinterp(code) @@ -1929,11 +1870,25 @@ def run_in_subinterp_with_config(code, *, own_gil=None, **config): module is enabled. 
""" _check_tracemalloc() - import _testcapi + try: + import _testinternalcapi + except ImportError: + raise unittest.SkipTest("requires _testinternalcapi") if own_gil is not None: assert 'gil' not in config, (own_gil, config) - config['gil'] = 2 if own_gil else 1 - return _testcapi.run_in_subinterp_with_config(code, **config) + config['gil'] = 'own' if own_gil else 'shared' + else: + gil = config['gil'] + if gil == 0: + config['gil'] = 'default' + elif gil == 1: + config['gil'] = 'shared' + elif gil == 2: + config['gil'] = 'own' + elif not isinstance(gil, str): + raise NotImplementedError(gil) + config = types.SimpleNamespace(**config) + return _testinternalcapi.run_in_subinterp_with_config(code, config) def _check_tracemalloc(): @@ -1949,24 +1904,31 @@ def _check_tracemalloc(): "if tracemalloc module is tracing " "memory allocations") -# TODO: RUSTPYTHON; GC is not supported yet -# def check_free_after_iterating(test, iter, cls, args=()): -# class A(cls): -# def __del__(self): -# nonlocal done -# done = True -# try: -# next(it) -# except StopIteration: -# pass - -# done = False -# it = iter(A(*args)) -# # Issue 26494: Shouldn't crash -# test.assertRaises(StopIteration, next, it) -# # The sequence should be deallocated just after the end of iterating -# gc_collect() -# test.assertTrue(done) + +def check_free_after_iterating(test, iter, cls, args=()): + # TODO: RUSTPYTHON; GC is not supported yet + test.assertTrue(False) + return + + done = False + def wrapper(): + class A(cls): + def __del__(self): + nonlocal done + done = True + try: + next(it) + except StopIteration: + pass + + it = iter(A(*args)) + # Issue 26494: Shouldn't crash + test.assertRaises(StopIteration, next, it) + + wrapper() + # The sequence should be deallocated just after the end of iterating + gc_collect() + test.assertTrue(done) def missing_compiler_executable(cmd_names=[]): @@ -1978,8 +1940,9 @@ def missing_compiler_executable(cmd_names=[]): missing. """ - from setuptools._distutils import ccompiler, sysconfig, spawn + from setuptools._distutils import ccompiler, sysconfig from setuptools import errors + import shutil compiler = ccompiler.new_compiler() sysconfig.customize_compiler(compiler) @@ -1998,22 +1961,22 @@ def missing_compiler_executable(cmd_names=[]): "the '%s' executable is not configured" % name elif not cmd: continue - if spawn.find_executable(cmd[0]) is None: + if shutil.which(cmd[0]) is None: return cmd[0] -_is_android_emulator = None +_old_android_emulator = None def setswitchinterval(interval): # Setting a very low gil interval on the Android emulator causes python # to hang (issue #26939). 
- minimum_interval = 1e-5 + minimum_interval = 1e-4 # 100 us if is_android and interval < minimum_interval: - global _is_android_emulator - if _is_android_emulator is None: - import subprocess - _is_android_emulator = (subprocess.check_output( - ['getprop', 'ro.kernel.qemu']).strip() == b'1') - if _is_android_emulator: + global _old_android_emulator + if _old_android_emulator is None: + import platform + av = platform.android_ver() + _old_android_emulator = av.is_emulator and av.api_level < 24 + if _old_android_emulator: interval = minimum_interval return sys.setswitchinterval(interval) @@ -2088,8 +2051,19 @@ def restore(self): def with_pymalloc(): - import _testcapi - return _testcapi.WITH_PYMALLOC + try: + import _testcapi + except ImportError: + raise unittest.SkipTest("requires _testcapi") + return _testcapi.WITH_PYMALLOC and not Py_GIL_DISABLED + + +def with_mimalloc(): + try: + import _testcapi + except ImportError: + raise unittest.SkipTest("requires _testcapi") + return _testcapi.WITH_MIMALLOC class _ALWAYS_EQ: @@ -2361,14 +2335,14 @@ def set_recursion_limit(limit): finally: sys.setrecursionlimit(original_limit) -def infinite_recursion(max_depth=100): - """Set a lower limit for tests that interact with infinite recursions - (e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some - debug windows builds, due to not enough functions being inlined the - stack size might not handle the default recursion limit (1000). See - bpo-11105 for details.""" - if max_depth < 3: - raise ValueError("max_depth must be at least 3, got {max_depth}") +def infinite_recursion(max_depth=None): + if max_depth is None: + # Pick a number large enough to cause problems + # but not take too long for code that can handle + # very deep recursion. + max_depth = 20_000 + elif max_depth < 3: + raise ValueError(f"max_depth must be at least 3, got {max_depth}") depth = get_recursion_depth() depth = max(depth - 1, 1) # Ignore infinite_recursion() frame. limit = depth + max_depth @@ -2428,11 +2402,13 @@ def _findwheel(pkgname): If set, the wheels are searched for in WHEEL_PKG_DIR (see ensurepip). Otherwise, they are searched for in the test directory. 
""" - wheel_dir = sysconfig.get_config_var('WHEEL_PKG_DIR') or TEST_HOME_DIR + wheel_dir = sysconfig.get_config_var('WHEEL_PKG_DIR') or os.path.join( + TEST_HOME_DIR, 'wheeldata', + ) filenames = os.listdir(wheel_dir) filenames = sorted(filenames, reverse=True) # approximate "newest" first for filename in filenames: - # filename is like 'setuptools-67.6.1-py3-none-any.whl' + # filename is like 'setuptools-{version}-py3-none-any.whl' if not filename.endswith(".whl"): continue prefix = pkgname + '-' @@ -2441,20 +2417,29 @@ def _findwheel(pkgname): raise FileNotFoundError(f"No wheel for {pkgname} found in {wheel_dir}") -# Context manager that creates a virtual environment, install setuptools and wheel in it -# and returns the path to the venv directory and the path to the python executable +# Context manager that creates a virtual environment, install setuptools in it, +# and returns the paths to the venv directory and the python executable @contextlib.contextmanager -def setup_venv_with_pip_setuptools_wheel(venv_dir): +def setup_venv_with_pip_setuptools(venv_dir): import subprocess from .os_helper import temp_cwd + def run_command(cmd): + if verbose: + import shlex + print() + print('Run:', ' '.join(map(shlex.quote, cmd))) + subprocess.run(cmd, check=True) + else: + subprocess.run(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + check=True) + with temp_cwd() as temp_dir: # Create virtual environment to get setuptools cmd = [sys.executable, '-X', 'dev', '-m', 'venv', venv_dir] - if verbose: - print() - print('Run:', ' '.join(cmd)) - subprocess.run(cmd, check=True) + run_command(cmd) venv = os.path.join(temp_dir, venv_dir) @@ -2465,14 +2450,11 @@ def setup_venv_with_pip_setuptools_wheel(venv_dir): else: python = os.path.join(venv, 'bin', python_exe) - cmd = [python, '-X', 'dev', + cmd = (python, '-X', 'dev', '-m', 'pip', 'install', _findwheel('setuptools'), - _findwheel('wheel')] - if verbose: - print() - print('Run:', ' '.join(cmd)) - subprocess.run(cmd, check=True) + ) + run_command(cmd) yield python @@ -2595,6 +2577,46 @@ def sleeping_retry(timeout, err_msg=None, /, delay = min(delay * 2, max_delay) +class CPUStopwatch: + """Context manager to roughly time a CPU-bound operation. + + Disables GC. Uses CPU time if it can (i.e. excludes sleeps & time of + other processes). + + N.B.: + - This *includes* time spent in other threads. + - Some systems only have a coarse resolution; check + stopwatch.clock_info.rseolution if. + + Usage: + + with ProcessStopwatch() as stopwatch: + ... 
+ elapsed = stopwatch.seconds + resolution = stopwatch.clock_info.resolution + """ + def __enter__(self): + get_time = time.process_time + clock_info = time.get_clock_info('process_time') + if get_time() <= 0: # some platforms like WASM lack process_time() + get_time = time.monotonic + clock_info = time.get_clock_info('monotonic') + self.context = disable_gc() + self.context.__enter__() + self.get_time = get_time + self.clock_info = clock_info + self.start_time = get_time() + return self + + def __exit__(self, *exc): + try: + end_time = self.get_time() + finally: + result = self.context.__exit__(*exc) + self.seconds = end_time - self.start_time + return result + + @contextlib.contextmanager def adjust_int_max_str_digits(max_digits): """Temporarily change the integer string conversion length limit.""" @@ -2606,7 +2628,6 @@ def adjust_int_max_str_digits(max_digits): sys.set_int_max_str_digits(current) -# From CPython 3.13.5 def get_c_recursion_limit(): try: import _testcapi @@ -2615,29 +2636,116 @@ def get_c_recursion_limit(): raise unittest.SkipTest('requires _testcapi') -# From CPython 3.13.5 def exceeds_recursion_limit(): """For recursion tests, easily exceeds default recursion limit.""" return get_c_recursion_limit() * 3 -#For recursion tests, easily exceeds default recursion limit -EXCEEDS_RECURSION_LIMIT = 5000 - -# The default C recursion limit (from Include/cpython/pystate.h). -C_RECURSION_LIMIT = 1500 - # Windows doesn't have os.uname() but it doesn't support s390x. is_s390x = hasattr(os, 'uname') and os.uname().machine == 's390x' -skip_on_s390x = unittest.skipIf(hasattr(os, 'uname') and os.uname().machine == 's390x', - 'skipped on s390x') -HAVE_ASAN_FORK_BUG = check_sanitizer(address=True) +skip_on_s390x = unittest.skipIf(is_s390x, 'skipped on s390x') -# From CPython 3.13.5 Py_TRACE_REFS = hasattr(sys, 'getobjects') +# Decorator to disable optimizer while a function run +def without_optimizer(func): + try: + from _testinternalcapi import get_optimizer, set_optimizer + except ImportError: + return func + @functools.wraps(func) + def wrapper(*args, **kwargs): + save_opt = get_optimizer() + try: + set_optimizer(None) + return func(*args, **kwargs) + finally: + set_optimizer(save_opt) + return wrapper + + +_BASE_COPY_SRC_DIR_IGNORED_NAMES = frozenset({ + # SRC_DIR/.git + '.git', + # ignore all __pycache__/ sub-directories + '__pycache__', +}) + +# Ignore function for shutil.copytree() to copy the Python source code. 
+def copy_python_src_ignore(path, names): + ignored = _BASE_COPY_SRC_DIR_IGNORED_NAMES + if os.path.basename(path) == 'Doc': + ignored |= { + # SRC_DIR/Doc/build/ + 'build', + # SRC_DIR/Doc/venv/ + 'venv', + } + + # check if we are at the root of the source code + elif 'Modules' in names: + ignored |= { + # SRC_DIR/build/ + 'build', + } + return ignored + + +def iter_builtin_types(): + for obj in __builtins__.values(): + if not isinstance(obj, type): + continue + cls = obj + if cls.__module__ != 'builtins': + continue + yield cls + + +def iter_slot_wrappers(cls): + assert cls.__module__ == 'builtins', cls + + def is_slot_wrapper(name, value): + if not isinstance(value, types.WrapperDescriptorType): + assert not repr(value).startswith(' dict[str, str]: + clean_env = os.environ.copy() + for k in clean_env.copy(): + if k.startswith("PYTHON"): + clean_env.pop(k) + clean_env.pop("FORCE_COLOR", None) + clean_env.pop("NO_COLOR", None) + return clean_env + + +def initialized_with_pyrepl(): + """Detect whether PyREPL was used during Python initialization.""" + # If the main module has a __file__ attribute it's a Python module, which means PyREPL. + return hasattr(sys.modules["__main__"], "__file__") + + class BrokenIter: def __init__(self, init_raises=False, next_raises=False, iter_raises=False): if init_raises: @@ -2692,3 +2814,224 @@ def __iter__(self): if self.iter_raises: 1/0 return self + + +def linked_to_musl(): + """ + Test if the Python executable is linked to the musl C library. + """ + if sys.platform != 'linux': + return False + + import subprocess + exe = getattr(sys, '_base_executable', sys.executable) + cmd = ['ldd', exe] + try: + stdout = subprocess.check_output(cmd, + text=True, + stderr=subprocess.STDOUT) + except (OSError, subprocess.CalledProcessError): + return False + return ('musl' in stdout) + + +# TODO: RUSTPYTHON +# Every line of code below allowed us to update `Lib/test/support/__init__.py` without +# needing to update `libregtest` and its dependencies. +# Ideally we want to remove all code below and update `libregtest`. +# +# Code below was copied from: https://github.com/RustPython/RustPython/blob/9499d39f55b73535e2405bf208d5380241f79ada/Lib/test/support/__init__.py + +from .testresult import get_test_runner + +def _filter_suite(suite, pred): + """Recursively filter test cases in a suite based on a predicate.""" + newtests = [] + for test in suite._tests: + if isinstance(test, unittest.TestSuite): + _filter_suite(test, pred) + newtests.append(test) + else: + if pred(test): + newtests.append(test) + suite._tests = newtests + +# By default, don't filter tests +_match_test_func = None + +_accept_test_patterns = None +_ignore_test_patterns = None + +def match_test(test): + # Function used by support.run_unittest() and regrtest --list-cases + if _match_test_func is None: + return True + else: + return _match_test_func(test.id()) + +def _is_full_match_test(pattern): + # If a pattern contains at least one dot, it's considered + # as a full test identifier. + # Example: 'test.test_os.FileTests.test_access'. + # + # ignore patterns which contain fnmatch patterns: '*', '?', '[...]' + # or '[!...]'. For example, ignore 'test_access*'. + return ('.' 
in pattern) and (not re.search(r'[?*\[\]]', pattern)) + +def set_match_tests(accept_patterns=None, ignore_patterns=None): + global _match_test_func, _accept_test_patterns, _ignore_test_patterns + + if accept_patterns is None: + accept_patterns = () + if ignore_patterns is None: + ignore_patterns = () + + accept_func = ignore_func = None + + if accept_patterns != _accept_test_patterns: + accept_patterns, accept_func = _compile_match_function(accept_patterns) + if ignore_patterns != _ignore_test_patterns: + ignore_patterns, ignore_func = _compile_match_function(ignore_patterns) + + # Create a copy since patterns can be mutable and so modified later + _accept_test_patterns = tuple(accept_patterns) + _ignore_test_patterns = tuple(ignore_patterns) + + if accept_func is not None or ignore_func is not None: + def match_function(test_id): + accept = True + ignore = False + if accept_func: + accept = accept_func(test_id) + if ignore_func: + ignore = ignore_func(test_id) + return accept and not ignore + + _match_test_func = match_function + +def _compile_match_function(patterns): + if not patterns: + func = None + # set_match_tests(None) behaves as set_match_tests(()) + patterns = () + elif all(map(_is_full_match_test, patterns)): + # Simple case: all patterns are full test identifier. + # The test.bisect_cmd utility only uses such full test identifiers. + func = set(patterns).__contains__ + else: + import fnmatch + regex = '|'.join(map(fnmatch.translate, patterns)) + # The search *is* case sensitive on purpose: + # don't use flags=re.IGNORECASE + regex_match = re.compile(regex).match + + def match_test_regex(test_id): + if regex_match(test_id): + # The regex matches the whole identifier, for example + # 'test.test_os.FileTests.test_access'. + return True + else: + # Try to match parts of the test identifier. + # For example, split 'test.test_os.FileTests.test_access' + # into: 'test', 'test_os', 'FileTests' and 'test_access'. 
+ return any(map(regex_match, test_id.split("."))) + + func = match_test_regex + + return patterns, func + +def run_unittest(*classes): + """Run tests from unittest.TestCase-derived classes.""" + valid_types = (unittest.TestSuite, unittest.TestCase) + loader = unittest.TestLoader() + suite = unittest.TestSuite() + for cls in classes: + if isinstance(cls, str): + if cls in sys.modules: + suite.addTest(loader.loadTestsFromModule(sys.modules[cls])) + else: + raise ValueError("str arguments must be keys in sys.modules") + elif isinstance(cls, valid_types): + suite.addTest(cls) + else: + suite.addTest(loader.loadTestsFromTestCase(cls)) + _filter_suite(suite, match_test) + return _run_suite(suite) + +def _run_suite(suite): + """Run tests from a unittest.TestSuite-derived class.""" + runner = get_test_runner(sys.stdout, + verbosity=verbose, + capture_output=(junit_xml_list is not None)) + + result = runner.run(suite) + + if junit_xml_list is not None: + junit_xml_list.append(result.get_xml_element()) + + if not result.testsRun and not result.skipped and not result.errors: + raise TestDidNotRun + if not result.wasSuccessful(): + stats = TestStats.from_unittest(result) + if len(result.errors) == 1 and not result.failures: + err = result.errors[0][1] + elif len(result.failures) == 1 and not result.errors: + err = result.failures[0][1] + else: + err = "multiple errors occurred" + if not verbose: err += "; run in verbose mode for details" + errors = [(str(tc), exc_str) for tc, exc_str in result.errors] + failures = [(str(tc), exc_str) for tc, exc_str in result.failures] + raise TestFailedWithDetails(err, errors, failures, stats=stats) + return result + +@dataclasses.dataclass(slots=True) +class TestStats: + tests_run: int = 0 + failures: int = 0 + skipped: int = 0 + + @staticmethod + def from_unittest(result): + return TestStats(result.testsRun, + len(result.failures), + len(result.skipped)) + + @staticmethod + def from_doctest(results): + return TestStats(results.attempted, + results.failed) + + def accumulate(self, stats): + self.tests_run += stats.tests_run + self.failures += stats.failures + self.skipped += stats.skipped + + +def run_doctest(module, verbosity=None, optionflags=0): + """Run doctest on the given module. Return (#failures, #tests). + + If optional argument verbosity is not specified (or is None), pass + support's belief about verbosity on to doctest. Else doctest's + usual behavior is used (it searches sys.argv for -v). + """ + + import doctest + + if verbosity is None: + verbosity = verbose + else: + verbosity = None + + results = doctest.testmod(module, + verbose=verbosity, + optionflags=optionflags) + if results.failed: + stats = TestStats.from_doctest(results) + raise TestFailed(f"{results.failed} of {results.attempted} " + f"doctests failed", + stats=stats) + if verbose: + print('doctest (%s) ... 
%d tests with zero failures' %
+              (module.__name__, results.attempted))
+    return results
diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py
index 79c050bae6..964bcc7288 100644
--- a/Lib/test/test_collections.py
+++ b/Lib/test/test_collections.py
@@ -1,5 +1,6 @@
 """Unit tests for collections.py."""
 
+import array
 import collections
 import copy
 import doctest
@@ -469,6 +470,8 @@ def test_module_parameter(self):
         NT = namedtuple('NT', ['x', 'y'], module=collections)
         self.assertEqual(NT.__module__, collections)
 
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
     def test_instance(self):
         Point = namedtuple('Point', 'x y')
         p = Point(11, 22)
@@ -490,12 +493,8 @@ def test_instance(self):
         self.assertEqual(p._replace(x=1), (1, 22))          # test _replace method
         self.assertEqual(p._asdict(), dict(x=11, y=22))     # test _asdict method
 
-        try:
+        with self.assertRaises(TypeError):
             p._replace(x=1, error=2)
-        except ValueError:
-            pass
-        else:
-            self._fail('Did not detect an incorrect fieldname')
 
         # verify that field string can have commas
         Point = namedtuple('Point', 'x, y')
@@ -547,7 +546,9 @@ def test_odd_sizes(self):
         self.assertEqual(Dot(1)._replace(d=999), (999,))
         self.assertEqual(Dot(1)._fields, ('d',))
 
-        n = support.EXCEEDS_RECURSION_LIMIT
+    @support.requires_resource('cpu')
+    def test_large_size(self):
+        n = support.exceeds_recursion_limit()
         names = list(set(''.join([choice(string.ascii_letters)
                                   for j in range(10)]) for i in range(n)))
         n = len(names)
@@ -1150,6 +1151,7 @@ class NonCol(ColImpl):
         self.assertFalse(issubclass(NonCol, Collection))
         self.assertFalse(isinstance(NonCol(), Collection))
 
+
     def test_Iterator(self):
         non_samples = [None, 42, 3.14, 1j, b"", "", (), [], {}, set()]
         for x in non_samples:
@@ -1985,6 +1987,7 @@ def test_MutableSequence(self):
         for sample in [list, bytearray, deque]:
             self.assertIsInstance(sample(), MutableSequence)
             self.assertTrue(issubclass(sample, MutableSequence))
+        self.assertTrue(issubclass(array.array, MutableSequence))
         self.assertFalse(issubclass(str, MutableSequence))
         self.validate_abstract_methods(MutableSequence, '__contains__', '__iter__',
             '__len__', '__getitem__', '__setitem__', '__delitem__', 'insert')
diff --git a/Lib/test/test_dictviews.py b/Lib/test/test_dictviews.py
index 172b98aa68..667cccd6cd 100644
--- a/Lib/test/test_dictviews.py
+++ b/Lib/test/test_dictviews.py
@@ -1,9 +1,8 @@
 import collections.abc
 import copy
 import pickle
-import sys
 import unittest
-from test.support import C_RECURSION_LIMIT
+from test.support import get_c_recursion_limit
 
 
 class DictSetTest(unittest.TestCase):
@@ -282,7 +281,7 @@ def test_recursive_repr(self):
     @unittest.expectedFailure
     def test_deeply_nested_repr(self):
         d = {}
-        for i in range(C_RECURSION_LIMIT//2 + 100):
+        for i in range(get_c_recursion_limit()//2 + 100):
             d = {42: d.values()}
         self.assertRaises(RecursionError, repr, d)
 
diff --git a/Lib/test/test_int.py b/Lib/test/test_int.py
index 7ac83288f4..3ad4218610 100644
--- a/Lib/test/test_int.py
+++ b/Lib/test/test_int.py
@@ -1,9 +1,22 @@
 import sys
+import time
 import unittest
 
+# TODO: RUSTPYTHON
+# This is one of the tests that we run on wasi. `unittest.mock` requires `_socket`
+# which we don't have on wasi (yet). Also, every test here that needs `unittest.mock`
+# is CPython specific, so this import is redundant anyway.
+# from unittest import mock from test import support -from test.test_grammar import (VALID_UNDERSCORE_LITERALS, - INVALID_UNDERSCORE_LITERALS) +from test.support.numbers import ( + VALID_UNDERSCORE_LITERALS, + INVALID_UNDERSCORE_LITERALS, +) + +try: + import _pylong +except ImportError: + _pylong = None L = [ ('0', 0), @@ -83,6 +96,7 @@ def test_basic(self): self.assertRaises(TypeError, int, 1, 12) + self.assertRaises(TypeError, int, "10", 2, 1) self.assertEqual(int('0o123', 0), 83) self.assertEqual(int('0x123', 16), 291) @@ -148,6 +162,8 @@ def test_basic(self): self.assertEqual(int(' 0O123 ', 0), 83) self.assertEqual(int(' 0X123 ', 0), 291) self.assertEqual(int(' 0B100 ', 0), 4) + with self.assertRaises(ValueError): + int('010', 0) # without base still base 10 self.assertEqual(int('0123'), 123) @@ -214,6 +230,26 @@ def test_basic(self): self.assertEqual(int('2br45qc', 35), 4294967297) self.assertEqual(int('1z141z5', 36), 4294967297) + def test_invalid_signs(self): + with self.assertRaises(ValueError): + int('+') + with self.assertRaises(ValueError): + int('-') + with self.assertRaises(ValueError): + int('- 1') + with self.assertRaises(ValueError): + int('+ 1') + with self.assertRaises(ValueError): + int(' + 1 ') + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_unicode(self): + self.assertEqual(int("१२३४५६७८९०1234567890"), 12345678901234567890) + self.assertEqual(int('١٢٣٤٥٦٧٨٩٠'), 1234567890) + self.assertEqual(int("१२३४५६७८९०1234567890", 0), 12345678901234567890) + self.assertEqual(int('١٢٣٤٥٦٧٨٩٠', 0), 1234567890) + def test_underscores(self): for lit in VALID_UNDERSCORE_LITERALS: if any(ch in lit for ch in '.eEjJ'): @@ -233,7 +269,7 @@ def test_underscores(self): self.assertRaises(ValueError, int, "1__00") self.assertRaises(ValueError, int, "100_") - # @support.cpython_only + @support.cpython_only def test_small_ints(self): # Bug #3236: Return small longs from PyLong_FromString self.assertIs(int('10'), 10) @@ -369,12 +405,14 @@ def __trunc__(self): class JustTrunc(base): def __trunc__(self): return 42 - self.assertEqual(int(JustTrunc()), 42) + with self.assertWarns(DeprecationWarning): + self.assertEqual(int(JustTrunc()), 42) class ExceptionalTrunc(base): def __trunc__(self): 1 / 0 - with self.assertRaises(ZeroDivisionError): + with self.assertRaises(ZeroDivisionError), \ + self.assertWarns(DeprecationWarning): int(ExceptionalTrunc()) for trunc_result_base in (object, Classic): @@ -385,7 +423,8 @@ def __index__(self): class TruncReturnsNonInt(base): def __trunc__(self): return Index() - self.assertEqual(int(TruncReturnsNonInt()), 42) + with self.assertWarns(DeprecationWarning): + self.assertEqual(int(TruncReturnsNonInt()), 42) class Intable(trunc_result_base): def __int__(self): @@ -394,7 +433,8 @@ def __int__(self): class TruncReturnsNonIndex(base): def __trunc__(self): return Intable() - self.assertEqual(int(TruncReturnsNonInt()), 42) + with self.assertWarns(DeprecationWarning): + self.assertEqual(int(TruncReturnsNonInt()), 42) class NonIntegral(trunc_result_base): def __trunc__(self): @@ -405,7 +445,8 @@ class TruncReturnsNonIntegral(base): def __trunc__(self): return NonIntegral() try: - int(TruncReturnsNonIntegral()) + with self.assertWarns(DeprecationWarning): + int(TruncReturnsNonIntegral()) except TypeError as e: self.assertEqual(str(e), "__trunc__ returned non-Integral" @@ -423,7 +464,8 @@ class TruncReturnsBadInt(base): def __trunc__(self): return BadInt() - with self.assertRaises(TypeError): + with self.assertRaises(TypeError), \ + 
self.assertWarns(DeprecationWarning): int(TruncReturnsBadInt()) def test_int_subclass_with_index(self): @@ -517,13 +559,16 @@ def __trunc__(self): self.assertIs(type(n), int) bad_int = TruncReturnsBadInt() - self.assertRaises(TypeError, int, bad_int) + with self.assertWarns(DeprecationWarning): + self.assertRaises(TypeError, int, bad_int) good_int = TruncReturnsIntSubclass() - n = int(good_int) + with self.assertWarns(DeprecationWarning): + n = int(good_int) self.assertEqual(n, 1) self.assertIs(type(n), int) - n = IntSubclass(good_int) + with self.assertWarns(DeprecationWarning): + n = IntSubclass(good_int) self.assertEqual(n, 1) self.assertIs(type(n), IntSubclass) @@ -568,5 +613,329 @@ def test_issue31619(self): self.assertEqual(int('1_2_3_4_5_6_7', 32), 1144132807) +class IntStrDigitLimitsTests(unittest.TestCase): + + int_class = int # Override this in subclasses to reuse the suite. + + def setUp(self): + super().setUp() + self._previous_limit = sys.get_int_max_str_digits() + sys.set_int_max_str_digits(2048) + + def tearDown(self): + sys.set_int_max_str_digits(self._previous_limit) + super().tearDown() + + def test_disabled_limit(self): + self.assertGreater(sys.get_int_max_str_digits(), 0) + self.assertLess(sys.get_int_max_str_digits(), 20_000) + with support.adjust_int_max_str_digits(0): + self.assertEqual(sys.get_int_max_str_digits(), 0) + i = self.int_class('1' * 20_000) + str(i) + self.assertGreater(sys.get_int_max_str_digits(), 0) + + def test_max_str_digits_edge_cases(self): + """Ignore the +/- sign and space padding.""" + int_class = self.int_class + maxdigits = sys.get_int_max_str_digits() + + int_class('1' * maxdigits) + int_class(' ' + '1' * maxdigits) + int_class('1' * maxdigits + ' ') + int_class('+' + '1' * maxdigits) + int_class('-' + '1' * maxdigits) + self.assertEqual(len(str(10 ** (maxdigits - 1))), maxdigits) + + def check(self, i, base=None): + with self.assertRaises(ValueError): + if base is None: + self.int_class(i) + else: + self.int_class(i, base) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_max_str_digits(self): + maxdigits = sys.get_int_max_str_digits() + + self.check('1' * (maxdigits + 1)) + self.check(' ' + '1' * (maxdigits + 1)) + self.check('1' * (maxdigits + 1) + ' ') + self.check('+' + '1' * (maxdigits + 1)) + self.check('-' + '1' * (maxdigits + 1)) + self.check('1' * (maxdigits + 1)) + + i = 10 ** maxdigits + with self.assertRaises(ValueError): + str(i) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_denial_of_service_prevented_int_to_str(self): + """Regression test: ensure we fail before performing O(N**2) work.""" + maxdigits = sys.get_int_max_str_digits() + assert maxdigits < 50_000, maxdigits # A test prerequisite. + + huge_int = int(f'0x{"c"*65_000}', base=16) # 78268 decimal digits. + digits = 78_268 + with ( + support.adjust_int_max_str_digits(digits), + support.CPUStopwatch() as sw_convert): + huge_decimal = str(huge_int) + self.assertEqual(len(huge_decimal), digits) + # Ensuring that we chose a slow enough conversion to measure. + # It takes 0.1 seconds on a Zen based cloud VM in an opt build. + # Some OSes have a low res 1/64s timer, skip if hard to measure. + if sw_convert.seconds < sw_convert.clock_info.resolution * 2: + raise unittest.SkipTest('"slow" conversion took only ' + f'{sw_convert.seconds} seconds.') + + # We test with the limit almost at the size needed to check performance. + # The performant limit check is slightly fuzzy, give it a some room. 
+ with support.adjust_int_max_str_digits(int(.995 * digits)): + with ( + self.assertRaises(ValueError) as err, + support.CPUStopwatch() as sw_fail_huge): + str(huge_int) + self.assertIn('conversion', str(err.exception)) + self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2) + + # Now we test that a conversion that would take 30x as long also fails + # in a similarly fast fashion. + extra_huge_int = int(f'0x{"c"*500_000}', base=16) # 602060 digits. + with ( + self.assertRaises(ValueError) as err, + support.CPUStopwatch() as sw_fail_extra_huge): + # If not limited, 8 seconds said Zen based cloud VM. + str(extra_huge_int) + self.assertIn('conversion', str(err.exception)) + self.assertLess(sw_fail_extra_huge.seconds, sw_convert.seconds/2) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_denial_of_service_prevented_str_to_int(self): + """Regression test: ensure we fail before performing O(N**2) work.""" + maxdigits = sys.get_int_max_str_digits() + assert maxdigits < 100_000, maxdigits # A test prerequisite. + + digits = 133700 + huge = '8'*digits + with ( + support.adjust_int_max_str_digits(digits), + support.CPUStopwatch() as sw_convert): + int(huge) + # Ensuring that we chose a slow enough conversion to measure. + # It takes 0.1 seconds on a Zen based cloud VM in an opt build. + # Some OSes have a low res 1/64s timer, skip if hard to measure. + if sw_convert.seconds < sw_convert.clock_info.resolution * 2: + raise unittest.SkipTest('"slow" conversion took only ' + f'{sw_convert.seconds} seconds.') + + with support.adjust_int_max_str_digits(digits - 1): + with ( + self.assertRaises(ValueError) as err, + support.CPUStopwatch() as sw_fail_huge): + int(huge) + self.assertIn('conversion', str(err.exception)) + self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2) + + # Now we test that a conversion that would take 30x as long also fails + # in a similarly fast fashion. + extra_huge = '7'*1_200_000 + with ( + self.assertRaises(ValueError) as err, + support.CPUStopwatch() as sw_fail_extra_huge): + # If not limited, 8 seconds in the Zen based cloud VM. 
+ int(extra_huge) + self.assertIn('conversion', str(err.exception)) + self.assertLessEqual(sw_fail_extra_huge.seconds, sw_convert.seconds/2) + + def test_power_of_two_bases_unlimited(self): + """The limit does not apply to power of 2 bases.""" + maxdigits = sys.get_int_max_str_digits() + + for base in (2, 4, 8, 16, 32): + with self.subTest(base=base): + self.int_class('1' * (maxdigits + 1), base) + assert maxdigits < 100_000 + self.int_class('1' * 100_000, base) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_underscores_ignored(self): + maxdigits = sys.get_int_max_str_digits() + + triples = maxdigits // 3 + s = '111' * triples + s_ = '1_11' * triples + self.int_class(s) # succeeds + self.int_class(s_) # succeeds + self.check(f'{s}111') + self.check(f'{s_}_111') + + def test_sign_not_counted(self): + int_class = self.int_class + max_digits = sys.get_int_max_str_digits() + s = '5' * max_digits + i = int_class(s) + pos_i = int_class(f'+{s}') + assert i == pos_i + neg_i = int_class(f'-{s}') + assert -pos_i == neg_i + str(pos_i) + str(neg_i) + + def _other_base_helper(self, base): + int_class = self.int_class + max_digits = sys.get_int_max_str_digits() + s = '2' * max_digits + i = int_class(s, base) + if base > 10: + with self.assertRaises(ValueError): + str(i) + elif base < 10: + str(i) + with self.assertRaises(ValueError) as err: + int_class(f'{s}1', base) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_int_from_other_bases(self): + base = 3 + with self.subTest(base=base): + self._other_base_helper(base) + base = 36 + with self.subTest(base=base): + self._other_base_helper(base) + + def test_int_max_str_digits_is_per_interpreter(self): + # Changing the limit in one interpreter does not change others. + code = """if 1: + # Subinterpreters maintain and enforce their own limit + import sys + sys.set_int_max_str_digits(2323) + try: + int('3'*3333) + except ValueError: + pass + else: + raise AssertionError('Expected a int max str digits ValueError.') + """ + with support.adjust_int_max_str_digits(4000): + before_value = sys.get_int_max_str_digits() + self.assertEqual(support.run_in_subinterp(code), 0, + 'subinterp code failure, check stderr.') + after_value = sys.get_int_max_str_digits() + self.assertEqual(before_value, after_value) + + +class IntSubclassStrDigitLimitsTests(IntStrDigitLimitsTests): + int_class = IntSubclass + + +class PyLongModuleTests(unittest.TestCase): + # Tests of the functions in _pylong.py. Those get used when the + # number of digits in the input values are large enough. 
+ + def setUp(self): + super().setUp() + self._previous_limit = sys.get_int_max_str_digits() + sys.set_int_max_str_digits(0) + + def tearDown(self): + sys.set_int_max_str_digits(self._previous_limit) + super().tearDown() + + def _test_pylong_int_to_decimal(self, n, suffix): + s = str(n) + self.assertEqual(s[-10:], suffix) + s2 = str(-n) + self.assertEqual(s2, '-' + s) + s3 = '%d' % n + self.assertEqual(s3, s) + s4 = b'%d' % n + self.assertEqual(s4, s.encode('ascii')) + + def test_pylong_int_to_decimal(self): + self._test_pylong_int_to_decimal((1 << 100_000), '9883109376') + self._test_pylong_int_to_decimal((1 << 100_000) - 1, '9883109375') + self._test_pylong_int_to_decimal(10**30_000, '0000000000') + self._test_pylong_int_to_decimal(10**30_000 - 1, '9999999999') + self._test_pylong_int_to_decimal(3**60_000, '9313200001') + + @support.requires_resource('cpu') + def test_pylong_int_to_decimal_2(self): + self._test_pylong_int_to_decimal(2**1_000_000, '2747109376') + self._test_pylong_int_to_decimal(10**300_000, '0000000000') + self._test_pylong_int_to_decimal(3**600_000, '3132000001') + + def test_pylong_int_divmod(self): + n = (1 << 100_000) + a, b = divmod(n*3 + 1, n) + assert a == 3 and b == 1 + + def test_pylong_str_to_int(self): + v1 = 1 << 100_000 + s = str(v1) + v2 = int(s) + assert v1 == v2 + v3 = int(' -' + s) + assert -v1 == v3 + v4 = int(' +' + s + ' ') + assert v1 == v4 + with self.assertRaises(ValueError) as err: + int(s + 'z') + with self.assertRaises(ValueError) as err: + int(s + '_') + with self.assertRaises(ValueError) as err: + int('_' + s) + + @support.cpython_only # tests implementation details of CPython. + @unittest.skipUnless(_pylong, "_pylong module required") + # @mock.patch.object(_pylong, "int_to_decimal_string") # NOTE(RUSTPYTHON): See comment at top of file + def test_pylong_misbehavior_error_path_to_str( + self, mock_int_to_str): + with support.adjust_int_max_str_digits(20_000): + big_value = int('7'*19_999) + mock_int_to_str.return_value = None # not a str + with self.assertRaises(TypeError) as ctx: + str(big_value) + self.assertIn('_pylong.int_to_decimal_string did not', + str(ctx.exception)) + mock_int_to_str.side_effect = RuntimeError("testABC") + with self.assertRaises(RuntimeError): + str(big_value) + + @support.cpython_only # tests implementation details of CPython. 
+ @unittest.skipUnless(_pylong, "_pylong module required") + # @mock.patch.object(_pylong, "int_from_string") # NOTE(RUSTPYTHON): See comment at top of file + def test_pylong_misbehavior_error_path_from_str( + self, mock_int_from_str): + big_value = '7'*19_999 + with support.adjust_int_max_str_digits(20_000): + mock_int_from_str.return_value = b'not an int' + with self.assertRaises(TypeError) as ctx: + int(big_value) + self.assertIn('_pylong.int_from_string did not', + str(ctx.exception)) + + mock_int_from_str.side_effect = RuntimeError("test123") + with self.assertRaises(RuntimeError): + int(big_value) + + def test_pylong_roundtrip(self): + from random import randrange, getrandbits + bits = 5000 + while bits <= 1_000_000: + bits += randrange(-100, 101) # break bitlength patterns + hibit = 1 << (bits - 1) + n = hibit | getrandbits(bits - 1) + assert n.bit_length() == bits + sn = str(n) + self.assertFalse(sn.startswith('0')) + self.assertEqual(n, int(sn)) + bits <<= 1 + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_json/test_recursion.py b/Lib/test/test_json/test_recursion.py index 0ebc83034a..59f6f2c4b1 100644 --- a/Lib/test/test_json/test_recursion.py +++ b/Lib/test/test_json/test_recursion.py @@ -1,6 +1,8 @@ from test import support from test.test_json import PyTest, CTest +import unittest # XXX: RUSTPYTHON; importing to be able to skip tests + class JSONTestObject: pass @@ -65,7 +67,8 @@ def default(self, o): else: self.fail("didn't raise ValueError on default recursion") - + # TODO: RUSTPYTHON + @unittest.skip("TODO: RUSTPYTHON; crashes") def test_highly_nested_objects_decoding(self): # test that loading highly-nested objects doesn't segfault when C # accelerations are used. See #12017 diff --git a/Lib/test/test_pickle.py b/Lib/test/test_pickle.py index e01ddcf0a8..a9177ada39 100644 --- a/Lib/test/test_pickle.py +++ b/Lib/test/test_pickle.py @@ -16,6 +16,7 @@ from test.pickletester import AbstractHookTests from test.pickletester import AbstractUnpickleTests +from test.pickletester import AbstractPicklingErrorTests from test.pickletester import AbstractPickleTests from test.pickletester import AbstractPickleModuleTests from test.pickletester import AbstractPersistentPicklerTests @@ -40,15 +41,15 @@ class PyPickleTests(AbstractPickleModuleTests, unittest.TestCase): Pickler = pickle._Pickler Unpickler = pickle._Unpickler - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_dump_load_oob_buffers(self): # TODO: RUSTPYTHON, remove when this passes - super().test_dump_load_oob_buffers() # TODO: RUSTPYTHON, remove when this passes + def test_dump_load_oob_buffers(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_dump_load_oob_buffers() - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_dumps_loads_oob_buffers(self): # TODO: RUSTPYTHON, remove when this passes - super().test_dumps_loads_oob_buffers() # TODO: RUSTPYTHON, remove when this passes + def test_dumps_loads_oob_buffers(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_dumps_loads_oob_buffers() class PyUnpicklerTests(AbstractUnpickleTests, unittest.TestCase): @@ -59,71 +60,68 @@ class PyUnpicklerTests(AbstractUnpickleTests, unittest.TestCase): AttributeError, ValueError, struct.error, IndexError, ImportError) - # TODO: RUSTPYTHON, AssertionError: ValueError not raised - 
@unittest.expectedFailure - def test_badly_escaped_string(self): # TODO: RUSTPYTHON, remove when this passes - super().test_badly_escaped_string() # TODO: RUSTPYTHON, remove when this passes - - # TODO: RUSTPYTHON, AssertionError - @unittest.expectedFailure - def test_correctly_quoted_string(self): # TODO: RUSTPYTHON, remove when this passes - super().test_correctly_quoted_string() # TODO: RUSTPYTHON, remove when this passes - - # TODO: RUSTPYTHON, AssertionError - @unittest.expectedFailure - def test_load_python2_str_as_bytes(self): # TODO: RUSTPYTHON, remove when this passes - super().test_load_python2_str_as_bytes() # TODO: RUSTPYTHON, remove when this passes - def loads(self, buf, **kwds): f = io.BytesIO(buf) u = self.unpickler(f, **kwds) return u.load() + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_badly_escaped_string(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_badly_escaped_string() -class PyPicklerTests(AbstractPickleTests, unittest.TestCase): + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_correctly_quoted_string(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_correctly_quoted_string() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_load_python2_str_as_bytes(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_load_python2_str_as_bytes() + + +class PyPicklingErrorTests(AbstractPicklingErrorTests, unittest.TestCase): pickler = pickle._Pickler - unpickler = pickle._Unpickler - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' - @unittest.expectedFailure - def test_buffer_callback_error(self): # TODO: RUSTPYTHON, remove when this passes - super().test_buffer_callback_error() # TODO: RUSTPYTHON, remove when this passes + def dumps(self, arg, proto=None, **kwargs): + f = io.BytesIO() + p = self.pickler(f, proto, **kwargs) + p.dump(arg) + f.seek(0) + return bytes(f.read()) - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_buffers_error(self): # TODO: RUSTPYTHON, remove when this passes - super().test_buffers_error() # TODO: RUSTPYTHON, remove when this passes + def test_picklebuffer_error(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_picklebuffer_error() - # TODO: RUSTPYTHON, AssertionError + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_complex_newobj_ex(self): # TODO: RUSTPYTHON, remove when this passes - super().test_complex_newobj_ex() # TODO: RUSTPYTHON, remove when this passes + def test_reduce_ex_None(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_reduce_ex_None() - # TODO: RUSTPYTHON, TypeError: cannot pickle 'method' object + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_in_band_buffers(self): # TODO: RUSTPYTHON, remove when this passes - super().test_in_band_buffers() # TODO: RUSTPYTHON, remove when this passes + def test_bad_getattr(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_bad_getattr() - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_oob_buffers(self): # TODO: RUSTPYTHON, remove when this passes - super().test_oob_buffers() # TODO: RUSTPYTHON, remove when this passes + def test_buffer_callback_error(self): # TODO(RUSTPYTHON): Remove this test when it passes + return 
super().test_buffer_callback_error() - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_oob_buffers_writable_to_readonly(self): # TODO: RUSTPYTHON, remove when this passes - super().test_oob_buffers_writable_to_readonly() # TODO: RUSTPYTHON, remove when this passes + def test_non_continuous_buffer(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_non_continuous_buffer() - # TODO: RUSTPYTHON, TypeError: Expected type 'bytes', not 'bytearray' - @unittest.expectedFailure - def test_optional_frames(self): # TODO: RUSTPYTHON, remove when this passes - super().test_optional_frames() # TODO: RUSTPYTHON, remove when this passes - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' - @unittest.expectedFailure - def test_picklebuffer_error(self): # TODO: RUSTPYTHON, remove when this passes - super().test_picklebuffer_error() # TODO: RUSTPYTHON, remove when this passes +class PyPicklerTests(AbstractPickleTests, unittest.TestCase): + + pickler = pickle._Pickler + unpickler = pickle._Unpickler def dumps(self, arg, proto=None, **kwargs): f = io.BytesIO() @@ -137,6 +135,121 @@ def loads(self, buf, **kwds): u = self.unpickler(f, **kwds) return u.load() + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_c_methods(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_c_methods() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_complex_newobj_ex(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_complex_newobj_ex() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_py_methods(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_py_methods() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_buffers_error(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_buffers_error() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_builtin_functions(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_builtin_functions() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_bytearray_memoization(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_bytearray_memoization() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_bytes_memoization(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_bytes_memoization() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_in_band_buffers(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_in_band_buffers() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_oob_buffers(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_oob_buffers() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_oob_buffers_writable_to_readonly(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_oob_buffers_writable_to_readonly() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_optional_frames(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_optional_frames() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_pickle_setstate_None(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_pickle_setstate_None() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def 
test_recursive_nested_names2(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_recursive_nested_names2() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_buffers_error(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_buffers_error() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_builtin_functions(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_builtin_functions() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_bytearray_memoization(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_bytearray_memoization() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_bytes_memoization(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_bytes_memoization() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_in_band_buffers(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_in_band_buffers() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_oob_buffers(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_oob_buffers() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_oob_buffers_writable_to_readonly(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_oob_buffers_writable_to_readonly() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_optional_frames(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_optional_frames() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_pickle_setstate_None(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_pickle_setstate_None() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_recursive_nested_names2(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_recursive_nested_names2() + class InMemoryPickleTests(AbstractPickleTests, AbstractUnpickleTests, BigmemPickleTests, unittest.TestCase): @@ -146,69 +259,95 @@ class InMemoryPickleTests(AbstractPickleTests, AbstractUnpickleTests, AttributeError, ValueError, struct.error, IndexError, ImportError) - # TODO: RUSTPYTHON, AssertionError: ValueError not raised + def dumps(self, arg, protocol=None, **kwargs): + return pickle.dumps(arg, protocol, **kwargs) + + def loads(self, buf, **kwds): + return pickle.loads(buf, **kwds) + + test_framed_write_sizes_with_delayed_writer = None + test_find_class = None + test_custom_find_class = None + + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_badly_escaped_string(self): # TODO: RUSTPYTHON, remove when this passes - super().test_badly_escaped_string() # TODO: RUSTPYTHON, remove when this passes + def test_c_methods(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_c_methods() - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_buffer_callback_error(self): # TODO: RUSTPYTHON, remove when this passes - super().test_buffer_callback_error() # TODO: RUSTPYTHON, remove when this passes + def test_complex_newobj_ex(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_complex_newobj_ex() - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_buffers_error(self): # TODO: RUSTPYTHON, remove when this passes - 
super().test_buffers_error() # TODO: RUSTPYTHON, remove when this passes + def test_badly_escaped_string(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_badly_escaped_string() - # TODO: RUSTPYTHON, AssertionError + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_complex_newobj_ex(self): # TODO: RUSTPYTHON, remove when this passes - super().test_complex_newobj_ex() # TODO: RUSTPYTHON, remove when this passes + def test_correctly_quoted_string(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_correctly_quoted_string() - # TODO: RUSTPYTHON, AssertionError + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_correctly_quoted_string(self): # TODO: RUSTPYTHON, remove when this passes - super().test_correctly_quoted_string() # TODO: RUSTPYTHON, remove when this passes + def test_load_python2_str_as_bytes(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_load_python2_str_as_bytes() - # TODO: RUSTPYTHON, TypeError: cannot pickle 'method' object + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_in_band_buffers(self): # TODO: RUSTPYTHON, remove when this passes - super().test_in_band_buffers() # TODO: RUSTPYTHON, remove when this passes + def test_py_methods(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_py_methods() - # TODO: RUSTPYTHON, AssertionError + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_load_python2_str_as_bytes(self): # TODO: RUSTPYTHON, remove when this passes - super().test_load_python2_str_as_bytes() # TODO: RUSTPYTHON, remove when this passes + def test_recursive_nested_names2(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_recursive_nested_names2() - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_oob_buffers(self): # TODO: RUSTPYTHON, remove when this passes - super().test_oob_buffers() # TODO: RUSTPYTHON, remove when this passes + def test_oob_buffers_writable_to_readonly(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_oob_buffers_writable_to_readonly() - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_oob_buffers_writable_to_readonly(self): # TODO: RUSTPYTHON, remove when this passes - super().test_oob_buffers_writable_to_readonly() # TODO: RUSTPYTHON, remove when this passes + def test_buffers_error(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_buffers_error() - # TODO: RUSTPYTHON, TypeError: Expected type 'bytes', not 'bytearray' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_optional_frames(self): # TODO: RUSTPYTHON, remove when this passes - super().test_optional_frames() # TODO: RUSTPYTHON, remove when this passes + def test_builtin_functions(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_builtin_functions() - # TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer' + # TODO: RUSTPYTHON @unittest.expectedFailure - def test_picklebuffer_error(self): # TODO: RUSTPYTHON, remove when this passes - super().test_picklebuffer_error() # TODO: RUSTPYTHON, remove when this passes + def test_bytearray_memoization(self): # TODO(RUSTPYTHON): Remove this test when it passes + return super().test_bytearray_memoization() - def dumps(self, arg, protocol=None, **kwargs): - return 
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_bytes_memoization(self): # TODO(RUSTPYTHON): Remove this test when it passes
+        return super().test_bytes_memoization()

-    def loads(self, buf, **kwds):
-        return pickle.loads(buf, **kwds)
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_in_band_buffers(self): # TODO(RUSTPYTHON): Remove this test when it passes
+        return super().test_in_band_buffers()

-    test_framed_write_sizes_with_delayed_writer = None
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_oob_buffers(self): # TODO(RUSTPYTHON): Remove this test when it passes
+        return super().test_oob_buffers()

+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_optional_frames(self): # TODO(RUSTPYTHON): Remove this test when it passes
+        return super().test_optional_frames()
+
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_pickle_setstate_None(self): # TODO(RUSTPYTHON): Remove this test when it passes
+        return super().test_pickle_setstate_None()

 class PersistentPicklerUnpicklerMixin(object):
@@ -242,6 +381,7 @@ class PyIdPersPicklerTests(AbstractIdentityPersistentPicklerTests,
     pickler = pickle._Pickler
     unpickler = pickle._Unpickler
+    persistent_load_error = pickle.UnpicklingError

     @support.cpython_only
     def test_pickler_reference_cycle(self):
@@ -296,7 +436,6 @@ class DispatchTable:
         support.gc_collect()
         self.assertIsNone(table_ref())

-    @support.cpython_only
     def test_unpickler_reference_cycle(self):
         def check(Unpickler):
@@ -326,12 +465,128 @@ def persistent_load(pid):
                     return pid
         check(PersUnpickler)

+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_pickler_super(self):
+        class PersPickler(self.pickler):
+            def persistent_id(subself, obj):
+                called.append(obj)
+                self.assertIsNone(super().persistent_id(obj))
+                return obj
+
+        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+            f = io.BytesIO()
+            pickler = PersPickler(f, proto)
+            called = []
+            pickler.dump('abc')
+            self.assertEqual(called, ['abc'])
+            self.assertEqual(self.loads(f.getvalue()), 'abc')
+
+    def test_unpickler_super(self):
+        class PersUnpickler(self.unpickler):
+            def persistent_load(subself, pid):
+                called.append(pid)
+                with self.assertRaises(self.persistent_load_error):
+                    super().persistent_load(pid)
+                return pid
+
+        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+            unpickler = PersUnpickler(io.BytesIO(self.dumps('abc', proto)))
+            called = []
+            self.assertEqual(unpickler.load(), 'abc')
+            self.assertEqual(called, ['abc'])
+
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_pickler_instance_attribute(self):
+        def persistent_id(obj):
+            called.append(obj)
+            return obj
+
+        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+            f = io.BytesIO()
+            pickler = self.pickler(f, proto)
+            called = []
+            old_persistent_id = pickler.persistent_id
+            pickler.persistent_id = persistent_id
+            self.assertEqual(pickler.persistent_id, persistent_id)
+            pickler.dump('abc')
+            self.assertEqual(called, ['abc'])
+            self.assertEqual(self.loads(f.getvalue()), 'abc')
+            del pickler.persistent_id
+            self.assertEqual(pickler.persistent_id, old_persistent_id)
+
+    def test_unpickler_instance_attribute(self):
+        def persistent_load(pid):
+            called.append(pid)
+            return pid
+
+        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+            unpickler = self.unpickler(io.BytesIO(self.dumps('abc', proto)))
+            called = []
+            old_persistent_load = unpickler.persistent_load
+            unpickler.persistent_load = persistent_load
+            self.assertEqual(unpickler.persistent_load, persistent_load)
+            self.assertEqual(unpickler.load(), 'abc')
+            self.assertEqual(called, ['abc'])
+            del unpickler.persistent_load
+            self.assertEqual(unpickler.persistent_load, old_persistent_load)
+
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_pickler_super_instance_attribute(self):
+        class PersPickler(self.pickler):
+            def persistent_id(subself, obj):
+                raise AssertionError('should never be called')
+            def _persistent_id(subself, obj):
+                called.append(obj)
+                self.assertIsNone(super().persistent_id(obj))
+                return obj
+
+        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+            f = io.BytesIO()
+            pickler = PersPickler(f, proto)
+            called = []
+            old_persistent_id = pickler.persistent_id
+            pickler.persistent_id = pickler._persistent_id
+            self.assertEqual(pickler.persistent_id, pickler._persistent_id)
+            pickler.dump('abc')
+            self.assertEqual(called, ['abc'])
+            self.assertEqual(self.loads(f.getvalue()), 'abc')
+            del pickler.persistent_id
+            self.assertEqual(pickler.persistent_id, old_persistent_id)
+
+    def test_unpickler_super_instance_attribute(self):
+        class PersUnpickler(self.unpickler):
+            def persistent_load(subself, pid):
+                raise AssertionError('should never be called')
+            def _persistent_load(subself, pid):
+                called.append(pid)
+                with self.assertRaises(self.persistent_load_error):
+                    super().persistent_load(pid)
+                return pid
+
+        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+            unpickler = PersUnpickler(io.BytesIO(self.dumps('abc', proto)))
+            called = []
+            old_persistent_load = unpickler.persistent_load
+            unpickler.persistent_load = unpickler._persistent_load
+            self.assertEqual(unpickler.persistent_load, unpickler._persistent_load)
+            self.assertEqual(unpickler.load(), 'abc')
+            self.assertEqual(called, ['abc'])
+            del unpickler.persistent_load
+            self.assertEqual(unpickler.persistent_load, old_persistent_load)

 class PyPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests, unittest.TestCase):
     pickler_class = pickle._Pickler
     unpickler_class = pickle._Unpickler

+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_pickle_invalid_reducer_override(self): # TODO(RUSTPYTHON): Remove this test when it passes
+        return super().test_pickle_invalid_reducer_override()
+
 class PyDispatchTableTests(AbstractDispatchTableTests, unittest.TestCase):
@@ -340,6 +595,11 @@ class PyDispatchTableTests(AbstractDispatchTableTests, unittest.TestCase):
     def get_dispatch_table(self):
         return pickle.dispatch_table.copy()

+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_dispatch_table_None_item(self): # TODO(RUSTPYTHON): Remove this test when it passes
+        return super().test_dispatch_table_None_item()
+
 class PyChainDispatchTableTests(AbstractDispatchTableTests, unittest.TestCase):
@@ -348,6 +608,11 @@ class PyChainDispatchTableTests(AbstractDispatchTableTests, unittest.TestCase):
     def get_dispatch_table(self):
         return collections.ChainMap({}, pickle.dispatch_table)

+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_dispatch_table_None_item(self): # TODO(RUSTPYTHON): Remove this test when it passes
+        return super().test_dispatch_table_None_item()
+
 class PyPicklerHookTests(AbstractHookTests, unittest.TestCase):
     class CustomPyPicklerClass(pickle._Pickler,
@@ -365,6 +630,9 @@ class CUnpicklerTests(PyUnpicklerTests):
         bad_stack_errors = (pickle.UnpicklingError,)
         truncated_errors = (pickle.UnpicklingError,)

+    class CPicklingErrorTests(PyPicklingErrorTests):
+        pickler = _pickle.Pickler
+
     class CPicklerTests(PyPicklerTests):
         pickler = _pickle.Pickler
         unpickler = _pickle.Unpickler
@@ -376,6 +644,7 @@ class CPersPicklerTests(PyPersPicklerTests):
     class CIdPersPicklerTests(PyIdPersPicklerTests):
         pickler = _pickle.Pickler
         unpickler = _pickle.Unpickler
+        persistent_load_error = _pickle.UnpicklingError

     class CDumpPickle_LoadPickle(PyPicklerTests):
         pickler = _pickle.Pickler
@@ -499,7 +768,9 @@ def recurse(deep):
         check_unpickler(recurse(1), 32, 20)
         check_unpickler(recurse(20), 32, 20)
         check_unpickler(recurse(50), 64, 60)
-        check_unpickler(recurse(100), 128, 140)
+        if not (support.is_wasi and support.Py_DEBUG):
+            # stack depth too shallow in pydebug WASI.
+            check_unpickler(recurse(100), 128, 140)

         u = unpickler(io.BytesIO(pickle.dumps('a', 0)),
                       encoding='ASCII', errors='strict')
@@ -642,6 +913,8 @@ def test_reverse_name_mapping(self):
             module, name = mapping(module, name)
             self.assertEqual((module, name), (module3, name3))

+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
     def test_exceptions(self):
         self.assertEqual(mapping('exceptions', 'StandardError'),
                          ('builtins', 'Exception'))
@@ -659,13 +932,12 @@ def test_exceptions(self):
             if exc in (BlockingIOError,
                        ResourceWarning,
                        StopAsyncIteration,
+                       PythonFinalizationError,
                        RecursionError,
                        EncodingWarning,
                        BaseExceptionGroup,
-                       ExceptionGroup):
-                continue
-            # TODO: RUSTPYTHON: fix name mapping for _IncompleteInputError
-            if exc is _IncompleteInputError:
+                       ExceptionGroup,
+                       _IncompleteInputError):
                 continue
             if exc is not OSError and issubclass(exc, OSError):
                 self.assertEqual(reverse_mapping('builtins', name),
@@ -692,9 +964,8 @@ def test_multiprocessing_exceptions(self):
             self.assertEqual(mapping('multiprocessing', name),
                              ('multiprocessing.context', name))

-
 def load_tests(loader, tests, pattern):
-    tests.addTest(doctest.DocTestSuite())
+    tests.addTest(doctest.DocTestSuite(pickle))
     return tests

diff --git a/Lib/test/test_picklebuffer.py b/Lib/test/test_picklebuffer.py
new file mode 100644
index 0000000000..a14f6a86b4
--- /dev/null
+++ b/Lib/test/test_picklebuffer.py
@@ -0,0 +1,179 @@
+"""Unit tests for the PickleBuffer object.
+
+Pickling tests themselves are in pickletester.py.
+""" + +import gc +# TODO: RUSTPYTHON; Implment PickleBuffer +try: + from pickle import PickleBuffer +except ImportError: + PickleBuffer = None +import weakref +import unittest + +from test.support import import_helper + + +class B(bytes): + pass + + +class PickleBufferTest(unittest.TestCase): + + def check_memoryview(self, pb, equiv): + with memoryview(pb) as m: + with memoryview(equiv) as expected: + self.assertEqual(m.nbytes, expected.nbytes) + self.assertEqual(m.readonly, expected.readonly) + self.assertEqual(m.itemsize, expected.itemsize) + self.assertEqual(m.shape, expected.shape) + self.assertEqual(m.strides, expected.strides) + self.assertEqual(m.c_contiguous, expected.c_contiguous) + self.assertEqual(m.f_contiguous, expected.f_contiguous) + self.assertEqual(m.format, expected.format) + self.assertEqual(m.tobytes(), expected.tobytes()) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_constructor_failure(self): + with self.assertRaises(TypeError): + PickleBuffer() + with self.assertRaises(TypeError): + PickleBuffer("foo") + # Released memoryview fails taking a buffer + m = memoryview(b"foo") + m.release() + with self.assertRaises(ValueError): + PickleBuffer(m) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_basics(self): + pb = PickleBuffer(b"foo") + self.assertEqual(b"foo", bytes(pb)) + with memoryview(pb) as m: + self.assertTrue(m.readonly) + + pb = PickleBuffer(bytearray(b"foo")) + self.assertEqual(b"foo", bytes(pb)) + with memoryview(pb) as m: + self.assertFalse(m.readonly) + m[0] = 48 + self.assertEqual(b"0oo", bytes(pb)) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_release(self): + pb = PickleBuffer(b"foo") + pb.release() + with self.assertRaises(ValueError) as raises: + memoryview(pb) + self.assertIn("operation forbidden on released PickleBuffer object", + str(raises.exception)) + # Idempotency + pb.release() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_cycle(self): + b = B(b"foo") + pb = PickleBuffer(b) + b.cycle = pb + wpb = weakref.ref(pb) + del b, pb + gc.collect() + self.assertIsNone(wpb()) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_ndarray_2d(self): + # C-contiguous + ndarray = import_helper.import_module("_testbuffer").ndarray + arr = ndarray(list(range(12)), shape=(4, 3), format=', opcode b'\\xff' unknown"): + next(it) + + +class DisTests(unittest.TestCase): + maxDiff = None + + def check_dis(self, data, expected, **kwargs): + out = io.StringIO() + pickletools.dis(data, out=out, **kwargs) + self.assertEqual(out.getvalue(), expected) + + def check_dis_error(self, data, expected, expected_error, **kwargs): + out = io.StringIO() + with self.assertRaisesRegex(ValueError, expected_error): + pickletools.dis(data, out=out, **kwargs) + self.assertEqual(out.getvalue(), expected) + + def test_mark(self): + self.check_dis(b'(N(tl.', '''\ + 0: ( MARK + 1: N NONE + 2: ( MARK + 3: t TUPLE (MARK at 2) + 4: l LIST (MARK at 0) + 5: . STOP +highest protocol among opcodes = 0 +''') + + def test_indentlevel(self): + self.check_dis(b'(N(tl.', '''\ + 0: ( MARK + 1: N NONE + 2: ( MARK + 3: t TUPLE (MARK at 2) + 4: l LIST (MARK at 0) + 5: . STOP +highest protocol among opcodes = 0 +''', indentlevel=2) + + def test_mark_without_pos(self): + self.check_dis(SimpleReader(b'(N(tl.'), '''\ +( MARK +N NONE +( MARK +t TUPLE (MARK at unknown opcode offset) +l LIST (MARK at unknown opcode offset) +. 
+highest protocol among opcodes = 0
+''')
+
+    def test_no_mark(self):
+        self.check_dis_error(b'Nt.', '''\
+    0: N NONE
+    1: t TUPLE no MARK exists on stack
+''', 'no MARK exists on stack')
+
+    def test_put(self):
+        self.check_dis(b'Np0\nq\x01r\x02\x00\x00\x00\x94.', '''\
+    0: N NONE
+    1: p PUT 0
+    4: q BINPUT 1
+    6: r LONG_BINPUT 2
+   11: \\x94 MEMOIZE (as 3)
+   12: . STOP
+highest protocol among opcodes = 4
+''')
+
+    def test_put_redefined(self):
+        self.check_dis_error(b'Np1\np1\n.', '''\
+    0: N NONE
+    1: p PUT 1
+    4: p PUT 1
+''', 'memo key 1 already defined')
+        self.check_dis_error(b'Np1\nq\x01.', '''\
+    0: N NONE
+    1: p PUT 1
+    4: q BINPUT 1
+''', 'memo key 1 already defined')
+        self.check_dis_error(b'Np1\nr\x01\x00\x00\x00.', '''\
+    0: N NONE
+    1: p PUT 1
+    4: r LONG_BINPUT 1
+''', 'memo key 1 already defined')
+        self.check_dis_error(b'Np1\n\x94.', '''\
+    0: N NONE
+    1: p PUT 1
+    4: \\x94 MEMOIZE (as 1)
+''', 'memo key None already defined')
+
+    def test_put_empty_stack(self):
+        self.check_dis_error(b'p0\n', '''\
+    0: p PUT 0
+''', "stack is empty -- can't store into memo")
+
+    def test_put_markobject(self):
+        self.check_dis_error(b'(p0\n', '''\
+    0: ( MARK
+    1: p PUT 0
+''', "can't store markobject in the memo")
+
+    def test_get(self):
+        self.check_dis(b'(Np1\ng1\nh\x01j\x01\x00\x00\x00t.', '''\
+    0: ( MARK
+    1: N NONE
+    2: p PUT 1
+    5: g GET 1
+    8: h BINGET 1
+   10: j LONG_BINGET 1
+   15: t TUPLE (MARK at 0)
+   16: . STOP
+highest protocol among opcodes = 1
+''')
+
+    def test_get_without_put(self):
+        self.check_dis_error(b'g1\n.', '''\
+    0: g GET 1
+''', 'memo key 1 has never been stored into')
+        self.check_dis_error(b'h\x01.', '''\
+    0: h BINGET 1
+''', 'memo key 1 has never been stored into')
+        self.check_dis_error(b'j\x01\x00\x00\x00.', '''\
+    0: j LONG_BINGET 1
+''', 'memo key 1 has never been stored into')
+
+    def test_memo(self):
+        memo = {}
+        self.check_dis(b'Np1\n.', '''\
+    0: N NONE
+    1: p PUT 1
+    4: . STOP
+highest protocol among opcodes = 0
+''', memo=memo)
+        self.check_dis(b'g1\n.', '''\
+    0: g GET 1
+    3: . STOP
+highest protocol among opcodes = 0
+''', memo=memo)
+
+    def test_mark_pop(self):
+        self.check_dis(b'(N00N.', '''\
+    0: ( MARK
+    1: N NONE
+    2: 0 POP
+    3: 0 POP (MARK at 0)
+    4: N NONE
+    5: . STOP
+highest protocol among opcodes = 0
+''')
+
+    def test_too_small_stack(self):
+        self.check_dis_error(b'a', '''\
+    0: a APPEND
+''', 'tries to pop 2 items from stack with only 0 items')
+        self.check_dis_error(b']a', '''\
+    0: ] EMPTY_LIST
+    1: a APPEND
+''', 'tries to pop 2 items from stack with only 1 items')
+
+    def test_no_stop(self):
+        self.check_dis_error(b'N', '''\
+    0: N NONE
+''', 'pickle exhausted before seeing STOP')
+
+    def test_truncated_data(self):
+        self.check_dis_error(b'NI123', '''\
+    0: N NONE
+''', 'no newline found when trying to read stringnl')
+        self.check_dis_error(b'NJ\x12\x34', '''\
+    0: N NONE
+''', 'not enough data in stream to read int4')
+
+    def test_unknown_opcode(self):
+        self.check_dis_error(b'N\xff', '''\
+    0: N NONE
+''', r"at position 1, opcode b'\\xff' unknown")
+
+    def test_stop_not_empty_stack(self):
+        self.check_dis_error(b']N.', '''\
+    0: ] EMPTY_LIST
+    1: N NONE
+    2: . STOP
+highest protocol among opcodes = 1
+''', r'stack not empty after STOP: \[list\]')
+
+    def test_annotate(self):
+        self.check_dis(b'(Nt.', '''\
+    0: ( MARK Push markobject onto the stack.
+    1: N NONE Push None on the stack.
+    2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
+    3: . STOP Stop the unpickling machine.
+highest protocol among opcodes = 0
+''', annotate=1)
+        self.check_dis(b'(Nt.', '''\
+    0: ( MARK Push markobject onto the stack.
+    1: N NONE Push None on the stack.
+    2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
+    3: . STOP Stop the unpickling machine.
+highest protocol among opcodes = 0
+''', annotate=20)
+        self.check_dis(b'(((((((ttttttt.', '''\
+    0: ( MARK Push markobject onto the stack.
+    1: ( MARK Push markobject onto the stack.
+    2: ( MARK Push markobject onto the stack.
+    3: ( MARK Push markobject onto the stack.
+    4: ( MARK Push markobject onto the stack.
+    5: ( MARK Push markobject onto the stack.
+    6: ( MARK Push markobject onto the stack.
+    7: t TUPLE (MARK at 6) Build a tuple out of the topmost stack slice, after markobject.
+    8: t TUPLE (MARK at 5) Build a tuple out of the topmost stack slice, after markobject.
+    9: t TUPLE (MARK at 4) Build a tuple out of the topmost stack slice, after markobject.
+   10: t TUPLE (MARK at 3) Build a tuple out of the topmost stack slice, after markobject.
+   11: t TUPLE (MARK at 2) Build a tuple out of the topmost stack slice, after markobject.
+   12: t TUPLE (MARK at 1) Build a tuple out of the topmost stack slice, after markobject.
+   13: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
+   14: . STOP Stop the unpickling machine.
+highest protocol among opcodes = 0
+''', annotate=20)
+
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_string(self):
+        self.check_dis(b"S'abc'\n.", '''\
+    0: S STRING 'abc'
+    7: . STOP
+highest protocol among opcodes = 0
+''')
+        self.check_dis(b'S"abc"\n.', '''\
+    0: S STRING 'abc'
+    7: . STOP
+highest protocol among opcodes = 0
+''')
+        self.check_dis(b"S'\xc3\xb5'\n.", '''\
+    0: S STRING '\\xc3\\xb5'
+    6: . STOP
+highest protocol among opcodes = 0
+''')
+
+    def test_string_without_quotes(self):
+        self.check_dis_error(b"Sabc'\n.", '',
+                             'no string quotes around b"abc\'"')
+        self.check_dis_error(b'Sabc"\n.', '',
+                             "no string quotes around b'abc\"'")
+        self.check_dis_error(b"S'abc\n.", '',
+                             '''strinq quote b"'" not found at both ends of b"'abc"''')
+        self.check_dis_error(b'S"abc\n.', '',
+                             r"""strinq quote b'"' not found at both ends of b'"abc'""")
+        self.check_dis_error(b"S'abc\"\n.", '',
+                             r"""strinq quote b"'" not found at both ends of b'\\'abc"'""")
+        self.check_dis_error(b"S\"abc'\n.", '',
+                             r"""strinq quote b'"' not found at both ends of b'"abc\\''""")
+
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_binstring(self):
+        self.check_dis(b"T\x03\x00\x00\x00abc.", '''\
+    0: T BINSTRING 'abc'
+    8: . STOP
+highest protocol among opcodes = 1
+''')
+        self.check_dis(b"T\x02\x00\x00\x00\xc3\xb5.", '''\
+    0: T BINSTRING '\\xc3\\xb5'
+    7: . STOP
+highest protocol among opcodes = 1
+''')
+
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_short_binstring(self):
+        self.check_dis(b"U\x03abc.", '''\
+    0: U SHORT_BINSTRING 'abc'
+    5: . STOP
+highest protocol among opcodes = 1
+''')
+        self.check_dis(b"U\x02\xc3\xb5.", '''\
+    0: U SHORT_BINSTRING '\\xc3\\xb5'
+    4: . STOP
+highest protocol among opcodes = 1
+''')
+
+    # TODO: RUSTPYTHON
+    @unittest.expectedFailure
+    def test_global(self):
+        self.check_dis(b"cmodule\nname\n.", '''\
+    0: c GLOBAL 'module name'
+   13: . STOP
+highest protocol among opcodes = 0
+''')
+        self.check_dis(b"cm\xc3\xb6dule\nn\xc3\xa4me\n.", '''\
+    0: c GLOBAL 'm\xf6dule n\xe4me'
+   15: . STOP
+highest protocol among opcodes = 0
+''')
+
+    def test_inst(self):
+        self.check_dis(b"(imodule\nname\n.", '''\
+    0: ( MARK
+    1: i INST 'module name' (MARK at 0)
+   14: . STOP
+highest protocol among opcodes = 0
+''')
+
+    def test_persid(self):
+        self.check_dis(b"Pabc\n.", '''\
+    0: P PERSID 'abc'
+    5: . STOP
+highest protocol among opcodes = 0
+''')

 class MiscTestCase(unittest.TestCase):
     def test__all__(self):
diff --git a/Lib/test/test_userlist.py b/Lib/test/test_userlist.py
index 1ed67dac80..312702c8e3 100644
--- a/Lib/test/test_userlist.py
+++ b/Lib/test/test_userlist.py
@@ -3,6 +3,8 @@
 from collections import UserList
 from test import list_tests
 import unittest
+from test import support
+

 class UserListTest(list_tests.CommonTest):
     type2test = UserList
@@ -65,5 +67,11 @@ def test_userlist_copy(self):
         self.assertEqual(u, v)
         self.assertEqual(type(u), type(v))

+    # Decorate existing test with recursion limit, because
+    # the test is for C structure, but `UserList` is a Python structure.
+    test_repr_deep = support.infinite_recursion(25)(
+        list_tests.CommonTest.test_repr_deep,
+    )
+

 if __name__ == "__main__":
     unittest.main()
diff --git a/vm/src/stdlib/time.rs b/vm/src/stdlib/time.rs
index 0de8648c12..85e8f4569c 100644
--- a/vm/src/stdlib/time.rs
+++ b/vm/src/stdlib/time.rs
@@ -121,7 +121,6 @@ mod decl {
         Ok(())
     }

-    #[cfg(not(target_os = "wasi"))]
     #[pyfunction]
     fn time_ns(vm: &VirtualMachine) -> PyResult<u64> {
         Ok(duration_since_system_now(vm)?.as_nanos() as u64)