PHP 8.5.2
Preview: test_iosim.py Size: 9.58 KB
//lib/python3/dist-packages/twisted/test/test_iosim.py

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.test.iosim}.
"""
from __future__ import annotations

from typing import Literal

from zope.interface import implementer

from twisted.internet.interfaces import IPushProducer
from twisted.internet.protocol import Protocol
from twisted.internet.task import Clock
from twisted.test.iosim import FakeTransport, connect, connectedServerAndClient
from twisted.trial.unittest import TestCase


class FakeTransportTests(TestCase):
    """
    Unit tests covering the behavior of L{FakeTransport}.
    """

    def test_connectionSerial(self) -> None:
        """
        Two separately constructed L{FakeTransport} instances each expose an
        integer C{serial} attribute, and the two values differ.
        """
        serverSide = FakeTransport(object(), True)
        clientSide = FakeTransport(object(), False)

        # Both serials must be integers before comparing them to each other.
        for transport in (serverSide, clientSide):
            self.assertIsInstance(transport.serial, int)

        self.assertNotEqual(serverSide.serial, clientSide.serial)

    def test_writeSequence(self) -> None:
        """
        Data passed to L{FakeTransport.writeSequence} is appended, in order,
        after data previously passed to L{FakeTransport.write}.
        """
        transport = FakeTransport(object(), False)

        transport.write(b"a")
        transport.writeSequence([b"b", b"c", b"d"])

        written = b"".join(transport.stream)
        self.assertEqual(written, b"abcd")

    def test_writeAfterClose(self) -> None:
        """
        Calling L{FakeTransport.write} after C{loseConnection} does not raise,
        and the late data does not show up in the transport's stream.
        """
        transport = FakeTransport(object(), False)

        transport.write(b"before")
        transport.loseConnection()
        # This write happens after the close and must leave no trace.
        transport.write(b"after")

        written = b"".join(transport.stream)
        self.assertEqual(written, b"before")


@implementer(IPushProducer)
class StrictPushProducer:
    """
    A do-nothing L{IPushProducer} that tracks whether it is running, paused
    or stopped, and rejects state transition calls that are not valid from
    its current state.

    @ivar _state: One of C{"running"}, C{"paused"} or C{"stopped"}.
    """

    # Every producer starts out running.
    _state = "running"

    def stopProducing(self) -> None:
        """
        Move to the stopped state.

        @raise ValueError: If the producer is already stopped.
        """
        alreadyStopped = self._state == "stopped"
        if alreadyStopped:
            raise ValueError("Cannot stop already-stopped IPushProducer")
        self._state = "stopped"

    def pauseProducing(self) -> None:
        """
        Move from the running state to the paused state.

        @raise ValueError: If the producer is not currently running.
        """
        if self._state == "running":
            self._state = "paused"
            return
        raise ValueError(f"Cannot pause {self._state} IPushProducer")

    def resumeProducing(self) -> None:
        """
        Move from the paused state back to the running state.

        @raise ValueError: If the producer is not currently paused.
        """
        if self._state == "paused":
            self._state = "running"
            return
        raise ValueError(f"Cannot resume {self._state} IPushProducer")


class StrictPushProducerTests(TestCase):
    """
    Tests for the state transitions enforced by L{StrictPushProducer}.
    """

    def _initial(self) -> StrictPushProducer:
        """
        Build a producer on which no state transition method has been called.

        @return: A freshly constructed L{StrictPushProducer}.
        """
        untouched = StrictPushProducer()
        return untouched

    def _stopped(self) -> StrictPushProducer:
        """
        Build a producer and stop it.

        @return: A new L{StrictPushProducer} in the stopped state.
        """
        stoppedProducer = StrictPushProducer()
        stoppedProducer.stopProducing()
        return stoppedProducer

    def _paused(self) -> StrictPushProducer:
        """
        Build a producer and pause it.

        @return: A new L{StrictPushProducer} in the paused state.
        """
        pausedProducer = StrictPushProducer()
        pausedProducer.pauseProducing()
        return pausedProducer

    def _resumed(self) -> StrictPushProducer:
        """
        Build a producer, pause it, and then resume it.

        @return: A new L{StrictPushProducer} that has gone through one
            pause/resume cycle.
        """
        resumedProducer = StrictPushProducer()
        resumedProducer.pauseProducing()
        resumedProducer.resumeProducing()
        return resumedProducer

    def assertStopped(self, producer: StrictPushProducer) -> None:
        """
        Fail the test unless C{producer} reports the stopped state.

        @param producer: The producer to inspect.
        @type producer: L{StrictPushProducer}
        """
        currentState = producer._state
        self.assertEqual(currentState, "stopped")

    def assertPaused(self, producer: StrictPushProducer) -> None:
        """
        Fail the test unless C{producer} reports the paused state.

        @param producer: The producer to inspect.
        @type producer: L{StrictPushProducer}
        """
        currentState = producer._state
        self.assertEqual(currentState, "paused")

    def assertRunning(self, producer: StrictPushProducer) -> None:
        """
        Fail the test unless C{producer} reports the running state.

        @param producer: The producer to inspect.
        @type producer: L{StrictPushProducer}
        """
        currentState = producer._state
        self.assertEqual(currentState, "running")

    def test_stopThenStop(self) -> None:
        """
        Stopping an already-stopped L{StrictPushProducer} raises
        L{ValueError}.
        """
        stopAgain = self._stopped().stopProducing
        self.assertRaises(ValueError, stopAgain)

    def test_stopThenPause(self) -> None:
        """
        Pausing a stopped L{StrictPushProducer} raises L{ValueError}.
        """
        pauseAfterStop = self._stopped().pauseProducing
        self.assertRaises(ValueError, pauseAfterStop)

    def test_stopThenResume(self) -> None:
        """
        Resuming a stopped L{StrictPushProducer} raises L{ValueError}.
        """
        resumeAfterStop = self._stopped().resumeProducing
        self.assertRaises(ValueError, resumeAfterStop)

    def test_pauseThenStop(self) -> None:
        """
        Calling C{stopProducing} on a paused L{StrictPushProducer} leaves it
        stopped.
        """
        subject = self._paused()
        subject.stopProducing()
        self.assertStopped(subject)

    def test_pauseThenPause(self) -> None:
        """
        Pausing an already-paused L{StrictPushProducer} raises L{ValueError}.
        """
        subject = self._paused()
        pauseAgain = subject.pauseProducing
        self.assertRaises(ValueError, pauseAgain)

    def test_pauseThenResume(self) -> None:
        """
        Calling C{resumeProducing} on a paused L{StrictPushProducer} leaves
        it running.
        """
        subject = self._paused()
        subject.resumeProducing()
        self.assertRunning(subject)

    def test_resumeThenStop(self) -> None:
        """
        Calling C{stopProducing} on a resumed L{StrictPushProducer} leaves it
        stopped.
        """
        subject = self._resumed()
        subject.stopProducing()
        self.assertStopped(subject)

    def test_resumeThenPause(self) -> None:
        """
        Calling C{pauseProducing} on a resumed L{StrictPushProducer} leaves
        it paused.
        """
        subject = self._resumed()
        subject.pauseProducing()
        self.assertPaused(subject)

    def test_resumeThenResume(self) -> None:
        """
        Resuming an already-resumed L{StrictPushProducer} raises
        L{ValueError}.
        """
        subject = self._resumed()
        resumeAgain = subject.resumeProducing
        self.assertRaises(ValueError, resumeAgain)

    def test_stop(self) -> None:
        """
        Calling C{stopProducing} on a L{StrictPushProducer} in its initial
        state leaves it stopped.
        """
        subject = self._initial()
        subject.stopProducing()
        self.assertStopped(subject)

    def test_pause(self) -> None:
        """
        Calling C{pauseProducing} on a L{StrictPushProducer} in its initial
        state leaves it paused.
        """
        subject = self._initial()
        subject.pauseProducing()
        self.assertPaused(subject)

    def test_resume(self) -> None:
        """
        Calling C{resumeProducing} on a L{StrictPushProducer} in its initial
        state raises L{ValueError}.
        """
        subject = self._initial()
        resumeFromInitial = subject.resumeProducing
        self.assertRaises(ValueError, resumeFromInitial)


class IOPumpTests(TestCase):
    """
    Tests for the pump object returned by L{connect} and
    L{connectedServerAndClient} (an L{IOPump}).
    """

    def _testStreamingProducer(self, mode: Literal["server", "client"]) -> None:
        """
        Wire a server and a client protocol/transport pair together with
        L{connect}, register a L{StrictPushProducer} as a streaming producer
        on one of the two transports, pump once, and check that the producer
        is still in the running state.

        Because L{StrictPushProducer} raises on invalid transitions, the
        pump making an invalid L{IPushProducer} call would surface as an
        error here.

        @param mode: C{"server"} to register the producer with the server
            transport, C{"client"} to register it with the client transport.
        """
        serverProto, clientProto = Protocol(), Protocol()
        serverTransport = FakeTransport(serverProto, isServer=True)
        clientTransport = FakeTransport(clientProto, isServer=False)

        pump = connect(
            serverProto,
            serverTransport,
            clientProto,
            clientTransport,
            greet=False,
        )

        # Select the transport under test by name.
        transportsByMode = {
            "server": serverTransport,
            "client": clientTransport,
        }
        target = transportsByMode[mode]

        producer = StrictPushProducer()
        target.registerProducer(producer, streaming=True)

        pump.pump()
        self.assertEqual("running", producer._state)

    def test_serverStreamingProducer(self) -> None:
        """
        Pumping leaves a streaming L{IPushProducer} registered with the
        server transport in the running state, without invalid calls such as
        C{resumeProducing} on a running producer.
        """
        self._testStreamingProducer(mode="server")

    def test_clientStreamingProducer(self) -> None:
        """
        Pumping leaves a streaming L{IPushProducer} registered with the
        client transport in the running state, without invalid calls such as
        C{resumeProducing} on a running producer.
        """
        self._testStreamingProducer(mode="client")

    def test_timeAdvances(self) -> None:
        """
        A zero-delay call scheduled on the L{Clock} passed to
        L{connectedServerAndClient} has not run before L{IOPump.pump} is
        called, and has run afterwards.
        """
        fired = []
        clock = Clock()
        _server, _client, pump = connectedServerAndClient(
            Protocol, Protocol, clock=clock
        )

        # The scheduled call records that it ran by appending a marker.
        clock.callLater(0, fired.append, True)
        self.assertFalse(fired)

        pump.pump()
        self.assertTrue(fired)

Directory Contents

Dirs: 1 × Files: 104

Name Size Perms Modified Actions
- drwxr-xr-x 2026-01-08 12:56:24
Edit Download
1.46 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
654 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
34.88 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
18.40 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.63 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.25 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
172 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
172 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
925 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
400 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
566 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
123 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
214 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
983 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
233 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
268 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
297 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
178 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
220 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
739 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
787 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
130 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.15 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.34 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
60 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
81 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
48 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
5.23 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.72 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.14 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
2.00 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.06 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.13 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.55 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.45 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
902 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
894 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download
5.06 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
3.66 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
25.47 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
108.04 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
33.34 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
17.75 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.43 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
20.84 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
143.37 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
13.01 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
6.90 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
9.60 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
4.46 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
7.28 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.89 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
4.27 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
126.96 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
2.65 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
3.19 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
6.56 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
45.38 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
9.58 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
13.31 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
15.54 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
36.86 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
17.79 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
13.99 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
2.12 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
24.69 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
17.84 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
6.38 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
73.64 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
12.23 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
14.73 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
26.02 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
32.28 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
4.32 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
86.29 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
7.16 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
3.63 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
7.40 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
23.89 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.65 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.91 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
24.90 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
5.53 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
17.09 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
22.73 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
113.84 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.97 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
12.43 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
5.10 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.67 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
47.73 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
64.26 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
12.73 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
6.47 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
3.26 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
21.64 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
12.90 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
1.69 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
72.29 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
6.13 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
26.79 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
13.26 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
22.76 KB lrw-r--r-- 2024-08-27 10:30:39
Edit Download
475 B lrw-r--r-- 2024-08-27 10:30:39
Edit Download

If ZipArchive is unavailable, a .tar will be created (no compression).