REDROOM
PHP 8.5.2
Path:
Logout
Edit File
Size: 6.47 KB
Close
//lib/python3/dist-packages/twisted/protocols/haproxy/_v2parser.py
Text
Base64
# -*- test-case-name: twisted.protocols.haproxy.test.test_v2parser -*- # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ IProxyParser implementation for version two of the PROXY protocol. """ import binascii import struct from typing import Callable, Tuple, Type, Union from zope.interface import implementer from constantly import ValueConstant, Values from typing import Literal from twisted.internet import address from twisted.python import compat from . import _info, _interfaces from ._exceptions import ( InvalidNetworkProtocol, InvalidProxyHeader, MissingAddressData, convertError, ) class NetFamily(Values): """ Values for the 'family' field. """ UNSPEC = ValueConstant(0x00) INET = ValueConstant(0x10) INET6 = ValueConstant(0x20) UNIX = ValueConstant(0x30) class NetProtocol(Values): """ Values for 'protocol' field. """ UNSPEC = ValueConstant(0) STREAM = ValueConstant(1) DGRAM = ValueConstant(2) _HIGH = 0b11110000 _LOW = 0b00001111 _LOCALCOMMAND = "LOCAL" _PROXYCOMMAND = "PROXY" @implementer(_interfaces.IProxyParser) class V2Parser: """ PROXY protocol version two header parser. Version two of the PROXY protocol is a binary format. """ PREFIX = b"\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A" VERSIONS = [32] COMMANDS = {0: _LOCALCOMMAND, 1: _PROXYCOMMAND} ADDRESSFORMATS = { # TCP4 17: "!4s4s2H", 18: "!4s4s2H", # TCP6 33: "!16s16s2H", 34: "!16s16s2H", # UNIX 49: "!108s108s", 50: "!108s108s", } def __init__(self) -> None: self.buffer = b"" def feed( self, data: bytes ) -> Union[Tuple[_info.ProxyInfo, bytes], Tuple[None, None]]: """ Consume a chunk of data and attempt to parse it. @param data: A bytestring. @type data: bytes @return: A two-tuple containing, in order, a L{_interfaces.IProxyInfo} and any bytes fed to the parser that followed the end of the header. Both of these values are None until a complete header is parsed. @raises InvalidProxyHeader: If the bytes fed to the parser create an invalid PROXY header. """ self.buffer += data if len(self.buffer) < 16: raise InvalidProxyHeader() size = struct.unpack("!H", self.buffer[14:16])[0] + 16 if len(self.buffer) < size: return (None, None) header, remaining = self.buffer[:size], self.buffer[size:] self.buffer = b"" info = self.parse(header) return (info, remaining) @staticmethod def _bytesToIPv4(bytestring: bytes) -> bytes: """ Convert packed 32-bit IPv4 address bytes into a dotted-quad ASCII bytes representation of that address. @param bytestring: 4 octets representing an IPv4 address. @type bytestring: L{bytes} @return: a dotted-quad notation IPv4 address. @rtype: L{bytes} """ return b".".join( ("%i" % (ord(b),)).encode("ascii") for b in compat.iterbytes(bytestring) ) @staticmethod def _bytesToIPv6(bytestring: bytes) -> bytes: """ Convert packed 128-bit IPv6 address bytes into a colon-separated ASCII bytes representation of that address. @param bytestring: 16 octets representing an IPv6 address. @type bytestring: L{bytes} @return: a dotted-quad notation IPv6 address. @rtype: L{bytes} """ hexString = binascii.b2a_hex(bytestring) return b":".join( (f"{int(hexString[b : b + 4], 16):x}").encode("ascii") for b in range(0, 32, 4) ) @classmethod def parse(cls, line: bytes) -> _info.ProxyInfo: """ Parse a bytestring as a full PROXY protocol header. @param line: A bytestring that represents a valid HAProxy PROXY protocol version 2 header. @type line: bytes @return: A L{_interfaces.IProxyInfo} containing the parsed data. @raises InvalidProxyHeader: If the bytestring does not represent a valid PROXY header. """ prefix = line[:12] addrInfo = None with convertError(IndexError, InvalidProxyHeader): # Use single value slices to ensure bytestring values are returned # instead of int in PY3. versionCommand = ord(line[12:13]) familyProto = ord(line[13:14]) if prefix != cls.PREFIX: raise InvalidProxyHeader() version, command = versionCommand & _HIGH, versionCommand & _LOW if version not in cls.VERSIONS or command not in cls.COMMANDS: raise InvalidProxyHeader() if cls.COMMANDS[command] == _LOCALCOMMAND: return _info.ProxyInfo(line, None, None) family, netproto = familyProto & _HIGH, familyProto & _LOW with convertError(ValueError, InvalidNetworkProtocol): family = NetFamily.lookupByValue(family) netproto = NetProtocol.lookupByValue(netproto) if family is NetFamily.UNSPEC or netproto is NetProtocol.UNSPEC: return _info.ProxyInfo(line, None, None) addressFormat = cls.ADDRESSFORMATS[familyProto] addrInfo = line[16 : 16 + struct.calcsize(addressFormat)] if family is NetFamily.UNIX: with convertError(struct.error, MissingAddressData): source, dest = struct.unpack(addressFormat, addrInfo) return _info.ProxyInfo( line, address.UNIXAddress(source.rstrip(b"\x00")), address.UNIXAddress(dest.rstrip(b"\x00")), ) addrType: Union[Literal["TCP"], Literal["UDP"]] = "TCP" if netproto is NetProtocol.DGRAM: addrType = "UDP" addrCls: Union[ Type[address.IPv4Address], Type[address.IPv6Address] ] = address.IPv4Address addrParser: Callable[[bytes], bytes] = cls._bytesToIPv4 if family is NetFamily.INET6: addrCls = address.IPv6Address addrParser = cls._bytesToIPv6 with convertError(struct.error, MissingAddressData): info = struct.unpack(addressFormat, addrInfo) source, dest, sPort, dPort = info return _info.ProxyInfo( line, addrCls(addrType, addrParser(source).decode(), sPort), addrCls(addrType, addrParser(dest).decode(), dPort), )
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 2 × Files: 8
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
test
DIR
-
drwxr-xr-x
2026-01-08 12:56:22
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__pycache__
DIR
-
drwxr-xr-x
2026-01-08 12:56:23
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
_exceptions.py
1.15 KB
lrw-r--r--
2024-08-27 10:30:39
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
_info.py
917 B
lrw-r--r--
2024-08-27 10:30:39
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
_interfaces.py
1.85 KB
lrw-r--r--
2024-08-27 10:30:39
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
_parser.py
2.10 KB
lrw-r--r--
2024-08-27 10:30:39
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
_v1parser.py
4.39 KB
lrw-r--r--
2024-08-27 10:30:39
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
_v2parser.py
6.47 KB
lrw-r--r--
2024-08-27 10:30:39
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
_wrapper.py
3.59 KB
lrw-r--r--
2024-08-27 10:30:39
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
243 B
lrw-r--r--
2024-08-27 10:30:39
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).