feat: initial ispconfig-py SDK (sites / dns / mail / databases / clients)

Python 3.10+ SDK wrapping the ISPConfig remote SOAP API so we stop writing
throwaway PHP snippets every time we need to touch a site, zone, or mailbox.

Why no zeep: ISPConfig's /remote/index.php exposes PHP SoapServer in non-WSDL
mode and refuses WSDL generation (?wsdl returns a fault). zeep requires WSDL,
so the stated dependency wouldn't actually work. Instead we hand-roll SOAP
envelopes with the stdlib (urllib + xml.etree). Zero runtime deps.

Structure:
- src/ispconfig/_soap.py        — envelope encode/decode, fault surfacing
- src/ispconfig/client.py       — ISPConfigClient context manager, retry
- src/ispconfig/exceptions.py   — ISPConfigError / Auth / Permission / NotFound / Fault
- src/ispconfig/sites.py        — web_domain get/add/update/delete + enable_php / enable_letsencrypt helpers
- src/ispconfig/dns.py          — zones + A/CNAME/MX/TXT records, dns_a_add type-column workaround
- src/ispconfig/mail.py         — mail domains, users, forwards, create_mailbox helper
- src/ispconfig/databases.py    — convenience facade over sites_database_*
- src/ispconfig/clients.py      — client + sys_groupid lookups
- src/ispconfig/types.py        — TypedDicts for response shapes (no pydantic)

Institutional knowledge baked into docstrings + README footgun list:
- sites_web_domain_update 2nd arg is client_id (not primary_id); admin = 0
- sys_groupid=1 -> pass client_id=0 on update, else ownership churns
- fastcgi_php_version vs server_php_id depending on panel version
- dns_a_add type-column bug (<= 3.2.11) — wrapper issues follow-up update
- dns_zone_get_id wants origin WITHOUT trailing dot on 3.2.11+ (contrary to
  what the older snippets say). Verified live against Rackham 2026-04-22.
- mail_user_get returns a bare map on exactly-one-match filter dicts —
  wrapper normalizes to list
- session timeouts mid-op: client detects + re-auths once (max_retries knob)

Tests:
- tests/test_unit.py   — 12 unit tests against a fake transport
- tests/test_smoke.py  — live read-only smoke test, gated on env vars:
    ISPCONFIG_TEST_URL, ISPCONFIG_TEST_USER, ISPCONFIG_TEST_PASS
  Covers login, web_domain_get(156), dns_zone_get_id, mail_user_get filter.

Tooling:
- mypy strict-ish (disallow untyped defs, warn-return-any, no implicit optional)
- ruff with E/F/W/I/B/UP/N/SLF/RUF lint sets
- pip install -e .[dev] for pytest / mypy / ruff
This commit is contained in:
Kayos 2026-04-22 13:24:58 -07:00
commit 9438b4e751
19 changed files with 1797 additions and 0 deletions

18
.gitignore vendored Normal file
View file

@ -0,0 +1,18 @@
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
.venv/
venv/
env/
.mypy_cache/
.ruff_cache/
.pytest_cache/
*.egg-info/
dist/
build/
.env
.envrc
.vscode/
.idea/

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 Sulkta Coop
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

199
README.md Normal file
View file

@ -0,0 +1,199 @@
# ispconfig-py
Python SDK for the ISPConfig remote SOAP API. Internal tooling for the Sulkta
Coop — wraps the panel's SOAP surface so we stop writing throw-away PHP
scripts every time we need to touch a site, zone, or mailbox.
The SDK covers the methods we actually use today (sites, DNS, mail, databases,
clients). More methods can be added as needed.
## Why no zeep?
ISPConfig's `/remote/index.php` uses PHP's `SoapServer` in non-WSDL mode and
refuses to generate a WSDL (`?wsdl` returns a fault). zeep requires WSDL. So
we hand-roll SOAP envelopes with the stdlib (`urllib` + `xml.etree`). Zero
runtime dependencies — fewer moving parts, nothing to pin.
## Install
```bash
pip install -e . # for use
pip install -e .[dev] # with pytest / mypy / ruff
```
Python 3.10+.
## Quick start
```python
from ispconfig import ISPConfigClient
with ISPConfigClient(
"https://panel.example.com:8080/remote/index.php",
username="kayos",
password="hunter2",
) as c:
site = c.sites.web_domain_get(156)
print(site["domain"], site["php"])
# Flip on Let's Encrypt in one call.
c.sites.enable_letsencrypt(156)
```
The `with` block handles login on enter and logout on exit. Session IDs are
managed internally — callers don't touch them.
Set `verify_ssl=False` for dev boxes with self-signed certs. Default is `True`.
## Modules at a glance
### `sites`
```python
c.sites.web_domain_get(156)
c.sites.web_domain_update(client_id=0, primary_id=156, params={"php": "fast-cgi"})
c.sites.enable_php(156, mode="php-fpm", server_php_id=2, pm="ondemand")
c.sites.enable_letsencrypt(156)
```
### `dns`
```python
zone_id = c.dns.zone_get_id("example.com.") # note the trailing dot
zone = c.dns.zone_get(zone_id)
records = c.dns.rr_get_all_by_zone(zone_id)
rr_id = c.dns.a_add(0, {"zone": zone_id, "name": "www", "data": "1.2.3.4", "ttl": 3600, "active": "Y"})
```
### `mail`
```python
md = c.mail.domain_get_by_domain("example.com")
users = c.mail.user_get({"email": "%@example.com"})
new_id = c.mail.create_mailbox(client_id=5, domain="example.com",
local_part="info", password="x", quota_mb=2048)
```
### `databases`
```python
db = c.databases.get(42)
c.databases.user_update(client_id=5, primary_id=42, params={"database_password": "x"})
```
### `clients`
```python
cli = c.clients.get_by_username("jacob")
groupid = c.clients.get_groupid(cli["client_id"])
```
## Footguns (captured here so nobody has to rediscover them)
- **`sites_web_domain_update`'s second arg is `client_id`, not `primary_id`.**
For admin-owned sites pass `client_id=0`. Wrong order = permission denied
or silent ownership change. `SitesModule.web_domain_update` signature
enforces the correct order.
- **`sys_groupid=1` is the admin group.** On update calls, if you read a
record with `sys_groupid=1`, pass `client_id=0`, not `1`, or ISPConfig
re-owns the record to "admin's admin". `enable_php` / `enable_letsencrypt`
handle this automatically.
- **`fastcgi_php_version` vs `server_php_id`.** Older installs use the string
field `fastcgi_php_version` (e.g. `"PHP-8.2:/usr/bin/php-cgi8.2:/etc/php/8.2/cgi"`),
newer installs use `server_php_id` (int FK to `server_php`). Our
`enable_php` wraps the new field; set it directly via `web_domain_update`
if you need the old one.
- **BIND trailing dot on zone origins — BUT in reverse.** Contrary to a lot
of older documentation and PHP snippets floating around, ISPConfig 3.2.11+
wants the origin **without** a trailing dot: `dns_zone_get_id("example.com")`
works, `dns_zone_get_id("example.com.")` raises `no_domain_found`. Our
wrapper strips a trailing dot for you so either call works. Verified
against Rackham 2026-04-22.
- **`mail_user_get` with a filter dict returns inconsistent shapes.** If the
filter matches multiple rows you get an array; exactly-one match returns a
bare map. Our `mail.user_get(filter_dict)` always normalizes to a list.
- **`no_domain_found` fault.** Both `dns_zone_get_id` and (in some paths)
`mail_domain_get_by_domain` return a SOAP fault with `faultcode=no_domain_found`
when the record is missing. Mapped to `NotFoundError`.
- **`dns_a_add` type-column bug.** On some ISPConfig versions (<= ~3.2.11)
`dns_a_add` inserts the `dns_rr` row without setting the `type` column,
so BIND never emits the record. `DnsModule.a_add(..., fix_type_bug=True)`
(default) issues a follow-up `dns_rr_update` with `{"type": "A"}` — no-op
on fixed versions, required on the broken ones.
- **Session timeouts.** ISPConfig sessions expire mid-long-operation without
warning. The client detects the "session not valid" fault, re-authenticates,
and retries once (`max_retries=1` by default). Tune via
`ISPConfigClient(..., max_retries=3)` for very long batch jobs.
- **`mail_forward_*`, not `mail_forwarding_*`.** The remote method is
singular; we expose it as `c.mail.forward_add` and keep
`c.mail.forwarding_add` as an alias for PHP-snippet refugees.
- **Filter dicts on `mail_user_get`.** Pass an int to get one row; pass a
dict like `{"email": "%@example.com"}` to get a list. The SOAP method is
overloaded and untyped on the wire.
## Errors
Typed hierarchy — never `zeep.exceptions.Fault` (we don't use zeep), never a
raw SOAP fault leaked to callers:
```
ISPConfigError
├── AuthError # login failed OR session expired
├── PermissionError # caller lacks rights on the record
├── NotFoundError # primary_id didn't match anything
└── FaultError # everything else (.faultcode, .faultstring)
```
## Tests
```bash
pytest # unit tests only, no network
```
To run the live smoke test against a real panel:
```bash
export ISPCONFIG_TEST_URL="https://panel.example.com:8080/remote/index.php"
export ISPCONFIG_TEST_USER="kayos"
export ISPCONFIG_TEST_PASS="..."
pytest tests/test_smoke.py
```
The smoke test is read-only — no `_add` / `_update` / `_delete` calls. Safe
against production.
## Development
```bash
pip install -e .[dev]
ruff check src/
mypy src/ispconfig
pytest
```
## Not yet covered
The remote API surface is huge. These are intentionally left out of v0.1 —
add as needed:
- `sites_web_aliasdomain_*`, `sites_web_subdomain_*`,
`sites_web_vhost_{subdomain,aliasdomain}_*`
- `sites_ftp_user_*`, `sites_shell_user_*`, `sites_webdav_user_*`
- `sites_cron_*`
- `sites_web_domain_backup`, `sites_web_domain_backup_list`,
`mail_user_backup`, `mail_user_backup_list`
- `dns_{aaaa,ns,srv,ptr,tlsa,ds,caa,sshfp,dname,loc,hinfo,naptr,rp,alias}_*`
- `dns_slave_*`, `dns_zone_set_dnssec`, `dns_zone_get_by_user`, `dns_templatezone_*`
- `mail_alias_*`, `mail_aliasdomain_*`, `mail_catchall_*`, `mail_filter_*`,
`mail_fetchmail_*`, `mail_mailinglist_*`, `mail_policy_*`,
`mail_relay_{domain,recipient}_*`, `mail_transport_*`,
`mail_{whitelist,blacklist}_*`, `mail_spamfilter_*`, `mail_user_filter_*`
- `client_add`, `client_update`, `client_delete`, `client_change_password`,
`client_template_additional_*`, `client_templates_get_all`,
`client_login_get`
- `server_get`, `server_get_all`, `admin.*`, `monitor.*`, `aps.*`,
`openvz.*`, `domains.*`
## License
MIT — see `LICENSE`.

95
pyproject.toml Normal file
View file

@ -0,0 +1,95 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "ispconfig"
version = "0.1.0"
description = "Python SDK for the ISPConfig remote SOAP API — Sulkta Coop internal tooling."
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.10"
authors = [
{ name = "Sulkta Coop" },
]
# Zero runtime deps on purpose. ISPConfig's SOAP endpoint disables WSDL
# generation (?wsdl returns a fault), so zeep can't help us anyway — we
# hand-roll envelopes with the stdlib.
dependencies = []
[project.optional-dependencies]
dev = [
"pytest>=7",
"mypy>=1.8",
"ruff>=0.3",
]
[project.urls]
Homepage = "http://192.168.0.5:3001/Sulkta-Coop/ispconfig-py"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-data]
ispconfig = ["py.typed"]
# ---- mypy -----------------------------------------------------------
[tool.mypy]
python_version = "3.10"
packages = ["ispconfig"]
mypy_path = "src"
strict_optional = true
warn_unused_ignores = true
warn_redundant_casts = true
warn_return_any = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
no_implicit_optional = true
# We intentionally don't enable `disallow_any_expr` — SOAP responses are
# dicts of Any and forcing typing everywhere would create fake certainty.
[[tool.mypy.overrides]]
module = "ispconfig.types"
disallow_untyped_defs = false
# ---- ruff -----------------------------------------------------------
[tool.ruff]
line-length = 110
target-version = "py310"
src = ["src"]
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"F", # pyflakes
"W", # pycodestyle warnings
"I", # isort
"B", # bugbear
"UP", # pyupgrade
"N", # pep8-naming
"SLF", # flake8-self (private access)
"RUF",
]
ignore = [
"E501", # line length — handled by formatter when it cares
"N818", # exception suffix — our hierarchy predates this rule
"B008", # function calls in argument defaults — not our style anyway
]
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["SLF001", "N802"]
# Submodules call back into the client's dispatcher (`_call`) by design —
# it's the single chokepoint for session management and retry logic.
"src/ispconfig/sites.py" = ["SLF001"]
"src/ispconfig/dns.py" = ["SLF001"]
"src/ispconfig/mail.py" = ["SLF001"]
"src/ispconfig/clients.py" = ["SLF001"]
# ---- pytest ---------------------------------------------------------
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-ra"

36
src/ispconfig/__init__.py Normal file
View file

@ -0,0 +1,36 @@
"""Python SDK for the ISPConfig remote SOAP API.
Quick start::
from ispconfig import ISPConfigClient
with ISPConfigClient("https://panel.example.com:8080/remote/index.php",
"admin", "password") as c:
site = c.sites.web_domain_get(156)
print(site["domain"], site["php"])
See the individual module docstrings for the full set of wrapped calls.
"""
from __future__ import annotations
from .client import ISPConfigClient
from .exceptions import (
AuthError,
FaultError,
ISPConfigError,
NotFoundError,
PermissionError,
)
__version__ = "0.1.0"
__all__ = [
"AuthError",
"FaultError",
"ISPConfigClient",
"ISPConfigError",
"NotFoundError",
"PermissionError",
"__version__",
]

262
src/ispconfig/_soap.py Normal file
View file

@ -0,0 +1,262 @@
"""Low-level SOAP transport for the ISPConfig remote API.
ISPConfig's ``/remote/index.php`` exposes PHP ``SoapServer`` in non-WSDL mode
and refuses to generate a WSDL (``?wsdl`` returns a fault). That kills zeep,
which requires WSDL. So we hand-roll the envelopes over ``urllib`` ISPConfig
only uses a handful of XSD scalar types plus ``xml-soap`` Map/Array, and the
responses are parseable in ~80 lines of stdlib XML.
This module is private. Callers stick to :class:`~ispconfig.client.ISPConfigClient`.
"""
from __future__ import annotations
import logging
import ssl
import urllib.error
import urllib.request
from collections.abc import Iterable, Mapping
from typing import Any
from xml.etree import ElementTree as ET
from xml.sax.saxutils import escape as xml_escape
log = logging.getLogger("ispconfig")
_NS = {
"soap": "http://schemas.xmlsoap.org/soap/envelope/",
"xsd": "http://www.w3.org/2001/XMLSchema",
"xsi": "http://www.w3.org/2001/XMLSchema-instance",
"ns1": "urn:ispconfig",
}
# PHP SoapServer emits xmlns:ns1="/remote/index.php" — we match by localname
# rather than by namespace, so responses decode regardless of the ns1 URI.
class SoapFault(Exception):
"""Raw SOAP fault. Mapped to typed exceptions by the client layer."""
def __init__(self, faultcode: str, faultstring: str) -> None:
super().__init__(f"{faultcode}: {faultstring}")
self.faultcode = faultcode
self.faultstring = faultstring
class SoapTransport:
"""Minimal non-WSDL SOAP transport over HTTPS."""
def __init__(self, url: str, *, verify_ssl: bool = True, timeout: float = 30.0) -> None:
self.url = url
self.timeout = timeout
if verify_ssl:
self._ctx = ssl.create_default_context()
else:
self._ctx = ssl._create_unverified_context() # noqa: SLF001
# ---- public API ---------------------------------------------------
def call(self, method: str, args: Iterable[tuple[str, Any]]) -> Any:
"""Invoke a SOAP method with positional params (name, value) pairs.
ISPConfig's SOAP dispatch uses positional args — the local names
``session_id``, ``primary_id``, ``params`` etc. are cosmetic; what
actually matters is order.
"""
envelope = self._build_envelope(method, args)
body = self._post(method, envelope)
return self._parse_response(method, body)
# ---- internals ----------------------------------------------------
def _post(self, method: str, envelope: str) -> bytes:
req = urllib.request.Request(
self.url,
data=envelope.encode("utf-8"),
headers={
"Content-Type": "text/xml; charset=UTF-8",
"SOAPAction": f"urn:ispconfig#{method}",
"User-Agent": "ispconfig-py/0.1",
},
)
try:
with urllib.request.urlopen(req, context=self._ctx, timeout=self.timeout) as resp:
data: bytes = resp.read()
return data
except urllib.error.HTTPError as e:
# PHP SoapServer returns faults as HTTP 500 with a fault envelope
# in the body — still parseable.
if e.code == 500 and e.headers.get("Content-Type", "").startswith("text/xml"):
err_data: bytes = e.read()
return err_data
raise
@staticmethod
def _build_envelope(method: str, args: Iterable[tuple[str, Any]]) -> str:
arg_xml = "".join(_encode_arg(name, value) for name, value in args)
return (
'<?xml version="1.0" encoding="UTF-8"?>'
'<SOAP-ENV:Envelope'
' xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"'
' xmlns:ns1="urn:ispconfig"'
' xmlns:xsd="http://www.w3.org/2001/XMLSchema"'
' xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"'
' xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/">'
f'<SOAP-ENV:Body><ns1:{method}>{arg_xml}</ns1:{method}></SOAP-ENV:Body>'
'</SOAP-ENV:Envelope>'
)
@staticmethod
def _parse_response(method: str, body: bytes) -> Any:
try:
root = ET.fromstring(body)
except ET.ParseError as e:
raise SoapFault("Client.ParseError", f"Invalid XML: {e}") from e
# Walk by local-name so the bogus ns1="/remote/index.php" doesn't bite us.
fault = _find_local(root, "Fault")
if fault is not None:
code = _text(_find_local(fault, "faultcode")) or "Server"
msg = _text(_find_local(fault, "faultstring")) or "Unknown SOAP fault"
raise SoapFault(code, msg)
resp = _find_local(root, f"{method}Response")
if resp is None:
raise SoapFault(
"Client.UnexpectedResponse",
f"No <{method}Response> element in SOAP body",
)
ret = _find_local(resp, "return")
if ret is None:
return None
return _decode(ret)
# ---- XML encoding -----------------------------------------------------
def _encode_arg(name: str, value: Any) -> str:
if isinstance(value, bool):
return f'<{name} xsi:type="xsd:boolean">{"true" if value else "false"}</{name}>'
if isinstance(value, int):
return f'<{name} xsi:type="xsd:int">{value}</{name}>'
if isinstance(value, float):
return f'<{name} xsi:type="xsd:double">{value}</{name}>'
if isinstance(value, str):
return f'<{name} xsi:type="xsd:string">{xml_escape(value)}</{name}>'
if value is None:
return f'<{name} xsi:nil="true"/>'
if isinstance(value, Mapping):
items = "".join(
f'<item><key xsi:type="xsd:string">{xml_escape(str(k))}</key>'
f'{_encode_value_tag("value", v)}</item>'
for k, v in value.items()
)
return f'<{name} xsi:type="ns2:Map" xmlns:ns2="http://xml.apache.org/xml-soap">{items}</{name}>'
if isinstance(value, (list, tuple)):
items = "".join(_encode_value_tag("item", v) for v in value)
return (
f'<{name} xsi:type="SOAP-ENC:Array" '
f'SOAP-ENC:arrayType="xsd:anyType[{len(value)}]">{items}</{name}>'
)
raise TypeError(f"Cannot encode {type(value).__name__} for SOAP arg {name!r}")
def _encode_value_tag(tag: str, value: Any) -> str:
# Same as _encode_arg but with a shared `value`/`item` tag name.
return _encode_arg(tag, value)
# ---- XML decoding -----------------------------------------------------
def _decode(el: ET.Element) -> Any:
xsi_type = _xsi_type(el)
# Arrays first — a response can be an Array of Maps, and the outer element
# has <item xsi:type="Map">...</item> children that we must NOT confuse
# with map entries.
if (xsi_type and xsi_type.endswith(":Array")) or _is_array(el):
return _decode_array(el)
if (xsi_type and xsi_type.endswith(":Map")) or _is_map(el):
return _decode_map(el)
if xsi_type and xsi_type.endswith(":boolean"):
return (el.text or "").strip().lower() == "true"
if xsi_type and (xsi_type.endswith(":int") or xsi_type.endswith(":long")):
try:
return int((el.text or "").strip())
except ValueError:
return el.text
if xsi_type and (xsi_type.endswith(":double") or xsi_type.endswith(":float")):
try:
return float((el.text or "").strip())
except ValueError:
return el.text
nil = el.get("{http://www.w3.org/2001/XMLSchema-instance}nil")
if nil == "true":
return None
# Structs where ISPConfig returns arrays of scalars (e.g. ints) still
# render as strings — PHP's $app->db->queryAll output is stringified.
return el.text if el.text is not None else ""
def _decode_map(el: ET.Element) -> dict[str, Any]:
out: dict[str, Any] = {}
for item in _iter_local(el, "item"):
key_el = _find_local(item, "key")
val_el = _find_local(item, "value")
if key_el is None:
continue
key = (key_el.text or "").strip()
out[key] = _decode(val_el) if val_el is not None else None
return out
def _decode_array(el: ET.Element) -> list[Any]:
return [_decode(child) for child in _iter_local(el, "item")]
def _is_map(el: ET.Element) -> bool:
# apache-style Map with no xsi:type on the outer element but item/key/value
# children. Check DIRECT children only — descendants may match for a
# deeply nested array-of-maps and fool us.
kids = list(el)
if not kids:
return False
if not all(_local(k.tag) == "item" for k in kids):
return False
return any(
any(_local(gk.tag) == "key" for gk in item)
for item in kids
)
def _is_array(el: ET.Element) -> bool:
arr = el.get("{http://schemas.xmlsoap.org/soap/encoding/}arrayType")
return arr is not None
def _xsi_type(el: ET.Element) -> str | None:
return el.get("{http://www.w3.org/2001/XMLSchema-instance}type")
# ---- XML walking (namespace-agnostic) ---------------------------------
def _local(tag: str) -> str:
return tag.rsplit("}", 1)[-1]
def _text(el: ET.Element | None) -> str | None:
return el.text if el is not None else None
def _find_local(root: ET.Element, name: str) -> ET.Element | None:
if _local(root.tag) == name:
return root
for el in root.iter():
if _local(el.tag) == name:
return el
return None
def _iter_local(parent: ET.Element, name: str) -> list[ET.Element]:
return [el for el in parent if _local(el.tag) == name]

130
src/ispconfig/client.py Normal file
View file

@ -0,0 +1,130 @@
"""``ISPConfigClient`` — top-level entry point, session lifecycle, submodule wiring."""
from __future__ import annotations
import logging
from types import TracebackType
from typing import Any
from . import exceptions as _exc
from ._soap import SoapFault, SoapTransport
from .clients import ClientsModule
from .databases import DatabasesModule
from .dns import DnsModule
from .mail import MailModule
from .sites import SitesModule
log = logging.getLogger("ispconfig")
class ISPConfigClient:
"""High-level client for the ISPConfig remote SOAP API.
Use as a context manager to auto-login on enter and auto-logout on exit::
with ISPConfigClient(url, "kayos", "hunter2") as c:
site = c.sites.web_domain_get(156)
print(site["domain"])
Session IDs are managed internally and never returned to callers.
If a call fails with a session-expired fault, the client re-authenticates
once and retries (controlled by ``max_retries``).
"""
def __init__(
self,
url: str,
username: str,
password: str,
*,
verify_ssl: bool = True,
timeout: float = 30.0,
max_retries: int = 1,
) -> None:
self._url = url
self._username = username
self._password = password
self._max_retries = max_retries
self._transport = SoapTransport(url, verify_ssl=verify_ssl, timeout=timeout)
self._session_id: str | None = None
self.sites = SitesModule(self)
self.dns = DnsModule(self)
self.mail = MailModule(self)
self.databases = DatabasesModule(self)
self.clients = ClientsModule(self)
# ---- context manager ---------------------------------------------
def __enter__(self) -> ISPConfigClient:
self.login()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
try:
self.logout()
except Exception as e: # pragma: no cover — cleanup path, never raise out
log.warning("logout on __exit__ failed: %s", e)
# ---- session lifecycle -------------------------------------------
def login(self) -> None:
"""Open a session. Stores the session id internally."""
try:
sid = self._transport.call(
"login",
(("username", self._username), ("password", self._password)),
)
except SoapFault as f:
raise _exc.map_fault(f.faultcode, f.faultstring) from f
if not isinstance(sid, str) or not sid:
raise _exc.AuthError("login returned empty session id")
self._session_id = sid
log.debug("ispconfig login ok (session %s...)", sid[:8])
def logout(self) -> bool:
"""Close the session. Safe to call even if never logged in."""
if self._session_id is None:
return False
sid = self._session_id
self._session_id = None
try:
result = self._transport.call("logout", (("session_id", sid),))
except SoapFault as f:
log.debug("logout fault ignored: %s", f)
return False
return bool(result)
@property
def session_id(self) -> str | None:
"""Read-only accessor — exposed for debugging, not for API calls."""
return self._session_id
# ---- the hot path ------------------------------------------------
def _call(self, method: str, *args: tuple[str, Any]) -> Any:
"""Invoke ``method(session_id, *args)`` with typed error mapping + retry.
This is what the submodules call. It prepends the session id,
translates SOAP faults, and retries once on session expiry.
"""
attempts = 0
while True:
if self._session_id is None:
self.login()
sid_arg = ("session_id", self._session_id or "")
try:
return self._transport.call(method, (sid_arg, *args))
except SoapFault as f:
mapped = _exc.map_fault(f.faultcode, f.faultstring)
if _exc.is_session_expired(mapped) and attempts < self._max_retries:
log.info("ispconfig session expired, re-authenticating")
self._session_id = None
attempts += 1
continue
raise mapped from f

48
src/ispconfig/clients.py Normal file
View file

@ -0,0 +1,48 @@
"""``clients.*`` — ISPConfig client/customer lookups.
ISPConfig's client model has two IDs you'll trip over:
* ``client_id`` primary key of the ``client`` table.
* ``sys_groupid`` the "owner group" used by sites/dns/mail records.
``sys_groupid=1`` is the admin group. For regular clients, ``sys_groupid``
is *not* equal to ``client_id``; you must look it up with
:meth:`ClientsModule.get_groupid`.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, cast
from .types import Client
if TYPE_CHECKING:
from .client import ISPConfigClient
class ClientsModule:
def __init__(self, client: ISPConfigClient) -> None:
self._c = client
def get(self, primary_id: int) -> Client:
return cast(Client, self._c._call("client_get", ("primary_id", int(primary_id))))
def get_groupid(self, client_id: int) -> int:
"""Look up ``sys_groupid`` for a given ``client_id``."""
result = self._c._call("client_get_groupid", ("client_id", int(client_id)))
return int(result) if result else 0
def get_id(self, sys_userid: int) -> int:
"""Reverse of :meth:`get_groupid` — client_id from a sys user id."""
result = self._c._call("client_get_id", ("sys_userid", int(sys_userid)))
return int(result) if result else 0
def get_by_username(self, username: str) -> Client:
return cast(Client, self._c._call("client_get_by_username", ("username", username)))
def get_by_groupid(self, groupid: int) -> Client:
return cast(Client, self._c._call("client_get_by_groupid", ("groupid", int(groupid))))
def get_all(self) -> list[int]:
"""Return every ``client_id`` the API user can see."""
result = self._c._call("client_get_all")
return [int(x) for x in (result or [])]

View file

@ -0,0 +1,39 @@
"""``databases.*`` — thin convenience wrapper around the sites-level DB calls.
The underlying remote methods live under ``sites_database_*`` in ISPConfig,
but callers reasonably expect a top-level ``client.databases`` namespace.
This module just delegates to :class:`~ispconfig.sites.SitesModule`.
"""
from __future__ import annotations
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any
from .types import Database, DatabaseUser
if TYPE_CHECKING:
from .client import ISPConfigClient
class DatabasesModule:
def __init__(self, client: ISPConfigClient) -> None:
self._c = client
def get(self, primary_id: int) -> Database:
return self._c.sites.database_get(primary_id)
def add(self, client_id: int, params: Mapping[str, Any]) -> int:
return self._c.sites.database_add(client_id, params)
def delete(self, primary_id: int) -> int:
return self._c.sites.database_delete(primary_id)
def user_get(self, primary_id: int) -> DatabaseUser:
return self._c.sites.database_user_get(primary_id)
def user_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return self._c.sites.database_user_add(client_id, params)
def user_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int:
return self._c.sites.database_user_update(client_id, primary_id, params)

159
src/ispconfig/dns.py Normal file
View file

@ -0,0 +1,159 @@
"""``dns.*`` — zones and resource records.
Footguns:
* Zone origins **must end in a dot** per BIND convention:
``dns_zone_get_id(..., 'example.com.')`` without the trailing dot
ISPConfig returns 0 and you'll spend 20 minutes wondering why.
* ``dns_a_add`` in some ISPConfig builds (<= 3.2.11-ish) has a known bug
where the ``type`` column in ``dns_rr`` is not populated by the add
handler, which means the record exists in the table but BIND never
emits it. Workaround: after add, ``dns_rr_update`` with ``{'type':'A'}``
to force the column. :meth:`DnsModule.a_add` does this for you and
emits a warning log check your ISPConfig version.
"""
from __future__ import annotations
import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, cast
from .types import DnsRr, DnsZone
if TYPE_CHECKING:
from .client import ISPConfigClient
log = logging.getLogger("ispconfig.dns")
class DnsModule:
def __init__(self, client: ISPConfigClient) -> None:
self._c = client
# ---- zones --------------------------------------------------------
def zone_get(self, primary_id: int) -> DnsZone:
return cast(DnsZone, self._c._call("dns_zone_get", ("primary_id", int(primary_id))))
def zone_get_id(self, origin: str) -> int:
"""Resolve a zone origin to its ``dns_soa.id``.
.. note::
Despite BIND convention, ISPConfig's ``dns_zone_get_id`` wants the
origin **without** a trailing dot ``"example.com"``, not
``"example.com."``. Passing the dotted form raises a
``no_domain_found`` fault. This wrapper strips a trailing dot if
present so callers can be lazy either way.
Returns 0 if the zone does not exist (``NotFoundError`` caught and
converted; other faults propagate).
"""
# ISPConfig's internal lookup compares against the stored `origin`
# column, which can be either with or without a dot depending on how
# the zone was created. Historically the spec said "must end in a dot"
# — that turns out to be wrong against 3.2.11+.
normalized = origin.rstrip(".")
try:
result = self._c._call("dns_zone_get_id", ("origin", normalized))
except Exception as e: # pragma: no cover — exception classes tested in unit tests
if "no_domain_found" in str(e).lower() or "invalid domain" in str(e).lower():
return 0
raise
return int(result) if result else 0
def zone_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"dns_zone_add",
("client_id", int(client_id)),
("params", dict(params)),
))
def zone_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"dns_zone_update",
("client_id", int(client_id)),
("primary_id", int(primary_id)),
("params", dict(params)),
))
def zone_delete(self, primary_id: int) -> int:
return int(self._c._call("dns_zone_delete", ("primary_id", int(primary_id))))
def rr_get_all_by_zone(self, zone_id: int) -> list[DnsRr]:
return self._c._call("dns_rr_get_all_by_zone", ("zone_id", int(zone_id))) or []
# ---- records ------------------------------------------------------
def a_add(self, client_id: int, params: Mapping[str, Any], *, fix_type_bug: bool = True) -> int:
"""Add an A record.
On some ISPConfig versions the ``type`` column of ``dns_rr`` is not
set by ``dns_a_add``. When ``fix_type_bug=True`` (default), we follow
up with a ``dns_rr_update`` that writes ``type='A'``. No-op on fixed
versions, harmless.
"""
rr_id = int(self._c._call(
"dns_a_add",
("client_id", int(client_id)),
("params", dict(params)),
))
if fix_type_bug and rr_id:
try:
self._c._call(
"dns_rr_update",
("client_id", int(client_id)),
("primary_id", rr_id),
("params", {"type": "A"}),
)
except Exception as e: # pragma: no cover — workaround logs & swallows
log.warning("dns.a_add: type-column workaround failed: %s", e)
return rr_id
def a_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"dns_a_update",
("client_id", int(client_id)),
("primary_id", int(primary_id)),
("params", dict(params)),
))
def a_delete(self, primary_id: int) -> int:
return int(self._c._call("dns_a_delete", ("primary_id", int(primary_id))))
def cname_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"dns_cname_add",
("client_id", int(client_id)),
("params", dict(params)),
))
def mx_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"dns_mx_add",
("client_id", int(client_id)),
("params", dict(params)),
))
def txt_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"dns_txt_add",
("client_id", int(client_id)),
("params", dict(params)),
))
def rr_get(self, method_prefix: str, primary_id: int) -> DnsRr:
"""Generic ``dns_{type}_get`` — e.g. ``rr_get('cname', 42)``."""
return cast(DnsRr, self._c._call(f"dns_{method_prefix}_get", ("primary_id", int(primary_id))))
def rr_update(self, type_: str, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int:
"""Generic ``dns_{type}_update``. ``type_`` is ``a``, ``cname``, etc."""
return int(self._c._call(
f"dns_{type_}_update",
("client_id", int(client_id)),
("primary_id", int(primary_id)),
("params", dict(params)),
))
def rr_delete(self, type_: str, primary_id: int) -> int:
return int(self._c._call(f"dns_{type_}_delete", ("primary_id", int(primary_id))))

View file

@ -0,0 +1,69 @@
"""Typed exceptions for the ISPConfig SDK.
Never leak raw ``_soap.SoapFault`` to callers map to one of these.
"""
from __future__ import annotations
class ISPConfigError(Exception):
"""Base exception for all SDK errors."""
class AuthError(ISPConfigError):
"""Authentication failed, or session has expired."""
class PermissionError(ISPConfigError):
"""Caller lacks permission for this operation.
Name intentionally shadows the builtin readers grepping for
``PermissionError`` in ISPConfig code should find this.
"""
class NotFoundError(ISPConfigError):
"""Requested record does not exist."""
class FaultError(ISPConfigError):
"""Generic SOAP fault we haven't classified."""
def __init__(self, faultcode: str, faultstring: str) -> None:
super().__init__(f"{faultcode}: {faultstring}")
self.faultcode = faultcode
self.faultstring = faultstring
def map_fault(faultcode: str, faultstring: str) -> ISPConfigError:
"""Translate a raw SOAP fault into a typed :class:`ISPConfigError`.
ISPConfig doesn't use structured fault subcodes — detection is by string.
"""
msg = faultstring.lower()
if "login failed" in msg or "login_error" in msg or "could not login" in msg:
return AuthError(faultstring)
if "session" in msg and ("expired" in msg or "not valid" in msg or "invalid" in msg):
return AuthError(faultstring)
if "permission denied" in msg or "you do not have the permissions" in msg or "not allowed" in msg:
return PermissionError(faultstring)
if (
"no records found" in msg
or "not found" in msg
or "no record found" in msg
or "no_domain_found" in faultcode.lower()
or "invalid domain name" in msg
):
return NotFoundError(faultstring)
return FaultError(faultcode, faultstring)
def is_session_expired(exc: BaseException) -> bool:
"""True if the exception looks like ISPConfig's session-timeout fault.
Used by :class:`ISPConfigClient` to decide whether to re-auth + retry.
"""
if not isinstance(exc, AuthError):
return False
msg = str(exc).lower()
return "session" in msg and ("expired" in msg or "invalid" in msg or "not valid" in msg)

152
src/ispconfig/mail.py Normal file
View file

@ -0,0 +1,152 @@
"""``mail.*`` — mail domains, mailboxes, forwarders.
Note: the ISPConfig remote method is ``mail_forward_*`` (singular), not
``mail_forwarding_*``. Our wrapper exposes :meth:`MailModule.forward_add`
etc; the old ``forwarding`` names are available as aliases for anyone coming
from the PHP snippets.
"""
from __future__ import annotations
import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, cast
from .types import MailDomain, MailUser
if TYPE_CHECKING:
from .client import ISPConfigClient
log = logging.getLogger("ispconfig.mail")
class MailModule:
def __init__(self, client: ISPConfigClient) -> None:
self._c = client
# ---- mail domains -------------------------------------------------
def domain_get(self, primary_id: int) -> MailDomain:
return cast(MailDomain, self._c._call("mail_domain_get", ("primary_id", int(primary_id))))
def domain_get_by_domain(self, domain: str) -> list[MailDomain]:
return self._c._call("mail_domain_get_by_domain", ("domain", domain)) or []
def domain_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"mail_domain_add",
("client_id", int(client_id)),
("params", dict(params)),
))
def domain_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"mail_domain_update",
("client_id", int(client_id)),
("primary_id", int(primary_id)),
("params", dict(params)),
))
def domain_delete(self, primary_id: int) -> int:
return int(self._c._call("mail_domain_delete", ("primary_id", int(primary_id))))
# ---- mail users ---------------------------------------------------
def user_get(self, primary_id: int | Mapping[str, Any]) -> MailUser | list[MailUser]:
"""Fetch a mailuser.
Pass an int ``mailuser_id`` to get a single row, or a dict of filter
key/values (e.g. ``{'email': 'x@y.com'}``) to get a list.
.. note::
ISPConfig's SOAP is inconsistent: a filter dict that matches
multiple rows returns an array, but a filter dict that matches
*exactly one* row sometimes returns a single map instead of a
1-element array. When a filter dict is passed we always normalize
to a list so callers can iterate without surprises.
"""
if isinstance(primary_id, Mapping):
result = self._c._call("mail_user_get", ("primary_id", dict(primary_id)))
if result is None:
return cast(list[MailUser], [])
if isinstance(result, list):
return cast(list[MailUser], result)
# single-hit quirk — wrap into a list for consistency.
return cast(list[MailUser], [result])
return cast(MailUser, self._c._call("mail_user_get", ("primary_id", int(primary_id))))
def user_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"mail_user_add",
("client_id", int(client_id)),
("params", dict(params)),
))
def user_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"mail_user_update",
("client_id", int(client_id)),
("primary_id", int(primary_id)),
("params", dict(params)),
))
def user_delete(self, primary_id: int) -> int:
return int(self._c._call("mail_user_delete", ("primary_id", int(primary_id))))
# ---- mail forward -------------------------------------------------
def forward_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"mail_forward_add",
("client_id", int(client_id)),
("params", dict(params)),
))
def forward_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"mail_forward_update",
("client_id", int(client_id)),
("primary_id", int(primary_id)),
("params", dict(params)),
))
def forward_delete(self, primary_id: int) -> int:
return int(self._c._call("mail_forward_delete", ("primary_id", int(primary_id))))
# Aliases for callers coming from the old ``mail_forwarding_*`` PHP names.
forwarding_add = forward_add
forwarding_update = forward_update
forwarding_delete = forward_delete
# ---- helpers ------------------------------------------------------
def create_mailbox(
self,
client_id: int,
domain: str,
local_part: str,
password: str,
*,
server_id: int = 1,
quota_mb: int = 1024,
name: str | None = None,
) -> int:
"""Shorthand for ``user_add`` with typical defaults.
Returns the new ``mailuser_id``.
"""
email = f"{local_part}@{domain}"
params: dict[str, Any] = {
"server_id": int(server_id),
"email": email,
"login": email,
"password": password,
"name": name or local_part,
"quota": int(quota_mb) * 1024 * 1024, # ISPConfig stores bytes
"postfix": "y",
"access": "y",
"disableimap": "n",
"disablepop3": "n",
"disablesmtp": "n",
}
return self.user_add(client_id, params)

0
src/ispconfig/py.typed Normal file
View file

157
src/ispconfig/sites.py Normal file
View file

@ -0,0 +1,157 @@
"""``sites.*`` — web domains, databases, database users.
Footguns baked into these wrappers so callers don't have to rediscover them:
* ``sites_web_domain_update``'s second positional arg is ``client_id``, **not**
the primary id. For admin-owned sites pass ``client_id=0``. This is a common
footgun the ISPConfig remote API is inconsistent about this across methods.
* ``sys_groupid=1`` means admin-owned. Pass it through unchanged on update
calls or ISPConfig will reassign ownership.
* ``fastcgi_php_version`` is the legacy field; newer installs use
``server_php_id`` (int, references ``server_php.server_php_id``). Set the
one your panel version knows about :meth:`SitesModule.enable_php`
handles this.
"""
from __future__ import annotations
import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, cast
from .types import Database, DatabaseUser, WebDomain
if TYPE_CHECKING:
from .client import ISPConfigClient
log = logging.getLogger("ispconfig.sites")
class SitesModule:
def __init__(self, client: ISPConfigClient) -> None:
self._c = client
# ---- web domain ---------------------------------------------------
def web_domain_get(self, primary_id: int) -> WebDomain:
"""Fetch a single ``web_domain`` row by its ``domain_id``."""
return cast(WebDomain, self._c._call("sites_web_domain_get", ("primary_id", int(primary_id))))
def web_domain_add(self, client_id: int, params: Mapping[str, Any], read_only: bool = False) -> int:
"""Create a new site. Returns the new ``domain_id``.
``client_id=0`` creates an admin-owned site.
"""
return int(self._c._call(
"sites_web_domain_add",
("client_id", int(client_id)),
("params", dict(params)),
("read_only", bool(read_only)),
))
def web_domain_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int:
"""Update a site.
.. warning::
The second positional arg is ``client_id``, not ``primary_id``.
Pass 0 for admin-owned. See module docstring.
"""
return int(self._c._call(
"sites_web_domain_update",
("client_id", int(client_id)),
("primary_id", int(primary_id)),
("params", dict(params)),
))
def web_domain_delete(self, primary_id: int) -> int:
return int(self._c._call("sites_web_domain_delete", ("primary_id", int(primary_id))))
def web_domain_set_status(self, primary_id: int, status: str) -> int:
"""``status`` is typically ``'active'`` or ``'inactive'``."""
return int(self._c._call(
"sites_web_domain_set_status",
("primary_id", int(primary_id)),
("status", status),
))
# ---- helpers ------------------------------------------------------
def enable_php(
self,
domain_id: int,
mode: str = "fast-cgi",
server_php_id: int = 0,
pm: str = "ondemand",
pm_max_children: int = 10,
pm_start_servers: int = 2,
pm_min_spare_servers: int = 1,
pm_max_spare_servers: int = 5,
) -> int:
"""Flip on PHP with sane defaults.
Covers the usual field soup: ``php`` (the mode), ``server_php_id``
(what PHP binary), and the ``pm_*`` family (only relevant for
``php-fpm`` / ``fast-cgi``). Preserves existing ``client_id`` /
``sys_groupid`` by reading the record first.
"""
current = self.web_domain_get(domain_id)
client_id = int(current.get("sys_groupid", 0) or 0)
# sys_groupid==1 => admin; pass 0 as client_id for update.
if client_id == 1:
client_id = 0
params: dict[str, Any] = {
"php": mode,
"server_php_id": int(server_php_id),
"pm": pm,
"pm_max_children": int(pm_max_children),
"pm_start_servers": int(pm_start_servers),
"pm_min_spare_servers": int(pm_min_spare_servers),
"pm_max_spare_servers": int(pm_max_spare_servers),
}
return self.web_domain_update(client_id, domain_id, params)
def enable_letsencrypt(self, domain_id: int) -> int:
"""Turn on SSL + Let's Encrypt + force-https in one shot."""
current = self.web_domain_get(domain_id)
client_id = int(current.get("sys_groupid", 0) or 0)
if client_id == 1:
client_id = 0
params = {
"ssl": "y",
"ssl_letsencrypt": "y",
"rewrite_to_https": "y",
}
return self.web_domain_update(client_id, domain_id, params)
# ---- databases ----------------------------------------------------
def database_get(self, primary_id: int) -> Database:
return cast(Database, self._c._call("sites_database_get", ("primary_id", int(primary_id))))
def database_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"sites_database_add",
("client_id", int(client_id)),
("params", dict(params)),
))
def database_delete(self, primary_id: int) -> int:
return int(self._c._call("sites_database_delete", ("primary_id", int(primary_id))))
def database_user_get(self, primary_id: int) -> DatabaseUser:
return cast(DatabaseUser, self._c._call("sites_database_user_get", ("primary_id", int(primary_id))))
def database_user_add(self, client_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"sites_database_user_add",
("client_id", int(client_id)),
("params", dict(params)),
))
def database_user_update(self, client_id: int, primary_id: int, params: Mapping[str, Any]) -> int:
return int(self._c._call(
"sites_database_user_update",
("client_id", int(client_id)),
("primary_id", int(primary_id)),
("params", dict(params)),
))

156
src/ispconfig/types.py Normal file
View file

@ -0,0 +1,156 @@
"""Shared TypedDicts for ISPConfig response shapes.
We use ``TypedDict`` with ``total=False`` because SOAP returns dicts and the
schema varies by ISPConfig version. This gives type hints without forcing
runtime validation (no pydantic).
Field values from the API are mostly strings (PHP stringifies DB output);
callers that want integers should cast explicitly.
"""
from __future__ import annotations
from typing import TypedDict
class WebDomain(TypedDict, total=False):
"""Row from ``web_domain`` — returned by ``sites_web_domain_get``."""
domain_id: str
sys_userid: str
sys_groupid: str
server_id: str
ip_address: str
ipv6_address: str
domain: str
type: str
parent_domain_id: str
vhost_type: str
document_root: str
system_user: str
system_group: str
hd_quota: str
traffic_quota: str
cgi: str
ssi: str
suexec: str
php: str
fastcgi_php_version: str
server_php_id: str
pm: str
pm_max_children: str
pm_start_servers: str
pm_min_spare_servers: str
pm_max_spare_servers: str
ssl: str
ssl_letsencrypt: str
rewrite_to_https: str
active: str
class DnsZone(TypedDict, total=False):
id: str
sys_userid: str
sys_groupid: str
server_id: str
origin: str
ns: str
mbox: str
serial: str
refresh: str
retry: str
expire: str
minimum: str
ttl: str
active: str
dnssec_wanted: str
dnssec_initialized: str
class DnsRr(TypedDict, total=False):
id: str
zone: str
name: str
type: str
data: str
aux: str
ttl: str
active: str
class MailDomain(TypedDict, total=False):
domain_id: str
sys_groupid: str
server_id: str
domain: str
dkim: str
dkim_private: str
dkim_public: str
dkim_selector: str
active: str
class MailUser(TypedDict, total=False):
mailuser_id: str
sys_groupid: str
server_id: str
email: str
login: str
password: str
name: str
maildir: str
quota: str
cc: str
homedir: str
autoresponder: str
postfix: str
access: str
disableimap: str
disablepop3: str
disablesmtp: str
class MailForward(TypedDict, total=False):
forwarding_id: str
sys_groupid: str
server_id: str
source: str
destination: str
type: str
active: str
class Database(TypedDict, total=False):
database_id: str
sys_groupid: str
server_id: str
type: str
database_name: str
database_user_id: str
database_ro_user_id: str
database_charset: str
remote_access: str
remote_ips: str
backup_copies: str
active: str
class DatabaseUser(TypedDict, total=False):
database_user_id: str
sys_groupid: str
database_user: str
database_password: str
class Client(TypedDict, total=False):
client_id: str
sys_userid: str
sys_groupid: str
username: str
contact_name: str
company_name: str
email: str
customer_no: str
limit_web_domain: str
limit_mail_domain: str
limit_database: str

0
tests/__init__.py Normal file
View file

22
tests/conftest.py Normal file
View file

@ -0,0 +1,22 @@
"""Shared fixtures.
Live-smoke tests read ``ISPCONFIG_TEST_URL``, ``ISPCONFIG_TEST_USER``, and
``ISPCONFIG_TEST_PASS`` from the environment. If any is missing, those tests
are skipped so the default ``pytest`` run on a laptop never phones home.
"""
from __future__ import annotations
import os
import pytest
@pytest.fixture(scope="session")
def live_creds() -> dict[str, str]:
url = os.environ.get("ISPCONFIG_TEST_URL")
user = os.environ.get("ISPCONFIG_TEST_USER")
password = os.environ.get("ISPCONFIG_TEST_PASS")
if not (url and user and password):
pytest.skip("live smoke test: set ISPCONFIG_TEST_URL/USER/PASS to enable")
return {"url": url, "user": user, "password": password}

47
tests/test_smoke.py Normal file
View file

@ -0,0 +1,47 @@
"""Live read-only smoke test against a real ISPConfig panel.
Gated on env vars: ``ISPCONFIG_TEST_URL``, ``ISPCONFIG_TEST_USER``,
``ISPCONFIG_TEST_PASS``. These tests are skipped if any is unset.
They are **read-only** no ``_add`` / ``_update`` / ``_delete`` calls. Safe
to run against production (Rackham).
"""
from __future__ import annotations
import pytest
from ispconfig import ISPConfigClient
@pytest.fixture()
def client(live_creds: dict[str, str]) -> ISPConfigClient:
with ISPConfigClient(live_creds["url"], live_creds["user"], live_creds["password"]) as c:
yield c # type: ignore[misc]
def test_login_returns_session(live_creds: dict[str, str]) -> None:
c = ISPConfigClient(live_creds["url"], live_creds["user"], live_creds["password"])
c.login()
assert c.session_id and len(c.session_id) > 10
assert c.logout() is True
def test_get_domain_156(client: ISPConfigClient) -> None:
site = client.sites.web_domain_get(156)
assert site["domain"] == "mcbindustrial.com"
assert site["php"] == "fast-cgi"
def test_dns_zone_lookup(client: ISPConfigClient) -> None:
zone_id = client.dns.zone_get_id("mcbindustrial.com.")
assert zone_id > 0, "zone_get_id returned 0 — is the zone present?"
def test_mail_users_under_mcbindustrial(client: ISPConfigClient) -> None:
# mail_user_get accepts a filter-dict and returns a list.
users = client.mail.user_get({"email": "%@mcbindustrial.com"})
assert isinstance(users, list)
# Don't assert count — just shape. Zero mailboxes is a valid state.
for u in users:
assert "email" in u

187
tests/test_unit.py Normal file
View file

@ -0,0 +1,187 @@
"""Pure-unit tests — no network, no ISPConfig.
We swap in a fake transport so the client-level logic (session management,
retry on expired session, fault translation) can be tested in isolation.
"""
from __future__ import annotations
from collections.abc import Iterable
from typing import Any
import pytest
from ispconfig import (
AuthError,
FaultError,
ISPConfigClient,
NotFoundError,
PermissionError,
)
from ispconfig._soap import SoapFault, SoapTransport
class _FakeTransport:
def __init__(self, scripted: list[Any]) -> None:
self.scripted = list(scripted)
self.calls: list[tuple[str, tuple[tuple[str, Any], ...]]] = []
def call(self, method: str, args: Iterable[tuple[str, Any]]) -> Any:
self.calls.append((method, tuple(args)))
if not self.scripted:
raise AssertionError(f"unexpected call {method}")
result = self.scripted.pop(0)
if isinstance(result, Exception):
raise result
return result
def _make_client(transport: _FakeTransport) -> ISPConfigClient:
c = ISPConfigClient("http://fake/remote/index.php", "user", "pass")
c._transport = transport # type: ignore[assignment]
return c
def test_login_stores_session_and_logout_clears() -> None:
t = _FakeTransport(["sid-abc", True])
c = _make_client(t)
c.login()
assert c.session_id == "sid-abc"
assert c.logout() is True
assert c.session_id is None
def test_context_manager_auto_login_logout() -> None:
t = _FakeTransport(["sid-123", {"domain": "x.com"}, True])
c = _make_client(t)
with c:
assert c.session_id == "sid-123"
# first positional arg passed through is the session id.
result = c.sites.web_domain_get(1)
assert result == {"domain": "x.com"}
assert c.session_id is None
assert [call[0] for call in t.calls] == ["login", "sites_web_domain_get", "logout"]
def test_session_expired_retry() -> None:
t = _FakeTransport([
"sid-first", # login
SoapFault("Server", "Session not valid"), # first _call fails
"sid-second", # re-login
{"domain": "x.com"}, # retry succeeds
])
c = _make_client(t)
c.login()
result = c.sites.web_domain_get(1)
assert result == {"domain": "x.com"}
# 4 transport calls: login, failed get, login, successful get.
assert [call[0] for call in t.calls] == [
"login", "sites_web_domain_get", "login", "sites_web_domain_get",
]
def test_session_expired_no_retry_when_disabled() -> None:
t = _FakeTransport([
"sid-first",
SoapFault("Server", "Session expired"),
])
c = ISPConfigClient("http://fake/", "u", "p", max_retries=0)
c._transport = t # type: ignore[assignment]
c.login()
with pytest.raises(AuthError):
c.sites.web_domain_get(1)
def test_fault_mapping_auth() -> None:
t = _FakeTransport([SoapFault("Server", "Login failed")])
c = _make_client(t)
with pytest.raises(AuthError):
c.login()
def test_fault_mapping_permission() -> None:
t = _FakeTransport(["sid", SoapFault("Server", "Permission denied")])
c = _make_client(t)
c.login()
with pytest.raises(PermissionError):
c.sites.web_domain_get(1)
def test_fault_mapping_not_found() -> None:
t = _FakeTransport(["sid", SoapFault("Server", "No records found")])
c = _make_client(t)
c.login()
with pytest.raises(NotFoundError):
c.sites.web_domain_get(999)
def test_fault_mapping_generic() -> None:
t = _FakeTransport(["sid", SoapFault("Server", "something weird")])
c = _make_client(t)
c.login()
with pytest.raises(FaultError):
c.sites.web_domain_get(1)
def test_update_client_id_footgun_passes_through() -> None:
"""``web_domain_update`` must send ``client_id`` as the 2nd positional arg."""
t = _FakeTransport(["sid", 1])
c = _make_client(t)
c.login()
c.sites.web_domain_update(0, 156, {"php": "fast-cgi"})
_, args = t.calls[-1]
assert args[0][0] == "session_id"
assert args[1] == ("client_id", 0)
assert args[2] == ("primary_id", 156)
assert args[3][0] == "params"
def test_envelope_encoding_map_and_scalars() -> None:
"""Smoke test for the XML encoder — catches regressions."""
xml = SoapTransport._build_envelope(
"sites_web_domain_update",
(
("session_id", "abc"),
("client_id", 0),
("primary_id", 156),
("params", {"php": "fast-cgi", "active": "y"}),
),
)
assert "<session_id" in xml and ">abc<" in xml
assert '<client_id xsi:type="xsd:int">0</client_id>' in xml
assert 'ns2:Map' in xml
assert '<key xsi:type="xsd:string">php</key>' in xml
assert '<value xsi:type="xsd:string">fast-cgi</value>' in xml
def test_response_parsing_map() -> None:
body = b'''<?xml version="1.0"?>
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:ns1="/remote/index.php"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ns2="http://xml.apache.org/xml-soap">
<SOAP-ENV:Body>
<ns1:sites_web_domain_getResponse>
<return xsi:type="ns2:Map">
<item><key xsi:type="xsd:string">domain</key><value xsi:type="xsd:string">mcb.com</value></item>
<item><key xsi:type="xsd:string">active</key><value xsi:type="xsd:string">y</value></item>
</return>
</ns1:sites_web_domain_getResponse>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>'''
result = SoapTransport._parse_response("sites_web_domain_get", body)
assert result == {"domain": "mcb.com", "active": "y"}
def test_response_parsing_fault() -> None:
body = b'''<?xml version="1.0"?>
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">
<SOAP-ENV:Body><SOAP-ENV:Fault>
<faultcode>SOAP-ENV:Server</faultcode>
<faultstring>Login failed.</faultstring>
</SOAP-ENV:Fault></SOAP-ENV:Body>
</SOAP-ENV:Envelope>'''
with pytest.raises(SoapFault) as excinfo:
SoapTransport._parse_response("login", body)
assert "Login failed" in excinfo.value.faultstring