Challenge du matin :
with DoNotRun():
print("Is never printed")
Oui je sais que if False: existe, mais j’aimerai vraiment un with pas un if ici.
Challenge du matin :
with DoNotRun():
print("Is never printed")
Oui je sais que if False: existe, mais j’aimerai vraiment un with pas un if ici.
Le mieux que j’ai réussi à faire, et je ne suis pas fier :
class DoNotRunException(Exception):
pass
class DoNotRun:
def __enter__(self):
raise DoNotRunException
def __exit__(self, exc_type, exc_value, traceback):
pass
with suppress(DoNotRunException), DoNotRun():
print("This won't run.")
C’est proche, mais c’est moche.
C’est marrant, j’ai eu cette discussion à plusieurs occasions lors de la PyCon.
Pour le moment ma meilleure solution consiste à combiner deux gestionnaire de contexte :
class Catch:
def __enter__(self):
pass
def __exit__(self, exc_type, exc_value, exc_tb):
print('Catch.__exit__')
return True
class Raise:
def __enter__(self):
raise ValueError
def __exit__(self, exc_type, exc_value, exc_tb):
print('Raise.__exit__')
with Catch(), Raise():
print('Hello')
Mais je cherche encore à réduire en un seul.
J’ai cette solution de contournement qui n’est pas encore 100% satisfaisante :
class FakeIterable:
def __iter__(self):
raise ValueError
class Catch:
def __enter__(self):
return FakeIterable()
def __exit__(self, exc_type, exc_value, exc_tb):
if exc_type is ValueError:
return True
with Catch() as (a,):
print(a)
Oh oh oh amusant ton contournement !
En me documentant sur le sujet je suis aussi tombé sur contextlib.nested : 28.7. contextlib — Utilities for with-statement contexts — Documentation Python 2.7.18
Et on voit que même là c’est un cas qui n’était pas géré, d’ailleurs une des raisons de sa dépréciation puis suppression : cpython/Lib/contextlib.py at 2.7 · python/cpython · GitHub
Tu t’ennuies ? ![]()
Et pour revenir sur le sujet, mon but à la fin c’est de définir quelque chose comme :
def func(block):
print('Before block')
block(3, 5)
print('After block')
with BlockCall(func) as (x, y):
print(x + y)
qui afficherait
Before block
8
After block
Pourquoi faire ça plutôt que d’utiliser des fonctions/lambdas ? Pour voir si c’est possible !
Si certain·e·s connaissent, c’est pour reproduire le mécanisme des blocs en Ruby.
Et donc pour ça j’ai besoin d’un bloc qui ne s’exécute pas (et dont j’enregistrerais l’AST pour reconstruire une fonction derrière).
On voit que là, comme j’ai un as (a, b) je peux utiliser mon astuce sur le __iter__ mais ça ne fonctionnerait pas si le bloc n’a pas besoin d’arguments.
Avec 6 charactères de plus, ça marche :
class BlockCall:
def __init__(self, func):
self.func = func
def __enter__(self):
capture = []
def block(*args):
capture.extend(args)
self.iterator = self.func(block)
self.iterator.send(None)
return capture
def __exit__(self, exc_type, exc_value, traceback):
try:
self.iterator.send(None)
except StopIteration:
pass
else:
raise RunTimeError(f"{self.func} should have a single yield")
def func(block):
print("Before block")
yield block(3, 5)
print("After block")
with BlockCall(func) as (x, y):
print(x + y)
Ah oui en effet, c’est moi qui n’ai pas assez spécifié le problème.
L’idée c’est qu’un bloc soit réutilisable (donc appelable zéro ou plusieurs fois), de façon à avoir quelque chose comme :
>>> def range_each(start, stop, block):
... for i in range(start, stop):
... block(i)
...
... with Block.call(range_each, 0, 10) as (i,):
... print(i, '!')
...
... with Block.call(range_each, 0, 0) as (i,):
... print(i, '?')
...
0 !
1 !
2 !
3 !
4 !
5 !
6 !
7 !
8 !
9 !
Actuellement j’arrive donc à m’en sortir avec le code bien sale suivant (sur lequel il faut encore que je travaille, notamment pour être compatible avec l’interpréteur interactif) :
import ast
import inspect
import itertools
import textwrap
from functools import partial
class _Stop(Exception):
pass
class _BlockArguments:
def __iter__(self):
raise _Stop
class _Block:
def __init__(self, exit_callable=None):
self.exit_callable = exit_callable
def __enter__(self):
caller_frame_info = inspect.stack()[1]
for module_frame_info in inspect.stack()[1:]:
if module_frame_info.function == '<module>':
break
source = inspect.getsource(module_frame_info.frame)
source_lines = source.splitlines()
first_line = caller_frame_info.lineno
block_lines = []
(block_def,) = ast.parse(source_lines[first_line - 1].strip() + ' pass').body
assert isinstance(block_def, ast.With)
(block_item,) = block_def.items
block_args = block_item.optional_vars
assert isinstance(block_args, ast.Tuple)
self.block_arg_names = [name.id for name in block_args.elts]
for i in itertools.count(first_line):
try:
line = source_lines[i]
except IndexError:
break
code = textwrap.dedent('\n'.join(block_lines + [line]))
if code.startswith(' '):
break
block_lines.append(line)
block_source = textwrap.dedent('\n'.join(block_lines))
self._code = compile(block_source, '<block>', 'exec')
return _BlockArguments()
def __exit__(self, exc_type, exc_val, exc_tb):
ret = exc_type and issubclass(exc_type, _Stop)
if ret and self.exit_callable is not None:
self.exit_callable(self)
return ret
def __call__(self, *args, **kwargs):
loc = locals() | kwargs | dict(zip(self.block_arg_names, args))
exec(self._code, locals=loc)
class Block(_Block):
def __init__(self):
super().__init__()
@staticmethod
def call(func, /, *args, **kwargs):
return _Block(partial(func, *args, **kwargs))
def range_each(start, stop, block):
for i in range(start, stop):
block(i)
with Block.call(range_each, 0, 10) as (i,):
print(i, '!')
with Block.call(range_each, 0, 0) as (i,):
print(i, '?')
with (block1 := Block()) as ():
print('block1 - foo')
print('block1 - bar')
def get_block():
with (block := Block()) as ():
print('block2 - foo')
print('block2 - bar')
return block
block2 = get_block()
print('tests')
block2()
block1()
block2()
Solution non-standard et qui ne fonctionne que si vous avez de la chance ![]()
import math
import signal
EPS = math.nextafter(0, 1)
class _Stop(Exception):
pass
class Context:
@staticmethod
def signal_handler(sig, frame):
if frame.f_code.co_name == '__enter__':
signal.setitimer(signal.ITIMER_REAL, EPS)
else:
raise _Stop
def __enter__(self):
signal.signal(signal.SIGALRM, self.signal_handler)
signal.setitimer(signal.ITIMER_REAL, EPS)
def __exit__(self, exc_type, exc_value, exc_tb):
if exc_type is _Stop:
return True
print('Before')
with Context():
print('Inside')
print('After')
Je suis un grand fan :)))
Allez une autre solution pour la route qui a l’air de plutôt bien fonctionner (mais il ne faut pas vouloir utiliser de débogueur en même temps)
class Context:
def __enter__(self):
import io
import pdb
stdin = io.StringIO('r\nn\nq')
stdout = io.StringIO()
pdb.Pdb(stdin=stdin, stdout=stdout).set_trace()
def __exit__(self, exc_type, exc_value, exc_tb):
import bdb
if exc_type is bdb.BdbQuit:
return True
print('Before')
with Context():
print('Inside')
print('After')
À creuser du côté de pdb pour voir s’il y a des choses plus bas-niveau qui peuvent être appelées pour faire du pdb sans être pdb (peut-être du côté de sys.settrace) ![]()
Bien vu @entwanne, ça marche avec sys.settrace :
import inspect
import sys
class _Stop(Exception): ...
class Context:
def __init__(self):
self.frame = None
def tracer(self, frame, event, arg):
raise _Stop
def __enter__(self):
self.frame = inspect.currentframe().f_back
self.frame.f_trace = self.tracer
self.frame.f_trace_lines = True
sys.settrace(self.tracer)
def __exit__(self, exc_type, exc_value, exc_tb):
sys.settrace(None)
self.frame.f_trace = None
return exc_type is _Stop
print("Before")
with Context():
print("Inside")
print("After")
(Attention j’ai changé l’ordre des lignes dans le __enter__, c’est un poil sensible, une des versions ne marchait pas.)
Bien vu !
import sys
class Context:
class _Stop(Exception):
pass
def __enter__(self):
self.old_profile = sys.getprofile()
self.first_call = True
sys.setprofile(self.profile)
def __exit__(self, exc_type, exc_value, exc_tb):
sys.setprofile(self.old_profile)
return exc_type is self._Stop
def profile(self, frame, event, arg):
if self.first_call:
assert frame.f_code.co_name == '__enter__'
self.first_call = False
else:
raise self._Stop
print('Before')
with Context():
print('Inside')
print('After')
Mais je me suis rendu compte que ça ne marche que si le contenu du bloc fait des appels.
avec un with Context(): pass ou with Context(): a = 5 ça ne fonctionne pas.
Il y a le même souci avec la solution pdb d’ailleurs.
Tu connaîtrais pas un hook qui pourrait être appelé à chaque instruction du bytecode ?
Si :
frame = inspect.currentframe().f_back
frame.f_trace = tracer
frame.f_trace_opcodes = True
sys.settrace(tracer)
Et donc, BOOM, je peux faire mon gestionnaire de contexte qui fork pour exécuter son contexte :
import inspect
import os
import sys
import traceback
from contextlib import contextmanager, suppress
from selectors import EVENT_READ, PollSelector
class _InTheParent(Exception):
pass
class Forking:
def __init__(self) -> None:
self.frame = None
self.stdout = b""
self.stderr = b""
def tracer(self, frame, event, arg):
raise _InTheParent
def __enter__(self):
self.rout, wout = os.pipe()
self.rerr, werr = os.pipe()
self.pid = os.fork()
if self.pid:
os.close(wout)
os.close(werr)
self.frame = inspect.currentframe().f_back
self.frame.f_trace = self.tracer
self.frame.f_trace_lines = True
sys.settrace(self.tracer)
return self
else:
os.dup2(wout, 1)
os.dup2(werr, 2)
os.close(wout)
os.close(werr)
return self
def __exit__(self, exc_type, exc_value, tb):
if self.pid == 0:
if exc_type is not None:
traceback.print_exception(None, value=exc_value, tb=tb)
sys.stdout.flush()
sys.stderr.flush()
os._exit(0)
if exc_type is not _InTheParent:
return False
with PollSelector() as selector:
selector.register(self.rout, EVENT_READ)
selector.register(self.rerr, EVENT_READ)
while selector.get_map():
ready = selector.select(1)
for key, events in ready:
data = os.read(key.fd, 32768)
if not data:
selector.unregister(key.fd)
os.close(key.fd)
if key.fd == self.rout:
self.stdout += data
if key.fd == self.rerr:
self.stderr += data
_pid, _exit_status = os.wait()
return True
print("Parent PID", os.getpid())
with Forking() as child:
print("Child PID", os.getpid())
print("At the end", os.getpid())
print("Child printed on stdout:", child.stdout.decode())
print("Child printed on stderr:", child.stderr.decode())
Haha, c’est amusant :
x = 1
with Forking():
x = 2
assert x == 1
L’affectation x = 2 se fait dans le processus fils.
Je dirais même plus :
with Forking():
os._exit()
print("I survived!")
@Melcore ^
Ah oui pardon, je n’avais pas testé assez loin.
Il n’y a donc que la with Context(): pass qui échoue, mais d’un côté qui voudrait zapper le contenu d’un gestionnaire de contexte vide ? ![]()
Ah oui pass ça le fait échouer. ... aussi. 1 aussi. 1 == 1 ça marche.
En jouant un peu avec les frames, j’arrive à ça : à mon avis c’est pas fail-proof…
import sys
import inspect
class _Stop(Exception):
pass
class ignore:
def __enter__(self):
self._frame = inspect.currentframe().f_back
def tracer(frame, event, arg):
if frame is self._frame and event == "opcode":
raise _Stop
self._frame.f_trace = tracer
self._frame.f_trace_opcodes = True
sys.settrace(tracer)
def __exit__(self, exc_type, exc, tb):
return exc_type is _Stop
x = 1
print(f"Before ignore(): {x}")
with ignore():
x += 1
print("Error!")
print(f"After ignore(): {x}")
Au début ça marchait pas avec les one-liner with ignore(): print("...") car il attendait un event de type line et il avait un opcode, du coup en ajoutant f_trace_opcodes = True je n’ai que des opcode en event mais je pense pas que ça marche avec tout…