From 9438b4e75165fd9b8245fe80d61e6aa3f29dd95a Mon Sep 17 00:00:00 2001 From: Kayos Date: Wed, 22 Apr 2026 13:24:58 -0700 Subject: [PATCH] feat: initial ispconfig-py SDK (sites / dns / mail / databases / clients) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Python 3.10+ SDK wrapping the ISPConfig remote SOAP API so we stop writing throwaway PHP snippets every time we need to touch a site, zone, or mailbox. Why no zeep: ISPConfig's /remote/index.php exposes PHP SoapServer in non-WSDL mode and refuses WSDL generation (?wsdl returns a fault). zeep requires WSDL, so the stated dependency wouldn't actually work. Instead we hand-roll SOAP envelopes with the stdlib (urllib + xml.etree). Zero runtime deps. Structure: - src/ispconfig/_soap.py — envelope encode/decode, fault surfacing - src/ispconfig/client.py — ISPConfigClient context manager, retry - src/ispconfig/exceptions.py — ISPConfigError / Auth / Permission / NotFound / Fault - src/ispconfig/sites.py — web_domain get/add/update/delete + enable_php / enable_letsencrypt helpers - src/ispconfig/dns.py — zones + A/CNAME/MX/TXT records, dns_a_add type-column workaround - src/ispconfig/mail.py — mail domains, users, forwards, create_mailbox helper - src/ispconfig/databases.py — convenience facade over sites_database_* - src/ispconfig/clients.py — client + sys_groupid lookups - src/ispconfig/types.py — TypedDicts for response shapes (no pydantic) Institutional knowledge baked into docstrings + README footgun list: - sites_web_domain_update 2nd arg is client_id (not primary_id); admin = 0 - sys_groupid=1 -> pass client_id=0 on update, else ownership churns - fastcgi_php_version vs server_php_id depending on panel version - dns_a_add type-column bug (<= 3.2.11) — wrapper issues follow-up update - dns_zone_get_id wants origin WITHOUT trailing dot on 3.2.11+ (contrary to what the older snippets say). Verified live against Rackham 2026-04-22. - mail_user_get returns a bare map on exactly-one-match filter dicts — wrapper normalizes to list - session timeouts mid-op: client detects + re-auths once (max_retries knob) Tests: - tests/test_unit.py — 12 unit tests against a fake transport - tests/test_smoke.py — live read-only smoke test, gated on env vars: ISPCONFIG_TEST_URL, ISPCONFIG_TEST_USER, ISPCONFIG_TEST_PASS Covers login, web_domain_get(156), dns_zone_get_id, mail_user_get filter. Tooling: - mypy strict-ish (disallow untyped defs, warn-return-any, no implicit optional) - ruff with E/F/W/I/B/UP/N/SLF/RUF lint sets - pip install -e .[dev] for pytest / mypy / ruff --- .gitignore | 18 +++ LICENSE | 21 +++ README.md | 199 +++++++++++++++++++++++++++ pyproject.toml | 95 +++++++++++++ src/ispconfig/__init__.py | 36 +++++ src/ispconfig/_soap.py | 262 ++++++++++++++++++++++++++++++++++++ src/ispconfig/client.py | 130 ++++++++++++++++++ src/ispconfig/clients.py | 48 +++++++ src/ispconfig/databases.py | 39 ++++++ src/ispconfig/dns.py | 159 ++++++++++++++++++++++ src/ispconfig/exceptions.py | 69 ++++++++++ src/ispconfig/mail.py | 152 +++++++++++++++++++++ src/ispconfig/py.typed | 0 src/ispconfig/sites.py | 157 +++++++++++++++++++++ src/ispconfig/types.py | 156 +++++++++++++++++++++ tests/__init__.py | 0 tests/conftest.py | 22 +++ tests/test_smoke.py | 47 +++++++ tests/test_unit.py | 187 +++++++++++++++++++++++++ 19 files changed, 1797 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 pyproject.toml create mode 100644 src/ispconfig/__init__.py create mode 100644 src/ispconfig/_soap.py create mode 100644 src/ispconfig/client.py create mode 100644 src/ispconfig/clients.py create mode 100644 src/ispconfig/databases.py create mode 100644 src/ispconfig/dns.py create mode 100644 src/ispconfig/exceptions.py create mode 100644 src/ispconfig/mail.py create mode 100644 src/ispconfig/py.typed create mode 100644 src/ispconfig/sites.py create mode 100644 src/ispconfig/types.py create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_smoke.py create mode 100644 tests/test_unit.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..739d6a2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,18 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +.venv/ +venv/ +env/ +.mypy_cache/ +.ruff_cache/ +.pytest_cache/ +*.egg-info/ +dist/ +build/ +.env +.envrc +.vscode/ +.idea/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..77c4caf --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Sulkta Coop + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..82eaf18 --- /dev/null +++ b/README.md @@ -0,0 +1,199 @@ +# ispconfig-py + +Python SDK for the ISPConfig remote SOAP API. Internal tooling for the Sulkta +Coop — wraps the panel's SOAP surface so we stop writing throw-away PHP +scripts every time we need to touch a site, zone, or mailbox. + +The SDK covers the methods we actually use today (sites, DNS, mail, databases, +clients). More methods can be added as needed. + +## Why no zeep? + +ISPConfig's `/remote/index.php` uses PHP's `SoapServer` in non-WSDL mode and +refuses to generate a WSDL (`?wsdl` returns a fault). zeep requires WSDL. So +we hand-roll SOAP envelopes with the stdlib (`urllib` + `xml.etree`). Zero +runtime dependencies — fewer moving parts, nothing to pin. + +## Install + +```bash +pip install -e . # for use +pip install -e .[dev] # with pytest / mypy / ruff +``` + +Python 3.10+. + +## Quick start + +```python +from ispconfig import ISPConfigClient + +with ISPConfigClient( + "https://panel.example.com:8080/remote/index.php", + username="kayos", + password="hunter2", +) as c: + site = c.sites.web_domain_get(156) + print(site["domain"], site["php"]) + + # Flip on Let's Encrypt in one call. + c.sites.enable_letsencrypt(156) +``` + +The `with` block handles login on enter and logout on exit. Session IDs are +managed internally — callers don't touch them. + +Set `verify_ssl=False` for dev boxes with self-signed certs. Default is `True`. + +## Modules at a glance + +### `sites` + +```python +c.sites.web_domain_get(156) +c.sites.web_domain_update(client_id=0, primary_id=156, params={"php": "fast-cgi"}) +c.sites.enable_php(156, mode="php-fpm", server_php_id=2, pm="ondemand") +c.sites.enable_letsencrypt(156) +``` + +### `dns` + +```python +zone_id = c.dns.zone_get_id("example.com.") # note the trailing dot +zone = c.dns.zone_get(zone_id) +records = c.dns.rr_get_all_by_zone(zone_id) +rr_id = c.dns.a_add(0, {"zone": zone_id, "name": "www", "data": "1.2.3.4", "ttl": 3600, "active": "Y"}) +``` + +### `mail` + +```python +md = c.mail.domain_get_by_domain("example.com") +users = c.mail.user_get({"email": "%@example.com"}) +new_id = c.mail.create_mailbox(client_id=5, domain="example.com", + local_part="info", password="x", quota_mb=2048) +``` + +### `databases` + +```python +db = c.databases.get(42) +c.databases.user_update(client_id=5, primary_id=42, params={"database_password": "x"}) +``` + +### `clients` + +```python +cli = c.clients.get_by_username("jacob") +groupid = c.clients.get_groupid(cli["client_id"]) +``` + +## Footguns (captured here so nobody has to rediscover them) + +- **`sites_web_domain_update`'s second arg is `client_id`, not `primary_id`.** + For admin-owned sites pass `client_id=0`. Wrong order = permission denied + or silent ownership change. `SitesModule.web_domain_update` signature + enforces the correct order. +- **`sys_groupid=1` is the admin group.** On update calls, if you read a + record with `sys_groupid=1`, pass `client_id=0`, not `1`, or ISPConfig + re-owns the record to "admin's admin". `enable_php` / `enable_letsencrypt` + handle this automatically. +- **`fastcgi_php_version` vs `server_php_id`.** Older installs use the string + field `fastcgi_php_version` (e.g. `"PHP-8.2:/usr/bin/php-cgi8.2:/etc/php/8.2/cgi"`), + newer installs use `server_php_id` (int FK to `server_php`). Our + `enable_php` wraps the new field; set it directly via `web_domain_update` + if you need the old one. +- **BIND trailing dot on zone origins — BUT in reverse.** Contrary to a lot + of older documentation and PHP snippets floating around, ISPConfig 3.2.11+ + wants the origin **without** a trailing dot: `dns_zone_get_id("example.com")` + works, `dns_zone_get_id("example.com.")` raises `no_domain_found`. Our + wrapper strips a trailing dot for you so either call works. Verified + against Rackham 2026-04-22. +- **`mail_user_get` with a filter dict returns inconsistent shapes.** If the + filter matches multiple rows you get an array; exactly-one match returns a + bare map. Our `mail.user_get(filter_dict)` always normalizes to a list. +- **`no_domain_found` fault.** Both `dns_zone_get_id` and (in some paths) + `mail_domain_get_by_domain` return a SOAP fault with `faultcode=no_domain_found` + when the record is missing. Mapped to `NotFoundError`. +- **`dns_a_add` type-column bug.** On some ISPConfig versions (<= ~3.2.11) + `dns_a_add` inserts the `dns_rr` row without setting the `type` column, + so BIND never emits the record. `DnsModule.a_add(..., fix_type_bug=True)` + (default) issues a follow-up `dns_rr_update` with `{"type": "A"}` — no-op + on fixed versions, required on the broken ones. +- **Session timeouts.** ISPConfig sessions expire mid-long-operation without + warning. The client detects the "session not valid" fault, re-authenticates, + and retries once (`max_retries=1` by default). Tune via + `ISPConfigClient(..., max_retries=3)` for very long batch jobs. +- **`mail_forward_*`, not `mail_forwarding_*`.** The remote method is + singular; we expose it as `c.mail.forward_add` and keep + `c.mail.forwarding_add` as an alias for PHP-snippet refugees. +- **Filter dicts on `mail_user_get`.** Pass an int to get one row; pass a + dict like `{"email": "%@example.com"}` to get a list. The SOAP method is + overloaded and untyped on the wire. + +## Errors + +Typed hierarchy — never `zeep.exceptions.Fault` (we don't use zeep), never a +raw SOAP fault leaked to callers: + +``` +ISPConfigError +├── AuthError # login failed OR session expired +├── PermissionError # caller lacks rights on the record +├── NotFoundError # primary_id didn't match anything +└── FaultError # everything else (.faultcode, .faultstring) +``` + +## Tests + +```bash +pytest # unit tests only, no network +``` + +To run the live smoke test against a real panel: + +```bash +export ISPCONFIG_TEST_URL="https://panel.example.com:8080/remote/index.php" +export ISPCONFIG_TEST_USER="kayos" +export ISPCONFIG_TEST_PASS="..." +pytest tests/test_smoke.py +``` + +The smoke test is read-only — no `_add` / `_update` / `_delete` calls. Safe +against production. + +## Development + +```bash +pip install -e .[dev] +ruff check src/ +mypy src/ispconfig +pytest +``` + +## Not yet covered + +The remote API surface is huge. These are intentionally left out of v0.1 — +add as needed: + +- `sites_web_aliasdomain_*`, `sites_web_subdomain_*`, + `sites_web_vhost_{subdomain,aliasdomain}_*` +- `sites_ftp_user_*`, `sites_shell_user_*`, `sites_webdav_user_*` +- `sites_cron_*` +- `sites_web_domain_backup`, `sites_web_domain_backup_list`, + `mail_user_backup`, `mail_user_backup_list` +- `dns_{aaaa,ns,srv,ptr,tlsa,ds,caa,sshfp,dname,loc,hinfo,naptr,rp,alias}_*` +- `dns_slave_*`, `dns_zone_set_dnssec`, `dns_zone_get_by_user`, `dns_templatezone_*` +- `mail_alias_*`, `mail_aliasdomain_*`, `mail_catchall_*`, `mail_filter_*`, + `mail_fetchmail_*`, `mail_mailinglist_*`, `mail_policy_*`, + `mail_relay_{domain,recipient}_*`, `mail_transport_*`, + `mail_{whitelist,blacklist}_*`, `mail_spamfilter_*`, `mail_user_filter_*` +- `client_add`, `client_update`, `client_delete`, `client_change_password`, + `client_template_additional_*`, `client_templates_get_all`, + `client_login_get` +- `server_get`, `server_get_all`, `admin.*`, `monitor.*`, `aps.*`, + `openvz.*`, `domains.*` + +## License + +MIT — see `LICENSE`. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..18d3537 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,95 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "ispconfig" +version = "0.1.0" +description = "Python SDK for the ISPConfig remote SOAP API — Sulkta Coop internal tooling." +readme = "README.md" +license = { text = "MIT" } +requires-python = ">=3.10" +authors = [ + { name = "Sulkta Coop" }, +] +# Zero runtime deps on purpose. ISPConfig's SOAP endpoint disables WSDL +# generation (?wsdl returns a fault), so zeep can't help us anyway — we +# hand-roll envelopes with the stdlib. +dependencies = [] + +[project.optional-dependencies] +dev = [ + "pytest>=7", + "mypy>=1.8", + "ruff>=0.3", +] + +[project.urls] +Homepage = "http://192.168.0.5:3001/Sulkta-Coop/ispconfig-py" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.setuptools.package-data] +ispconfig = ["py.typed"] + +# ---- mypy ----------------------------------------------------------- + +[tool.mypy] +python_version = "3.10" +packages = ["ispconfig"] +mypy_path = "src" +strict_optional = true +warn_unused_ignores = true +warn_redundant_casts = true +warn_return_any = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +check_untyped_defs = true +no_implicit_optional = true +# We intentionally don't enable `disallow_any_expr` — SOAP responses are +# dicts of Any and forcing typing everywhere would create fake certainty. + +[[tool.mypy.overrides]] +module = "ispconfig.types" +disallow_untyped_defs = false + +# ---- ruff ----------------------------------------------------------- + +[tool.ruff] +line-length = 110 +target-version = "py310" +src = ["src"] + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "F", # pyflakes + "W", # pycodestyle warnings + "I", # isort + "B", # bugbear + "UP", # pyupgrade + "N", # pep8-naming + "SLF", # flake8-self (private access) + "RUF", +] +ignore = [ + "E501", # line length — handled by formatter when it cares + "N818", # exception suffix — our hierarchy predates this rule + "B008", # function calls in argument defaults — not our style anyway +] + +[tool.ruff.lint.per-file-ignores] +"tests/*" = ["SLF001", "N802"] +# Submodules call back into the client's dispatcher (`_call`) by design — +# it's the single chokepoint for session management and retry logic. +"src/ispconfig/sites.py" = ["SLF001"] +"src/ispconfig/dns.py" = ["SLF001"] +"src/ispconfig/mail.py" = ["SLF001"] +"src/ispconfig/clients.py" = ["SLF001"] + +# ---- pytest --------------------------------------------------------- + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "-ra" diff --git a/src/ispconfig/__init__.py b/src/ispconfig/__init__.py new file mode 100644 index 0000000..112d5dc --- /dev/null +++ b/src/ispconfig/__init__.py @@ -0,0 +1,36 @@ +"""Python SDK for the ISPConfig remote SOAP API. + +Quick start:: + + from ispconfig import ISPConfigClient + + with ISPConfigClient("https://panel.example.com:8080/remote/index.php", + "admin", "password") as c: + site = c.sites.web_domain_get(156) + print(site["domain"], site["php"]) + +See the individual module docstrings for the full set of wrapped calls. +""" + +from __future__ import annotations + +from .client import ISPConfigClient +from .exceptions import ( + AuthError, + FaultError, + ISPConfigError, + NotFoundError, + PermissionError, +) + +__version__ = "0.1.0" + +__all__ = [ + "AuthError", + "FaultError", + "ISPConfigClient", + "ISPConfigError", + "NotFoundError", + "PermissionError", + "__version__", +] diff --git a/src/ispconfig/_soap.py b/src/ispconfig/_soap.py new file mode 100644 index 0000000..f86f933 --- /dev/null +++ b/src/ispconfig/_soap.py @@ -0,0 +1,262 @@ +"""Low-level SOAP transport for the ISPConfig remote API. + +ISPConfig's ``/remote/index.php`` exposes PHP ``SoapServer`` in non-WSDL mode +and refuses to generate a WSDL (``?wsdl`` returns a fault). That kills zeep, +which requires WSDL. So we hand-roll the envelopes over ``urllib`` — ISPConfig +only uses a handful of XSD scalar types plus ``xml-soap`` Map/Array, and the +responses are parseable in ~80 lines of stdlib XML. + +This module is private. Callers stick to :class:`~ispconfig.client.ISPConfigClient`. +""" + +from __future__ import annotations + +import logging +import ssl +import urllib.error +import urllib.request +from collections.abc import Iterable, Mapping +from typing import Any +from xml.etree import ElementTree as ET +from xml.sax.saxutils import escape as xml_escape + +log = logging.getLogger("ispconfig") + +_NS = { + "soap": "http://schemas.xmlsoap.org/soap/envelope/", + "xsd": "http://www.w3.org/2001/XMLSchema", + "xsi": "http://www.w3.org/2001/XMLSchema-instance", + "ns1": "urn:ispconfig", +} + +# PHP SoapServer emits xmlns:ns1="/remote/index.php" — we match by localname +# rather than by namespace, so responses decode regardless of the ns1 URI. + + +class SoapFault(Exception): + """Raw SOAP fault. Mapped to typed exceptions by the client layer.""" + + def __init__(self, faultcode: str, faultstring: str) -> None: + super().__init__(f"{faultcode}: {faultstring}") + self.faultcode = faultcode + self.faultstring = faultstring + + +class SoapTransport: + """Minimal non-WSDL SOAP transport over HTTPS.""" + + def __init__(self, url: str, *, verify_ssl: bool = True, timeout: float = 30.0) -> None: + self.url = url + self.timeout = timeout + if verify_ssl: + self._ctx = ssl.create_default_context() + else: + self._ctx = ssl._create_unverified_context() # noqa: SLF001 + + # ---- public API --------------------------------------------------- + + def call(self, method: str, args: Iterable[tuple[str, Any]]) -> Any: + """Invoke a SOAP method with positional params (name, value) pairs. + + ISPConfig's SOAP dispatch uses positional args — the local names + ``session_id``, ``primary_id``, ``params`` etc. are cosmetic; what + actually matters is order. + """ + envelope = self._build_envelope(method, args) + body = self._post(method, envelope) + return self._parse_response(method, body) + + # ---- internals ---------------------------------------------------- + + def _post(self, method: str, envelope: str) -> bytes: + req = urllib.request.Request( + self.url, + data=envelope.encode("utf-8"), + headers={ + "Content-Type": "text/xml; charset=UTF-8", + "SOAPAction": f"urn:ispconfig#{method}", + "User-Agent": "ispconfig-py/0.1", + }, + ) + try: + with urllib.request.urlopen(req, context=self._ctx, timeout=self.timeout) as resp: + data: bytes = resp.read() + return data + except urllib.error.HTTPError as e: + # PHP SoapServer returns faults as HTTP 500 with a fault envelope + # in the body — still parseable. + if e.code == 500 and e.headers.get("Content-Type", "").startswith("text/xml"): + err_data: bytes = e.read() + return err_data + raise + + @staticmethod + def _build_envelope(method: str, args: Iterable[tuple[str, Any]]) -> str: + arg_xml = "".join(_encode_arg(name, value) for name, value in args) + return ( + '' + '' + f'{arg_xml}' + '' + ) + + @staticmethod + def _parse_response(method: str, body: bytes) -> Any: + try: + root = ET.fromstring(body) + except ET.ParseError as e: + raise SoapFault("Client.ParseError", f"Invalid XML: {e}") from e + + # Walk by local-name so the bogus ns1="/remote/index.php" doesn't bite us. + fault = _find_local(root, "Fault") + if fault is not None: + code = _text(_find_local(fault, "faultcode")) or "Server" + msg = _text(_find_local(fault, "faultstring")) or "Unknown SOAP fault" + raise SoapFault(code, msg) + + resp = _find_local(root, f"{method}Response") + if resp is None: + raise SoapFault( + "Client.UnexpectedResponse", + f"No <{method}Response> element in SOAP body", + ) + ret = _find_local(resp, "return") + if ret is None: + return None + return _decode(ret) + + +# ---- XML encoding ----------------------------------------------------- + + +def _encode_arg(name: str, value: Any) -> str: + if isinstance(value, bool): + return f'<{name} xsi:type="xsd:boolean">{"true" if value else "false"}' + if isinstance(value, int): + return f'<{name} xsi:type="xsd:int">{value}' + if isinstance(value, float): + return f'<{name} xsi:type="xsd:double">{value}' + if isinstance(value, str): + return f'<{name} xsi:type="xsd:string">{xml_escape(value)}' + if value is None: + return f'<{name} xsi:nil="true"/>' + if isinstance(value, Mapping): + items = "".join( + f'{xml_escape(str(k))}' + f'{_encode_value_tag("value", v)}' + for k, v in value.items() + ) + return f'<{name} xsi:type="ns2:Map" xmlns:ns2="http://xml.apache.org/xml-soap">{items}' + if isinstance(value, (list, tuple)): + items = "".join(_encode_value_tag("item", v) for v in value) + return ( + f'<{name} xsi:type="SOAP-ENC:Array" ' + f'SOAP-ENC:arrayType="xsd:anyType[{len(value)}]">{items}' + ) + raise TypeError(f"Cannot encode {type(value).__name__} for SOAP arg {name!r}") + + +def _encode_value_tag(tag: str, value: Any) -> str: + # Same as _encode_arg but with a shared `value`/`item` tag name. + return _encode_arg(tag, value) + + +# ---- XML decoding ----------------------------------------------------- + + +def _decode(el: ET.Element) -> Any: + xsi_type = _xsi_type(el) + # Arrays first — a response can be an Array of Maps, and the outer element + # has ... children that we must NOT confuse + # with map entries. + if (xsi_type and xsi_type.endswith(":Array")) or _is_array(el): + return _decode_array(el) + if (xsi_type and xsi_type.endswith(":Map")) or _is_map(el): + return _decode_map(el) + if xsi_type and xsi_type.endswith(":boolean"): + return (el.text or "").strip().lower() == "true" + if xsi_type and (xsi_type.endswith(":int") or xsi_type.endswith(":long")): + try: + return int((el.text or "").strip()) + except ValueError: + return el.text + if xsi_type and (xsi_type.endswith(":double") or xsi_type.endswith(":float")): + try: + return float((el.text or "").strip()) + except ValueError: + return el.text + nil = el.get("{http://www.w3.org/2001/XMLSchema-instance}nil") + if nil == "true": + return None + # Structs where ISPConfig returns arrays of scalars (e.g. ints) still + # render as strings — PHP's $app->db->queryAll output is stringified. + return el.text if el.text is not None else "" + + +def _decode_map(el: ET.Element) -> dict[str, Any]: + out: dict[str, Any] = {} + for item in _iter_local(el, "item"): + key_el = _find_local(item, "key") + val_el = _find_local(item, "value") + if key_el is None: + continue + key = (key_el.text or "").strip() + out[key] = _decode(val_el) if val_el is not None else None + return out + + +def _decode_array(el: ET.Element) -> list[Any]: + return [_decode(child) for child in _iter_local(el, "item")] + + +def _is_map(el: ET.Element) -> bool: + # apache-style Map with no xsi:type on the outer element but item/key/value + # children. Check DIRECT children only — descendants may match for a + # deeply nested array-of-maps and fool us. + kids = list(el) + if not kids: + return False + if not all(_local(k.tag) == "item" for k in kids): + return False + return any( + any(_local(gk.tag) == "key" for gk in item) + for item in kids + ) + + +def _is_array(el: ET.Element) -> bool: + arr = el.get("{http://schemas.xmlsoap.org/soap/encoding/}arrayType") + return arr is not None + + +def _xsi_type(el: ET.Element) -> str | None: + return el.get("{http://www.w3.org/2001/XMLSchema-instance}type") + + +# ---- XML walking (namespace-agnostic) --------------------------------- + + +def _local(tag: str) -> str: + return tag.rsplit("}", 1)[-1] + + +def _text(el: ET.Element | None) -> str | None: + return el.text if el is not None else None + + +def _find_local(root: ET.Element, name: str) -> ET.Element | None: + if _local(root.tag) == name: + return root + for el in root.iter(): + if _local(el.tag) == name: + return el + return None + + +def _iter_local(parent: ET.Element, name: str) -> list[ET.Element]: + return [el for el in parent if _local(el.tag) == name] diff --git a/src/ispconfig/client.py b/src/ispconfig/client.py new file mode 100644 index 0000000..5dd6283 --- /dev/null +++ b/src/ispconfig/client.py @@ -0,0 +1,130 @@ +"""``ISPConfigClient`` — top-level entry point, session lifecycle, submodule wiring.""" + +from __future__ import annotations + +import logging +from types import TracebackType +from typing import Any + +from . import exceptions as _exc +from ._soap import SoapFault, SoapTransport +from .clients import ClientsModule +from .databases import DatabasesModule +from .dns import DnsModule +from .mail import MailModule +from .sites import SitesModule + +log = logging.getLogger("ispconfig") + + +class ISPConfigClient: + """High-level client for the ISPConfig remote SOAP API. + + Use as a context manager to auto-login on enter and auto-logout on exit:: + + with ISPConfigClient(url, "kayos", "hunter2") as c: + site = c.sites.web_domain_get(156) + print(site["domain"]) + + Session IDs are managed internally and never returned to callers. + If a call fails with a session-expired fault, the client re-authenticates + once and retries (controlled by ``max_retries``). + """ + + def __init__( + self, + url: str, + username: str, + password: str, + *, + verify_ssl: bool = True, + timeout: float = 30.0, + max_retries: int = 1, + ) -> None: + self._url = url + self._username = username + self._password = password + self._max_retries = max_retries + self._transport = SoapTransport(url, verify_ssl=verify_ssl, timeout=timeout) + self._session_id: str | None = None + + self.sites = SitesModule(self) + self.dns = DnsModule(self) + self.mail = MailModule(self) + self.databases = DatabasesModule(self) + self.clients = ClientsModule(self) + + # ---- context manager --------------------------------------------- + + def __enter__(self) -> ISPConfigClient: + self.login() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + try: + self.logout() + except Exception as e: # pragma: no cover — cleanup path, never raise out + log.warning("logout on __exit__ failed: %s", e) + + # ---- session lifecycle ------------------------------------------- + + def login(self) -> None: + """Open a session. Stores the session id internally.""" + try: + sid = self._transport.call( + "login", + (("username", self._username), ("password", self._password)), + ) + except SoapFault as f: + raise _exc.map_fault(f.faultcode, f.faultstring) from f + if not isinstance(sid, str) or not sid: + raise _exc.AuthError("login returned empty session id") + self._session_id = sid + log.debug("ispconfig login ok (session %s...)", sid[:8]) + + def logout(self) -> bool: + """Close the session. Safe to call even if never logged in.""" + if self._session_id is None: + return False + sid = self._session_id + self._session_id = None + try: + result = self._transport.call("logout", (("session_id", sid),)) + except SoapFault as f: + log.debug("logout fault ignored: %s", f) + return False + return bool(result) + + @property + def session_id(self) -> str | None: + """Read-only accessor — exposed for debugging, not for API calls.""" + return self._session_id + + # ---- the hot path ------------------------------------------------ + + def _call(self, method: str, *args: tuple[str, Any]) -> Any: + """Invoke ``method(session_id, *args)`` with typed error mapping + retry. + + This is what the submodules call. It prepends the session id, + translates SOAP faults, and retries once on session expiry. + """ + attempts = 0 + while True: + if self._session_id is None: + self.login() + sid_arg = ("session_id", self._session_id or "") + try: + return self._transport.call(method, (sid_arg, *args)) + except SoapFault as f: + mapped = _exc.map_fault(f.faultcode, f.faultstring) + if _exc.is_session_expired(mapped) and attempts < self._max_retries: + log.info("ispconfig session expired, re-authenticating") + self._session_id = None + attempts += 1 + continue + raise mapped from f diff --git a/src/ispconfig/clients.py b/src/ispconfig/clients.py new file mode 100644 index 0000000..dc6fe4c --- /dev/null +++ b/src/ispconfig/clients.py @@ -0,0 +1,48 @@ +"""``clients.*`` — ISPConfig client/customer lookups. + +ISPConfig's client model has two IDs you'll trip over: + +* ``client_id`` — primary key of the ``client`` table. +* ``sys_groupid`` — the "owner group" used by sites/dns/mail records. + ``sys_groupid=1`` is the admin group. For regular clients, ``sys_groupid`` + is *not* equal to ``client_id``; you must look it up with + :meth:`ClientsModule.get_groupid`. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +from .types import Client + +if TYPE_CHECKING: + from .client import ISPConfigClient + + +class ClientsModule: + def __init__(self, client: ISPConfigClient) -> None: + self._c = client + + def get(self, primary_id: int) -> Client: + return cast(Client, self._c._call("client_get", ("primary_id", int(primary_id)))) + + def get_groupid(self, client_id: int) -> int: + """Look up ``sys_groupid`` for a given ``client_id``.""" + result = self._c._call("client_get_groupid", ("client_id", int(client_id))) + return int(result) if result else 0 + + def get_id(self, sys_userid: int) -> int: + """Reverse of :meth:`get_groupid` — client_id from a sys user id.""" + result = self._c._call("client_get_id", ("sys_userid", int(sys_userid))) + return int(result) if result else 0 + + def get_by_username(self, username: str) -> Client: + return cast(Client, self._c._call("client_get_by_username", ("username", username))) + + def get_by_groupid(self, groupid: int) -> Client: + return cast(Client, self._c._call("client_get_by_groupid", ("groupid", int(groupid)))) + + def get_all(self) -> list[int]: + """Return every ``client_id`` the API user can see.""" + result = self._c._call("client_get_all") + return [int(x) for x in (result or [])] diff --git a/src/ispconfig/databases.py b/src/ispconfig/databases.py new file mode 100644 index 0000000..2797cff --- /dev/null +++ b/src/ispconfig/databases.py @@ -0,0 +1,39 @@ +"""``databases.*`` — thin convenience wrapper around the sites-level DB calls. + +The underlying remote methods live under ``sites_database_*`` in ISPConfig, +but callers reasonably expect a top-level ``client.databases`` namespace. +This module just delegates to :class:`~ispconfig.sites.SitesModule`. +""" + +from __future__ import annotations + +from collections.abc import Mapping +from typing import TYPE_CHECKING, Any + +from .types import Database, DatabaseUser + +if TYPE_CHECKING: + from .client import ISPConfigClient + + +class DatabasesModule: + def __init__(self, client: ISPConfigClient) -> None: + self._c = client + + def get(self, primary_id: int) -> Database: + return self._c.sites.database_get(primary_id) + + def add(self, client_id: int, params: Mapping[str, Any]) -> int: + return self._c.sites.database_add(client_id, params) + + def delete(self, primary_id: int) -> int: + return self._c.sites.database_delete(primary_id) + + def user_get(self, primary_id: int) -> DatabaseUser: + return self._c.sites.database_user_get(primary_id) + + def user_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return self._c.sites.database_user_add(client_id, params) + + def user_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int: + return self._c.sites.database_user_update(client_id, primary_id, params) diff --git a/src/ispconfig/dns.py b/src/ispconfig/dns.py new file mode 100644 index 0000000..3a01d8c --- /dev/null +++ b/src/ispconfig/dns.py @@ -0,0 +1,159 @@ +"""``dns.*`` — zones and resource records. + +Footguns: + +* Zone origins **must end in a dot** per BIND convention: + ``dns_zone_get_id(..., 'example.com.')`` — without the trailing dot + ISPConfig returns 0 and you'll spend 20 minutes wondering why. +* ``dns_a_add`` in some ISPConfig builds (<= 3.2.11-ish) has a known bug + where the ``type`` column in ``dns_rr`` is not populated by the add + handler, which means the record exists in the table but BIND never + emits it. Workaround: after add, ``dns_rr_update`` with ``{'type':'A'}`` + to force the column. :meth:`DnsModule.a_add` does this for you and + emits a warning log — check your ISPConfig version. +""" + +from __future__ import annotations + +import logging +from collections.abc import Mapping +from typing import TYPE_CHECKING, Any, cast + +from .types import DnsRr, DnsZone + +if TYPE_CHECKING: + from .client import ISPConfigClient + +log = logging.getLogger("ispconfig.dns") + + +class DnsModule: + def __init__(self, client: ISPConfigClient) -> None: + self._c = client + + # ---- zones -------------------------------------------------------- + + def zone_get(self, primary_id: int) -> DnsZone: + return cast(DnsZone, self._c._call("dns_zone_get", ("primary_id", int(primary_id)))) + + def zone_get_id(self, origin: str) -> int: + """Resolve a zone origin to its ``dns_soa.id``. + + .. note:: + Despite BIND convention, ISPConfig's ``dns_zone_get_id`` wants the + origin **without** a trailing dot — ``"example.com"``, not + ``"example.com."``. Passing the dotted form raises a + ``no_domain_found`` fault. This wrapper strips a trailing dot if + present so callers can be lazy either way. + + Returns 0 if the zone does not exist (``NotFoundError`` caught and + converted; other faults propagate). + """ + # ISPConfig's internal lookup compares against the stored `origin` + # column, which can be either with or without a dot depending on how + # the zone was created. Historically the spec said "must end in a dot" + # — that turns out to be wrong against 3.2.11+. + normalized = origin.rstrip(".") + try: + result = self._c._call("dns_zone_get_id", ("origin", normalized)) + except Exception as e: # pragma: no cover — exception classes tested in unit tests + if "no_domain_found" in str(e).lower() or "invalid domain" in str(e).lower(): + return 0 + raise + return int(result) if result else 0 + + def zone_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "dns_zone_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + + def zone_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "dns_zone_update", + ("client_id", int(client_id)), + ("primary_id", int(primary_id)), + ("params", dict(params)), + )) + + def zone_delete(self, primary_id: int) -> int: + return int(self._c._call("dns_zone_delete", ("primary_id", int(primary_id)))) + + def rr_get_all_by_zone(self, zone_id: int) -> list[DnsRr]: + return self._c._call("dns_rr_get_all_by_zone", ("zone_id", int(zone_id))) or [] + + # ---- records ------------------------------------------------------ + + def a_add(self, client_id: int, params: Mapping[str, Any], *, fix_type_bug: bool = True) -> int: + """Add an A record. + + On some ISPConfig versions the ``type`` column of ``dns_rr`` is not + set by ``dns_a_add``. When ``fix_type_bug=True`` (default), we follow + up with a ``dns_rr_update`` that writes ``type='A'``. No-op on fixed + versions, harmless. + """ + rr_id = int(self._c._call( + "dns_a_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + if fix_type_bug and rr_id: + try: + self._c._call( + "dns_rr_update", + ("client_id", int(client_id)), + ("primary_id", rr_id), + ("params", {"type": "A"}), + ) + except Exception as e: # pragma: no cover — workaround logs & swallows + log.warning("dns.a_add: type-column workaround failed: %s", e) + return rr_id + + def a_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "dns_a_update", + ("client_id", int(client_id)), + ("primary_id", int(primary_id)), + ("params", dict(params)), + )) + + def a_delete(self, primary_id: int) -> int: + return int(self._c._call("dns_a_delete", ("primary_id", int(primary_id)))) + + def cname_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "dns_cname_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + + def mx_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "dns_mx_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + + def txt_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "dns_txt_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + + def rr_get(self, method_prefix: str, primary_id: int) -> DnsRr: + """Generic ``dns_{type}_get`` — e.g. ``rr_get('cname', 42)``.""" + return cast(DnsRr, self._c._call(f"dns_{method_prefix}_get", ("primary_id", int(primary_id)))) + + def rr_update(self, type_: str, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int: + """Generic ``dns_{type}_update``. ``type_`` is ``a``, ``cname``, etc.""" + return int(self._c._call( + f"dns_{type_}_update", + ("client_id", int(client_id)), + ("primary_id", int(primary_id)), + ("params", dict(params)), + )) + + def rr_delete(self, type_: str, primary_id: int) -> int: + return int(self._c._call(f"dns_{type_}_delete", ("primary_id", int(primary_id)))) diff --git a/src/ispconfig/exceptions.py b/src/ispconfig/exceptions.py new file mode 100644 index 0000000..dbae0b1 --- /dev/null +++ b/src/ispconfig/exceptions.py @@ -0,0 +1,69 @@ +"""Typed exceptions for the ISPConfig SDK. + +Never leak raw ``_soap.SoapFault`` to callers — map to one of these. +""" + +from __future__ import annotations + + +class ISPConfigError(Exception): + """Base exception for all SDK errors.""" + + +class AuthError(ISPConfigError): + """Authentication failed, or session has expired.""" + + +class PermissionError(ISPConfigError): + """Caller lacks permission for this operation. + + Name intentionally shadows the builtin — readers grepping for + ``PermissionError`` in ISPConfig code should find this. + """ + + +class NotFoundError(ISPConfigError): + """Requested record does not exist.""" + + +class FaultError(ISPConfigError): + """Generic SOAP fault we haven't classified.""" + + def __init__(self, faultcode: str, faultstring: str) -> None: + super().__init__(f"{faultcode}: {faultstring}") + self.faultcode = faultcode + self.faultstring = faultstring + + +def map_fault(faultcode: str, faultstring: str) -> ISPConfigError: + """Translate a raw SOAP fault into a typed :class:`ISPConfigError`. + + ISPConfig doesn't use structured fault subcodes — detection is by string. + """ + msg = faultstring.lower() + if "login failed" in msg or "login_error" in msg or "could not login" in msg: + return AuthError(faultstring) + if "session" in msg and ("expired" in msg or "not valid" in msg or "invalid" in msg): + return AuthError(faultstring) + if "permission denied" in msg or "you do not have the permissions" in msg or "not allowed" in msg: + return PermissionError(faultstring) + if ( + "no records found" in msg + or "not found" in msg + or "no record found" in msg + or "no_domain_found" in faultcode.lower() + or "invalid domain name" in msg + ): + return NotFoundError(faultstring) + return FaultError(faultcode, faultstring) + + +def is_session_expired(exc: BaseException) -> bool: + """True if the exception looks like ISPConfig's session-timeout fault. + + Used by :class:`ISPConfigClient` to decide whether to re-auth + retry. + """ + if not isinstance(exc, AuthError): + return False + msg = str(exc).lower() + return "session" in msg and ("expired" in msg or "invalid" in msg or "not valid" in msg) diff --git a/src/ispconfig/mail.py b/src/ispconfig/mail.py new file mode 100644 index 0000000..77bcb4c --- /dev/null +++ b/src/ispconfig/mail.py @@ -0,0 +1,152 @@ +"""``mail.*`` — mail domains, mailboxes, forwarders. + +Note: the ISPConfig remote method is ``mail_forward_*`` (singular), not +``mail_forwarding_*``. Our wrapper exposes :meth:`MailModule.forward_add` +etc; the old ``forwarding`` names are available as aliases for anyone coming +from the PHP snippets. +""" + +from __future__ import annotations + +import logging +from collections.abc import Mapping +from typing import TYPE_CHECKING, Any, cast + +from .types import MailDomain, MailUser + +if TYPE_CHECKING: + from .client import ISPConfigClient + +log = logging.getLogger("ispconfig.mail") + + +class MailModule: + def __init__(self, client: ISPConfigClient) -> None: + self._c = client + + # ---- mail domains ------------------------------------------------- + + def domain_get(self, primary_id: int) -> MailDomain: + return cast(MailDomain, self._c._call("mail_domain_get", ("primary_id", int(primary_id)))) + + def domain_get_by_domain(self, domain: str) -> list[MailDomain]: + return self._c._call("mail_domain_get_by_domain", ("domain", domain)) or [] + + def domain_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "mail_domain_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + + def domain_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "mail_domain_update", + ("client_id", int(client_id)), + ("primary_id", int(primary_id)), + ("params", dict(params)), + )) + + def domain_delete(self, primary_id: int) -> int: + return int(self._c._call("mail_domain_delete", ("primary_id", int(primary_id)))) + + # ---- mail users --------------------------------------------------- + + def user_get(self, primary_id: int | Mapping[str, Any]) -> MailUser | list[MailUser]: + """Fetch a mailuser. + + Pass an int ``mailuser_id`` to get a single row, or a dict of filter + key/values (e.g. ``{'email': 'x@y.com'}``) to get a list. + + .. note:: + ISPConfig's SOAP is inconsistent: a filter dict that matches + multiple rows returns an array, but a filter dict that matches + *exactly one* row sometimes returns a single map instead of a + 1-element array. When a filter dict is passed we always normalize + to a list so callers can iterate without surprises. + """ + if isinstance(primary_id, Mapping): + result = self._c._call("mail_user_get", ("primary_id", dict(primary_id))) + if result is None: + return cast(list[MailUser], []) + if isinstance(result, list): + return cast(list[MailUser], result) + # single-hit quirk — wrap into a list for consistency. + return cast(list[MailUser], [result]) + return cast(MailUser, self._c._call("mail_user_get", ("primary_id", int(primary_id)))) + + def user_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "mail_user_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + + def user_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "mail_user_update", + ("client_id", int(client_id)), + ("primary_id", int(primary_id)), + ("params", dict(params)), + )) + + def user_delete(self, primary_id: int) -> int: + return int(self._c._call("mail_user_delete", ("primary_id", int(primary_id)))) + + # ---- mail forward ------------------------------------------------- + + def forward_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "mail_forward_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + + def forward_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "mail_forward_update", + ("client_id", int(client_id)), + ("primary_id", int(primary_id)), + ("params", dict(params)), + )) + + def forward_delete(self, primary_id: int) -> int: + return int(self._c._call("mail_forward_delete", ("primary_id", int(primary_id)))) + + # Aliases for callers coming from the old ``mail_forwarding_*`` PHP names. + forwarding_add = forward_add + forwarding_update = forward_update + forwarding_delete = forward_delete + + # ---- helpers ------------------------------------------------------ + + def create_mailbox( + self, + client_id: int, + domain: str, + local_part: str, + password: str, + *, + server_id: int = 1, + quota_mb: int = 1024, + name: str | None = None, + ) -> int: + """Shorthand for ``user_add`` with typical defaults. + + Returns the new ``mailuser_id``. + """ + email = f"{local_part}@{domain}" + params: dict[str, Any] = { + "server_id": int(server_id), + "email": email, + "login": email, + "password": password, + "name": name or local_part, + "quota": int(quota_mb) * 1024 * 1024, # ISPConfig stores bytes + "postfix": "y", + "access": "y", + "disableimap": "n", + "disablepop3": "n", + "disablesmtp": "n", + } + return self.user_add(client_id, params) diff --git a/src/ispconfig/py.typed b/src/ispconfig/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/ispconfig/sites.py b/src/ispconfig/sites.py new file mode 100644 index 0000000..a736669 --- /dev/null +++ b/src/ispconfig/sites.py @@ -0,0 +1,157 @@ +"""``sites.*`` — web domains, databases, database users. + +Footguns baked into these wrappers so callers don't have to rediscover them: + +* ``sites_web_domain_update``'s second positional arg is ``client_id``, **not** + the primary id. For admin-owned sites pass ``client_id=0``. This is a common + footgun — the ISPConfig remote API is inconsistent about this across methods. +* ``sys_groupid=1`` means admin-owned. Pass it through unchanged on update + calls or ISPConfig will reassign ownership. +* ``fastcgi_php_version`` is the legacy field; newer installs use + ``server_php_id`` (int, references ``server_php.server_php_id``). Set the + one your panel version knows about — :meth:`SitesModule.enable_php` + handles this. +""" + +from __future__ import annotations + +import logging +from collections.abc import Mapping +from typing import TYPE_CHECKING, Any, cast + +from .types import Database, DatabaseUser, WebDomain + +if TYPE_CHECKING: + from .client import ISPConfigClient + +log = logging.getLogger("ispconfig.sites") + + +class SitesModule: + def __init__(self, client: ISPConfigClient) -> None: + self._c = client + + # ---- web domain --------------------------------------------------- + + def web_domain_get(self, primary_id: int) -> WebDomain: + """Fetch a single ``web_domain`` row by its ``domain_id``.""" + return cast(WebDomain, self._c._call("sites_web_domain_get", ("primary_id", int(primary_id)))) + + def web_domain_add(self, client_id: int, params: Mapping[str, Any], read_only: bool = False) -> int: + """Create a new site. Returns the new ``domain_id``. + + ``client_id=0`` creates an admin-owned site. + """ + return int(self._c._call( + "sites_web_domain_add", + ("client_id", int(client_id)), + ("params", dict(params)), + ("read_only", bool(read_only)), + )) + + def web_domain_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int: + """Update a site. + + .. warning:: + The second positional arg is ``client_id``, not ``primary_id``. + Pass 0 for admin-owned. See module docstring. + """ + return int(self._c._call( + "sites_web_domain_update", + ("client_id", int(client_id)), + ("primary_id", int(primary_id)), + ("params", dict(params)), + )) + + def web_domain_delete(self, primary_id: int) -> int: + return int(self._c._call("sites_web_domain_delete", ("primary_id", int(primary_id)))) + + def web_domain_set_status(self, primary_id: int, status: str) -> int: + """``status`` is typically ``'active'`` or ``'inactive'``.""" + return int(self._c._call( + "sites_web_domain_set_status", + ("primary_id", int(primary_id)), + ("status", status), + )) + + # ---- helpers ------------------------------------------------------ + + def enable_php( + self, + domain_id: int, + mode: str = "fast-cgi", + server_php_id: int = 0, + pm: str = "ondemand", + pm_max_children: int = 10, + pm_start_servers: int = 2, + pm_min_spare_servers: int = 1, + pm_max_spare_servers: int = 5, + ) -> int: + """Flip on PHP with sane defaults. + + Covers the usual field soup: ``php`` (the mode), ``server_php_id`` + (what PHP binary), and the ``pm_*`` family (only relevant for + ``php-fpm`` / ``fast-cgi``). Preserves existing ``client_id`` / + ``sys_groupid`` by reading the record first. + """ + current = self.web_domain_get(domain_id) + client_id = int(current.get("sys_groupid", 0) or 0) + # sys_groupid==1 => admin; pass 0 as client_id for update. + if client_id == 1: + client_id = 0 + params: dict[str, Any] = { + "php": mode, + "server_php_id": int(server_php_id), + "pm": pm, + "pm_max_children": int(pm_max_children), + "pm_start_servers": int(pm_start_servers), + "pm_min_spare_servers": int(pm_min_spare_servers), + "pm_max_spare_servers": int(pm_max_spare_servers), + } + return self.web_domain_update(client_id, domain_id, params) + + def enable_letsencrypt(self, domain_id: int) -> int: + """Turn on SSL + Let's Encrypt + force-https in one shot.""" + current = self.web_domain_get(domain_id) + client_id = int(current.get("sys_groupid", 0) or 0) + if client_id == 1: + client_id = 0 + params = { + "ssl": "y", + "ssl_letsencrypt": "y", + "rewrite_to_https": "y", + } + return self.web_domain_update(client_id, domain_id, params) + + # ---- databases ---------------------------------------------------- + + def database_get(self, primary_id: int) -> Database: + return cast(Database, self._c._call("sites_database_get", ("primary_id", int(primary_id)))) + + def database_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "sites_database_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + + def database_delete(self, primary_id: int) -> int: + return int(self._c._call("sites_database_delete", ("primary_id", int(primary_id)))) + + def database_user_get(self, primary_id: int) -> DatabaseUser: + return cast(DatabaseUser, self._c._call("sites_database_user_get", ("primary_id", int(primary_id)))) + + def database_user_add(self, client_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "sites_database_user_add", + ("client_id", int(client_id)), + ("params", dict(params)), + )) + + def database_user_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int: + return int(self._c._call( + "sites_database_user_update", + ("client_id", int(client_id)), + ("primary_id", int(primary_id)), + ("params", dict(params)), + )) diff --git a/src/ispconfig/types.py b/src/ispconfig/types.py new file mode 100644 index 0000000..c60f70c --- /dev/null +++ b/src/ispconfig/types.py @@ -0,0 +1,156 @@ +"""Shared TypedDicts for ISPConfig response shapes. + +We use ``TypedDict`` with ``total=False`` because SOAP returns dicts and the +schema varies by ISPConfig version. This gives type hints without forcing +runtime validation (no pydantic). + +Field values from the API are mostly strings (PHP stringifies DB output); +callers that want integers should cast explicitly. +""" + +from __future__ import annotations + +from typing import TypedDict + + +class WebDomain(TypedDict, total=False): + """Row from ``web_domain`` — returned by ``sites_web_domain_get``.""" + + domain_id: str + sys_userid: str + sys_groupid: str + server_id: str + ip_address: str + ipv6_address: str + domain: str + type: str + parent_domain_id: str + vhost_type: str + document_root: str + system_user: str + system_group: str + hd_quota: str + traffic_quota: str + cgi: str + ssi: str + suexec: str + php: str + fastcgi_php_version: str + server_php_id: str + pm: str + pm_max_children: str + pm_start_servers: str + pm_min_spare_servers: str + pm_max_spare_servers: str + ssl: str + ssl_letsencrypt: str + rewrite_to_https: str + active: str + + +class DnsZone(TypedDict, total=False): + id: str + sys_userid: str + sys_groupid: str + server_id: str + origin: str + ns: str + mbox: str + serial: str + refresh: str + retry: str + expire: str + minimum: str + ttl: str + active: str + dnssec_wanted: str + dnssec_initialized: str + + +class DnsRr(TypedDict, total=False): + id: str + zone: str + name: str + type: str + data: str + aux: str + ttl: str + active: str + + +class MailDomain(TypedDict, total=False): + domain_id: str + sys_groupid: str + server_id: str + domain: str + dkim: str + dkim_private: str + dkim_public: str + dkim_selector: str + active: str + + +class MailUser(TypedDict, total=False): + mailuser_id: str + sys_groupid: str + server_id: str + email: str + login: str + password: str + name: str + maildir: str + quota: str + cc: str + homedir: str + autoresponder: str + postfix: str + access: str + disableimap: str + disablepop3: str + disablesmtp: str + + +class MailForward(TypedDict, total=False): + forwarding_id: str + sys_groupid: str + server_id: str + source: str + destination: str + type: str + active: str + + +class Database(TypedDict, total=False): + database_id: str + sys_groupid: str + server_id: str + type: str + database_name: str + database_user_id: str + database_ro_user_id: str + database_charset: str + remote_access: str + remote_ips: str + backup_copies: str + active: str + + +class DatabaseUser(TypedDict, total=False): + database_user_id: str + sys_groupid: str + database_user: str + database_password: str + + +class Client(TypedDict, total=False): + client_id: str + sys_userid: str + sys_groupid: str + username: str + contact_name: str + company_name: str + email: str + customer_no: str + limit_web_domain: str + limit_mail_domain: str + limit_database: str diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..259db99 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,22 @@ +"""Shared fixtures. + +Live-smoke tests read ``ISPCONFIG_TEST_URL``, ``ISPCONFIG_TEST_USER``, and +``ISPCONFIG_TEST_PASS`` from the environment. If any is missing, those tests +are skipped — so the default ``pytest`` run on a laptop never phones home. +""" + +from __future__ import annotations + +import os + +import pytest + + +@pytest.fixture(scope="session") +def live_creds() -> dict[str, str]: + url = os.environ.get("ISPCONFIG_TEST_URL") + user = os.environ.get("ISPCONFIG_TEST_USER") + password = os.environ.get("ISPCONFIG_TEST_PASS") + if not (url and user and password): + pytest.skip("live smoke test: set ISPCONFIG_TEST_URL/USER/PASS to enable") + return {"url": url, "user": user, "password": password} diff --git a/tests/test_smoke.py b/tests/test_smoke.py new file mode 100644 index 0000000..6852f1d --- /dev/null +++ b/tests/test_smoke.py @@ -0,0 +1,47 @@ +"""Live read-only smoke test against a real ISPConfig panel. + +Gated on env vars: ``ISPCONFIG_TEST_URL``, ``ISPCONFIG_TEST_USER``, +``ISPCONFIG_TEST_PASS``. These tests are skipped if any is unset. + +They are **read-only** — no ``_add`` / ``_update`` / ``_delete`` calls. Safe +to run against production (Rackham). +""" + +from __future__ import annotations + +import pytest + +from ispconfig import ISPConfigClient + + +@pytest.fixture() +def client(live_creds: dict[str, str]) -> ISPConfigClient: + with ISPConfigClient(live_creds["url"], live_creds["user"], live_creds["password"]) as c: + yield c # type: ignore[misc] + + +def test_login_returns_session(live_creds: dict[str, str]) -> None: + c = ISPConfigClient(live_creds["url"], live_creds["user"], live_creds["password"]) + c.login() + assert c.session_id and len(c.session_id) > 10 + assert c.logout() is True + + +def test_get_domain_156(client: ISPConfigClient) -> None: + site = client.sites.web_domain_get(156) + assert site["domain"] == "mcbindustrial.com" + assert site["php"] == "fast-cgi" + + +def test_dns_zone_lookup(client: ISPConfigClient) -> None: + zone_id = client.dns.zone_get_id("mcbindustrial.com.") + assert zone_id > 0, "zone_get_id returned 0 — is the zone present?" + + +def test_mail_users_under_mcbindustrial(client: ISPConfigClient) -> None: + # mail_user_get accepts a filter-dict and returns a list. + users = client.mail.user_get({"email": "%@mcbindustrial.com"}) + assert isinstance(users, list) + # Don't assert count — just shape. Zero mailboxes is a valid state. + for u in users: + assert "email" in u diff --git a/tests/test_unit.py b/tests/test_unit.py new file mode 100644 index 0000000..0f30f2b --- /dev/null +++ b/tests/test_unit.py @@ -0,0 +1,187 @@ +"""Pure-unit tests — no network, no ISPConfig. + +We swap in a fake transport so the client-level logic (session management, +retry on expired session, fault translation) can be tested in isolation. +""" + +from __future__ import annotations + +from collections.abc import Iterable +from typing import Any + +import pytest + +from ispconfig import ( + AuthError, + FaultError, + ISPConfigClient, + NotFoundError, + PermissionError, +) +from ispconfig._soap import SoapFault, SoapTransport + + +class _FakeTransport: + def __init__(self, scripted: list[Any]) -> None: + self.scripted = list(scripted) + self.calls: list[tuple[str, tuple[tuple[str, Any], ...]]] = [] + + def call(self, method: str, args: Iterable[tuple[str, Any]]) -> Any: + self.calls.append((method, tuple(args))) + if not self.scripted: + raise AssertionError(f"unexpected call {method}") + result = self.scripted.pop(0) + if isinstance(result, Exception): + raise result + return result + + +def _make_client(transport: _FakeTransport) -> ISPConfigClient: + c = ISPConfigClient("http://fake/remote/index.php", "user", "pass") + c._transport = transport # type: ignore[assignment] + return c + + +def test_login_stores_session_and_logout_clears() -> None: + t = _FakeTransport(["sid-abc", True]) + c = _make_client(t) + c.login() + assert c.session_id == "sid-abc" + assert c.logout() is True + assert c.session_id is None + + +def test_context_manager_auto_login_logout() -> None: + t = _FakeTransport(["sid-123", {"domain": "x.com"}, True]) + c = _make_client(t) + with c: + assert c.session_id == "sid-123" + # first positional arg passed through is the session id. + result = c.sites.web_domain_get(1) + assert result == {"domain": "x.com"} + assert c.session_id is None + assert [call[0] for call in t.calls] == ["login", "sites_web_domain_get", "logout"] + + +def test_session_expired_retry() -> None: + t = _FakeTransport([ + "sid-first", # login + SoapFault("Server", "Session not valid"), # first _call fails + "sid-second", # re-login + {"domain": "x.com"}, # retry succeeds + ]) + c = _make_client(t) + c.login() + result = c.sites.web_domain_get(1) + assert result == {"domain": "x.com"} + # 4 transport calls: login, failed get, login, successful get. + assert [call[0] for call in t.calls] == [ + "login", "sites_web_domain_get", "login", "sites_web_domain_get", + ] + + +def test_session_expired_no_retry_when_disabled() -> None: + t = _FakeTransport([ + "sid-first", + SoapFault("Server", "Session expired"), + ]) + c = ISPConfigClient("http://fake/", "u", "p", max_retries=0) + c._transport = t # type: ignore[assignment] + c.login() + with pytest.raises(AuthError): + c.sites.web_domain_get(1) + + +def test_fault_mapping_auth() -> None: + t = _FakeTransport([SoapFault("Server", "Login failed")]) + c = _make_client(t) + with pytest.raises(AuthError): + c.login() + + +def test_fault_mapping_permission() -> None: + t = _FakeTransport(["sid", SoapFault("Server", "Permission denied")]) + c = _make_client(t) + c.login() + with pytest.raises(PermissionError): + c.sites.web_domain_get(1) + + +def test_fault_mapping_not_found() -> None: + t = _FakeTransport(["sid", SoapFault("Server", "No records found")]) + c = _make_client(t) + c.login() + with pytest.raises(NotFoundError): + c.sites.web_domain_get(999) + + +def test_fault_mapping_generic() -> None: + t = _FakeTransport(["sid", SoapFault("Server", "something weird")]) + c = _make_client(t) + c.login() + with pytest.raises(FaultError): + c.sites.web_domain_get(1) + + +def test_update_client_id_footgun_passes_through() -> None: + """``web_domain_update`` must send ``client_id`` as the 2nd positional arg.""" + t = _FakeTransport(["sid", 1]) + c = _make_client(t) + c.login() + c.sites.web_domain_update(0, 156, {"php": "fast-cgi"}) + _, args = t.calls[-1] + assert args[0][0] == "session_id" + assert args[1] == ("client_id", 0) + assert args[2] == ("primary_id", 156) + assert args[3][0] == "params" + + +def test_envelope_encoding_map_and_scalars() -> None: + """Smoke test for the XML encoder — catches regressions.""" + xml = SoapTransport._build_envelope( + "sites_web_domain_update", + ( + ("session_id", "abc"), + ("client_id", 0), + ("primary_id", 156), + ("params", {"php": "fast-cgi", "active": "y"}), + ), + ) + assert "abc<" in xml + assert '0' in xml + assert 'ns2:Map' in xml + assert 'php' in xml + assert 'fast-cgi' in xml + + +def test_response_parsing_map() -> None: + body = b''' + + + + + domainmcb.com + activey + + + +''' + result = SoapTransport._parse_response("sites_web_domain_get", body) + assert result == {"domain": "mcb.com", "active": "y"} + + +def test_response_parsing_fault() -> None: + body = b''' + + + SOAP-ENV:Server + Login failed. + +''' + with pytest.raises(SoapFault) as excinfo: + SoapTransport._parse_response("login", body) + assert "Login failed" in excinfo.value.faultstring