Gestionnaire de contexte qui ne s'exécute pas

Challenge du matin :

with DoNotRun():
    print("Is never printed")

Oui je sais que if False: existe, mais j’aimerai vraiment un with pas un if ici.

Le mieux que j’ai réussi à faire, et je ne suis pas fier :

class DoNotRunException(Exception):
    pass


class DoNotRun:
    def __enter__(self):
        raise DoNotRunException

    def __exit__(self, exc_type, exc_value, traceback):
        pass


with suppress(DoNotRunException), DoNotRun():
    print("This won't run.")

C’est proche, mais c’est moche.

C’est marrant, j’ai eu cette discussion à plusieurs occasions lors de la PyCon.

Pour le moment ma meilleure solution consiste à combiner deux gestionnaire de contexte :

class Catch:
    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, exc_tb):
        print('Catch.__exit__')
        return True


class Raise:
    def __enter__(self):
        raise ValueError

    def __exit__(self, exc_type, exc_value, exc_tb):
        print('Raise.__exit__')


with Catch(), Raise():
    print('Hello')

Mais je cherche encore à réduire en un seul.
J’ai cette solution de contournement qui n’est pas encore 100% satisfaisante :

class FakeIterable:
    def __iter__(self):
        raise ValueError


class Catch:
    def __enter__(self):
        return FakeIterable()

    def __exit__(self, exc_type, exc_value, exc_tb):
        if exc_type is ValueError:
            return True


with Catch() as (a,):
    print(a)

Oh oh oh amusant ton contournement !

En me documentant sur le sujet je suis aussi tombé sur contextlib.nested : 28.7. contextlib — Utilities for with-statement contexts — Documentation Python 2.7.18

Et on voit que même là c’est un cas qui n’était pas géré, d’ailleurs une des raisons de sa dépréciation puis suppression : cpython/Lib/contextlib.py at 2.7 · python/cpython · GitHub

1 « J'aime »

Tu t’ennuies ? :smiley:

Et pour revenir sur le sujet, mon but à la fin c’est de définir quelque chose comme :

def func(block):
    print('Before block')
    block(3, 5)
    print('After block')

with BlockCall(func) as (x, y):
    print(x + y)

qui afficherait

Before block
8
After block

Pourquoi faire ça plutôt que d’utiliser des fonctions/lambdas ? Pour voir si c’est possible !
Si certain·e·s connaissent, c’est pour reproduire le mécanisme des blocs en Ruby.

Et donc pour ça j’ai besoin d’un bloc qui ne s’exécute pas (et dont j’enregistrerais l’AST pour reconstruire une fonction derrière).

On voit que là, comme j’ai un as (a, b) je peux utiliser mon astuce sur le __iter__ mais ça ne fonctionnerait pas si le bloc n’a pas besoin d’arguments.

Avec 6 charactères de plus, ça marche :

class BlockCall:
    def __init__(self, func):
        self.func = func

    def __enter__(self):
        capture = []

        def block(*args):
            capture.extend(args)

        self.iterator = self.func(block)
        self.iterator.send(None)

        return capture

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.iterator.send(None)
        except StopIteration:
            pass
        else:
            raise RunTimeError(f"{self.func} should have a single yield")


def func(block):
    print("Before block")
    yield block(3, 5)
    print("After block")


with BlockCall(func) as (x, y):
    print(x + y)

Ah oui en effet, c’est moi qui n’ai pas assez spécifié le problème.
L’idée c’est qu’un bloc soit réutilisable (donc appelable zéro ou plusieurs fois), de façon à avoir quelque chose comme :

>>> def range_each(start, stop, block):
...     for i in range(start, stop):
...         block(i)
...
... with Block.call(range_each, 0, 10) as (i,):
...     print(i, '!')
...
... with Block.call(range_each, 0, 0) as (i,):
...     print(i, '?')
...
0 !
1 !
2 !
3 !
4 !
5 !
6 !
7 !
8 !
9 !

Actuellement j’arrive donc à m’en sortir avec le code bien sale suivant (sur lequel il faut encore que je travaille, notamment pour être compatible avec l’interpréteur interactif) :

À vos risques et périls
import ast
import inspect
import itertools
import textwrap
from functools import partial


class _Stop(Exception):
    pass


class _BlockArguments:
    def __iter__(self):
        raise _Stop


class _Block:
    def __init__(self, exit_callable=None):
        self.exit_callable = exit_callable

    def __enter__(self):
        caller_frame_info = inspect.stack()[1]

        for module_frame_info in inspect.stack()[1:]:
            if module_frame_info.function == '<module>':
                break

        source = inspect.getsource(module_frame_info.frame)
        source_lines = source.splitlines()

        first_line = caller_frame_info.lineno
        block_lines = []

        (block_def,) = ast.parse(source_lines[first_line - 1].strip() + ' pass').body
        assert isinstance(block_def, ast.With)
        (block_item,) = block_def.items
        block_args = block_item.optional_vars
        assert isinstance(block_args, ast.Tuple)
        self.block_arg_names = [name.id for name in block_args.elts]

        for i in itertools.count(first_line):
            try:
                line = source_lines[i]
            except IndexError:
                break
            code = textwrap.dedent('\n'.join(block_lines + [line]))
            if code.startswith(' '):
                break
            block_lines.append(line)

        block_source = textwrap.dedent('\n'.join(block_lines))

        self._code = compile(block_source, '<block>', 'exec')

        return _BlockArguments()

    def __exit__(self, exc_type, exc_val, exc_tb):
        ret = exc_type and issubclass(exc_type, _Stop)
        if ret and self.exit_callable is not None:
            self.exit_callable(self)
        return ret

    def __call__(self, *args, **kwargs):
        loc = locals() | kwargs | dict(zip(self.block_arg_names, args))
        exec(self._code, locals=loc)


class Block(_Block):
    def __init__(self):
        super().__init__()

    @staticmethod
    def call(func, /, *args, **kwargs):
        return _Block(partial(func, *args, **kwargs))


def range_each(start, stop, block):
    for i in range(start, stop):
        block(i)


with Block.call(range_each, 0, 10) as (i,):
    print(i, '!')


with Block.call(range_each, 0, 0) as (i,):
    print(i, '?')


with (block1 := Block()) as ():
    print('block1 - foo')
    print('block1 - bar')


def get_block():
    with (block := Block()) as ():
        print('block2 - foo')
        print('block2 - bar')
    return block


block2 = get_block()

print('tests')
block2()
block1()
block2()

Solution non-standard et qui ne fonctionne que si vous avez de la chance :smiley:

import math
import signal

EPS = math.nextafter(0, 1)


class _Stop(Exception):
    pass


class Context:
    @staticmethod
    def signal_handler(sig, frame):
        if frame.f_code.co_name == '__enter__':
            signal.setitimer(signal.ITIMER_REAL, EPS)
        else:
            raise _Stop

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.signal_handler)
        signal.setitimer(signal.ITIMER_REAL, EPS)

    def __exit__(self, exc_type, exc_value, exc_tb):
        if exc_type is _Stop:
            return True


print('Before')

with Context():
    print('Inside')

print('After')

Je suis un grand fan :)))

Allez une autre solution pour la route qui a l’air de plutôt bien fonctionner (mais il ne faut pas vouloir utiliser de débogueur en même temps)

class Context:
    def __enter__(self):
        import io
        import pdb
        stdin = io.StringIO('r\nn\nq')
        stdout = io.StringIO()
        pdb.Pdb(stdin=stdin, stdout=stdout).set_trace()

    def __exit__(self, exc_type, exc_value, exc_tb):
        import bdb
        if exc_type is bdb.BdbQuit:
            return True


print('Before')

with Context():
    print('Inside')

print('After')

À creuser du côté de pdb pour voir s’il y a des choses plus bas-niveau qui peuvent être appelées pour faire du pdb sans être pdb (peut-être du côté de sys.settrace) :grin:

1 « J'aime »

Bien vu @entwanne, ça marche avec sys.settrace :

import inspect
import sys


class _Stop(Exception): ...


class Context:
    def __init__(self):
        self.frame = None

    def tracer(self, frame, event, arg):
        raise _Stop

    def __enter__(self):
        self.frame = inspect.currentframe().f_back
        self.frame.f_trace = self.tracer
        self.frame.f_trace_lines = True
        sys.settrace(self.tracer)

    def __exit__(self, exc_type, exc_value, exc_tb):
        sys.settrace(None)
        self.frame.f_trace = None
        return exc_type is _Stop


print("Before")

with Context():
    print("Inside")

print("After")

(Attention j’ai changé l’ordre des lignes dans le __enter__, c’est un poil sensible, une des versions ne marchait pas.)

Bien vu !

J'arrivais plus ou moins à la même chose avec `sys.setprofile`
import sys


class Context:
    class _Stop(Exception):
        pass

    def __enter__(self):
        self.old_profile = sys.getprofile()
        self.first_call = True
        sys.setprofile(self.profile)

    def __exit__(self, exc_type, exc_value, exc_tb):
        sys.setprofile(self.old_profile)
        return exc_type is self._Stop

    def profile(self, frame, event, arg):
        if self.first_call:
            assert frame.f_code.co_name == '__enter__'
            self.first_call = False
        else:
            raise self._Stop

print('Before')

with Context():
    print('Inside')

print('After')

Mais je me suis rendu compte que ça ne marche que si le contenu du bloc fait des appels.
avec un with Context(): pass ou with Context(): a = 5 ça ne fonctionne pas.
Il y a le même souci avec la solution pdb d’ailleurs.

Tu connaîtrais pas un hook qui pourrait être appelé à chaque instruction du bytecode ?

Si :

frame = inspect.currentframe().f_back
frame.f_trace = tracer
frame.f_trace_opcodes = True
sys.settrace(tracer)

Et donc, BOOM, je peux faire mon gestionnaire de contexte qui fork pour exécuter son contexte :

import inspect
import os
import sys
import traceback
from contextlib import contextmanager, suppress
from selectors import EVENT_READ, PollSelector


class _InTheParent(Exception):
    pass


class Forking:
    def __init__(self) -> None:
        self.frame = None
        self.stdout = b""
        self.stderr = b""

    def tracer(self, frame, event, arg):
        raise _InTheParent

    def __enter__(self):
        self.rout, wout = os.pipe()
        self.rerr, werr = os.pipe()
        self.pid = os.fork()
        if self.pid:
            os.close(wout)
            os.close(werr)
            self.frame = inspect.currentframe().f_back
            self.frame.f_trace = self.tracer
            self.frame.f_trace_lines = True
            sys.settrace(self.tracer)
            return self
        else:
            os.dup2(wout, 1)
            os.dup2(werr, 2)
            os.close(wout)
            os.close(werr)
            return self

    def __exit__(self, exc_type, exc_value, tb):
        if self.pid == 0:
            if exc_type is not None:
                traceback.print_exception(None, value=exc_value, tb=tb)
            sys.stdout.flush()
            sys.stderr.flush()
            os._exit(0)
        if exc_type is not _InTheParent:
            return False
        with PollSelector() as selector:
            selector.register(self.rout, EVENT_READ)
            selector.register(self.rerr, EVENT_READ)
            while selector.get_map():
                ready = selector.select(1)
                for key, events in ready:
                    data = os.read(key.fd, 32768)
                    if not data:
                        selector.unregister(key.fd)
                        os.close(key.fd)
                    if key.fd == self.rout:
                        self.stdout += data
                    if key.fd == self.rerr:
                        self.stderr += data
        _pid, _exit_status = os.wait()
        return True


print("Parent PID", os.getpid())
with Forking() as child:
    print("Child PID", os.getpid())
print("At the end", os.getpid())


print("Child printed on stdout:", child.stdout.decode())
print("Child printed on stderr:", child.stderr.decode())

Haha, c’est amusant :

x = 1
with Forking():
    x = 2
assert x == 1

L’affectation x = 2 se fait dans le processus fils.

Je dirais même plus :

with Forking():
    os._exit()
print("I survived!")

@Melcore ^

1 « J'aime »

Ah oui pardon, je n’avais pas testé assez loin.
Il n’y a donc que la with Context(): pass qui échoue, mais d’un côté qui voudrait zapper le contenu d’un gestionnaire de contexte vide ? :joy:

Ah oui pass ça le fait échouer. ... aussi. 1 aussi. 1 == 1 ça marche.

En jouant un peu avec les frames, j’arrive à ça : à mon avis c’est pas fail-proof…

import sys
import inspect

class _Stop(Exception):
    pass

class ignore:
    def __enter__(self):
        self._frame = inspect.currentframe().f_back

        def tracer(frame, event, arg):
            if frame is self._frame and event == "opcode":
                raise _Stop

        self._frame.f_trace = tracer
        self._frame.f_trace_opcodes = True
        sys.settrace(tracer)

    def __exit__(self, exc_type, exc, tb):
        return exc_type is _Stop

x = 1
print(f"Before ignore(): {x}")
with ignore():
    x += 1
    print("Error!")
print(f"After ignore(): {x}")

Au début ça marchait pas avec les one-liner with ignore(): print("...") car il attendait un event de type line et il avait un opcode, du coup en ajoutant f_trace_opcodes = True je n’ai que des opcode en event mais je pense pas que ça marche avec tout…