"""Low-level SOAP transport for the ISPConfig remote API. ISPConfig's ``/remote/index.php`` exposes PHP ``SoapServer`` in non-WSDL mode and refuses to generate a WSDL (``?wsdl`` returns a fault). That kills zeep, which requires WSDL. So we hand-roll the envelopes over ``urllib`` — ISPConfig only uses a handful of XSD scalar types plus ``xml-soap`` Map/Array, and the responses are parseable in ~80 lines of stdlib XML. This module is private. Callers stick to :class:`~ispconfig.client.ISPConfigClient`. """ from __future__ import annotations import logging import ssl import urllib.error import urllib.request from collections.abc import Iterable, Mapping from typing import Any from xml.etree import ElementTree as ET from xml.sax.saxutils import escape as xml_escape log = logging.getLogger("ispconfig") _NS = { "soap": "http://schemas.xmlsoap.org/soap/envelope/", "xsd": "http://www.w3.org/2001/XMLSchema", "xsi": "http://www.w3.org/2001/XMLSchema-instance", "ns1": "urn:ispconfig", } # PHP SoapServer emits xmlns:ns1="/remote/index.php" — we match by localname # rather than by namespace, so responses decode regardless of the ns1 URI. class SoapFault(Exception): """Raw SOAP fault. Mapped to typed exceptions by the client layer.""" def __init__(self, faultcode: str, faultstring: str) -> None: super().__init__(f"{faultcode}: {faultstring}") self.faultcode = faultcode self.faultstring = faultstring class SoapTransport: """Minimal non-WSDL SOAP transport over HTTPS.""" def __init__(self, url: str, *, verify_ssl: bool = True, timeout: float = 30.0) -> None: self.url = url self.timeout = timeout if verify_ssl: self._ctx = ssl.create_default_context() else: self._ctx = ssl._create_unverified_context() # noqa: SLF001 # ---- public API --------------------------------------------------- def call(self, method: str, args: Iterable[tuple[str, Any]]) -> Any: """Invoke a SOAP method with positional params (name, value) pairs. ISPConfig's SOAP dispatch uses positional args — the local names ``session_id``, ``primary_id``, ``params`` etc. are cosmetic; what actually matters is order. """ envelope = self._build_envelope(method, args) body = self._post(method, envelope) return self._parse_response(method, body) # ---- internals ---------------------------------------------------- def _post(self, method: str, envelope: str) -> bytes: req = urllib.request.Request( self.url, data=envelope.encode("utf-8"), headers={ "Content-Type": "text/xml; charset=UTF-8", "SOAPAction": f"urn:ispconfig#{method}", "User-Agent": "ispconfig-py/0.1", }, ) try: with urllib.request.urlopen(req, context=self._ctx, timeout=self.timeout) as resp: data: bytes = resp.read() return data except urllib.error.HTTPError as e: # PHP SoapServer returns faults as HTTP 500 with a fault envelope # in the body — still parseable. if e.code == 500 and e.headers.get("Content-Type", "").startswith("text/xml"): err_data: bytes = e.read() return err_data raise @staticmethod def _build_envelope(method: str, args: Iterable[tuple[str, Any]]) -> str: arg_xml = "".join(_encode_arg(name, value) for name, value in args) return ( '' '' f'{arg_xml}' '' ) @staticmethod def _parse_response(method: str, body: bytes) -> Any: try: root = ET.fromstring(body) except ET.ParseError as e: raise SoapFault("Client.ParseError", f"Invalid XML: {e}") from e # Walk by local-name so the bogus ns1="/remote/index.php" doesn't bite us. fault = _find_local(root, "Fault") if fault is not None: code = _text(_find_local(fault, "faultcode")) or "Server" msg = _text(_find_local(fault, "faultstring")) or "Unknown SOAP fault" raise SoapFault(code, msg) resp = _find_local(root, f"{method}Response") if resp is None: raise SoapFault( "Client.UnexpectedResponse", f"No <{method}Response> element in SOAP body", ) ret = _find_local(resp, "return") if ret is None: return None return _decode(ret) # ---- XML encoding ----------------------------------------------------- def _encode_arg(name: str, value: Any) -> str: if isinstance(value, bool): return f'<{name} xsi:type="xsd:boolean">{"true" if value else "false"}' if isinstance(value, int): return f'<{name} xsi:type="xsd:int">{value}' if isinstance(value, float): return f'<{name} xsi:type="xsd:double">{value}' if isinstance(value, str): return f'<{name} xsi:type="xsd:string">{xml_escape(value)}' if value is None: return f'<{name} xsi:nil="true"/>' if isinstance(value, Mapping): items = "".join( f'{xml_escape(str(k))}' f'{_encode_value_tag("value", v)}' for k, v in value.items() ) return f'<{name} xsi:type="ns2:Map" xmlns:ns2="http://xml.apache.org/xml-soap">{items}' if isinstance(value, (list, tuple)): items = "".join(_encode_value_tag("item", v) for v in value) return ( f'<{name} xsi:type="SOAP-ENC:Array" ' f'SOAP-ENC:arrayType="xsd:anyType[{len(value)}]">{items}' ) raise TypeError(f"Cannot encode {type(value).__name__} for SOAP arg {name!r}") def _encode_value_tag(tag: str, value: Any) -> str: # Same as _encode_arg but with a shared `value`/`item` tag name. return _encode_arg(tag, value) # ---- XML decoding ----------------------------------------------------- def _decode(el: ET.Element) -> Any: xsi_type = _xsi_type(el) # Arrays first — a response can be an Array of Maps, and the outer element # has ... children that we must NOT confuse # with map entries. if (xsi_type and xsi_type.endswith(":Array")) or _is_array(el): return _decode_array(el) if (xsi_type and xsi_type.endswith(":Map")) or _is_map(el): return _decode_map(el) if xsi_type and xsi_type.endswith(":boolean"): return (el.text or "").strip().lower() == "true" if xsi_type and (xsi_type.endswith(":int") or xsi_type.endswith(":long")): try: return int((el.text or "").strip()) except ValueError: return el.text if xsi_type and (xsi_type.endswith(":double") or xsi_type.endswith(":float")): try: return float((el.text or "").strip()) except ValueError: return el.text nil = el.get("{http://www.w3.org/2001/XMLSchema-instance}nil") if nil == "true": return None # Structs where ISPConfig returns arrays of scalars (e.g. ints) still # render as strings — PHP's $app->db->queryAll output is stringified. return el.text if el.text is not None else "" def _decode_map(el: ET.Element) -> dict[str, Any]: out: dict[str, Any] = {} for item in _iter_local(el, "item"): key_el = _find_local(item, "key") val_el = _find_local(item, "value") if key_el is None: continue key = (key_el.text or "").strip() out[key] = _decode(val_el) if val_el is not None else None return out def _decode_array(el: ET.Element) -> list[Any]: return [_decode(child) for child in _iter_local(el, "item")] def _is_map(el: ET.Element) -> bool: # apache-style Map with no xsi:type on the outer element but item/key/value # children. Check DIRECT children only — descendants may match for a # deeply nested array-of-maps and fool us. kids = list(el) if not kids: return False if not all(_local(k.tag) == "item" for k in kids): return False return any( any(_local(gk.tag) == "key" for gk in item) for item in kids ) def _is_array(el: ET.Element) -> bool: arr = el.get("{http://schemas.xmlsoap.org/soap/encoding/}arrayType") return arr is not None def _xsi_type(el: ET.Element) -> str | None: return el.get("{http://www.w3.org/2001/XMLSchema-instance}type") # ---- XML walking (namespace-agnostic) --------------------------------- def _local(tag: str) -> str: return tag.rsplit("}", 1)[-1] def _text(el: ET.Element | None) -> str | None: return el.text if el is not None else None def _find_local(root: ET.Element, name: str) -> ET.Element | None: if _local(root.tag) == name: return root for el in root.iter(): if _local(el.tag) == name: return el return None def _iter_local(parent: ET.Element, name: str) -> list[ET.Element]: return [el for el in parent if _local(el.tag) == name]