From cf921de9b668c0b8ea6f3885c2f9815134cf91c3 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Mon, 15 Jul 2024 12:42:06 +0200 Subject: [PATCH] more tests and refinements --- chatmaild/src/chatmaild/tests/plugin.py | 19 +++++++ cmdeploy/src/cmdeploy/chatmail.zone.j2 | 15 +++--- cmdeploy/src/cmdeploy/cmdeploy.py | 40 +++++++++++---- cmdeploy/src/cmdeploy/dns.py | 34 +++---------- cmdeploy/src/cmdeploy/remote_funcs.py | 2 +- cmdeploy/src/cmdeploy/tests/test_dns.py | 67 ++++++++++++++++++------- 6 files changed, 114 insertions(+), 63 deletions(-) diff --git a/chatmaild/src/chatmaild/tests/plugin.py b/chatmaild/src/chatmaild/tests/plugin.py index bd245c41..c122c725 100644 --- a/chatmaild/src/chatmaild/tests/plugin.py +++ b/chatmaild/src/chatmaild/tests/plugin.py @@ -79,3 +79,22 @@ def maildata(name, from_addr, to_addr, subject="..."): return BytesParser(policy=policy.default).parsebytes(text.encode()) return maildata + + +@pytest.fixture +def mockout(): + class MockOut: + captured_red = [] + captured_green = [] + captured_plain = [] + + def red(self, msg): + self.captured_red.append(msg) + + def green(self, msg): + self.captured_green.append(msg) + + def __call__(self, msg): + self.captured_plain.append(msg) + + return MockOut() diff --git a/cmdeploy/src/cmdeploy/chatmail.zone.j2 b/cmdeploy/src/cmdeploy/chatmail.zone.j2 index c2d49b8b..491952e2 100644 --- a/cmdeploy/src/cmdeploy/chatmail.zone.j2 +++ b/cmdeploy/src/cmdeploy/chatmail.zone.j2 @@ -1,9 +1,11 @@ +# # Required DNS entries for chatmail servers -{% if ipv4 %} -{{ chatmail_domain }}. A {{ ipv4 }} +# +{% if A %} +{{ chatmail_domain }}. A {{ A }} {% endif %} -{% if ipv6 %} -{{ chatmail_domain }}. AAAA {{ ipv6 }} +{% if AAAA %} +{{ chatmail_domain }}. AAAA {{ AAAA }} {% endif %} {{ chatmail_domain }}. MX 10 {{ chatmail_domain }}. _mta-sts.{{ chatmail_domain }}. TXT "v=STSv1; id={{ sts_id }}" @@ -11,7 +13,9 @@ mta-sts.{{ chatmail_domain }}. CNAME {{ chatmail_domain }}. www.{{ chatmail_domain }}. CNAME {{ chatmail_domain }}. {{ dkim_entry }} -# Recommended DNS entries +# +# Recommended DNS entries for interoperability and security-hardening +# {{ chatmail_domain }}. TXT "v=spf1 a:{{ chatmail_domain }} ~all" _dmarc.{{ chatmail_domain }}. TXT "v=DMARC1;p=reject;adkim=s;aspf=s" @@ -20,7 +24,6 @@ _dmarc.{{ chatmail_domain }}. TXT "v=DMARC1;p=reject;adkim=s;aspf=s" {% endif %} _adsp._domainkey.{{ chatmail_domain }}. TXT "dkim=discardable" -# The following are technically not required for Delta Chat _submission._tcp.{{ chatmail_domain }}. SRV 0 1 587 {{ chatmail_domain }}. _submissions._tcp.{{ chatmail_domain }}. SRV 0 1 465 {{ chatmail_domain }}. _imap._tcp.{{ chatmail_domain }}. SRV 0 1 143 {{ chatmail_domain }}. diff --git a/cmdeploy/src/cmdeploy/cmdeploy.py b/cmdeploy/src/cmdeploy/cmdeploy.py index 0c99befe..28d1439e 100644 --- a/cmdeploy/src/cmdeploy/cmdeploy.py +++ b/cmdeploy/src/cmdeploy/cmdeploy.py @@ -7,6 +7,7 @@ import importlib.resources import importlib.util import os +import pathlib import shutil import subprocess import sys @@ -54,7 +55,8 @@ def run_cmd_options(parser): def run_cmd(args, out): """Deploy chatmail services on the remote server.""" - remote_data = dns.get_initial_remote_data(args) + sshexec = args.get_sshexec() + remote_data = dns.get_initial_remote_data(sshexec, args.config.mail_domain) if not dns.check_initial_remote_data(remote_data, print=out.red): return 1 @@ -80,16 +82,37 @@ def dns_cmd_options(parser): parser.add_argument( "--zonefile", dest="zonefile", - help="print the whole zonefile for deploying directly", + type=pathlib.Path, + default=None, + help="write out a zonefile", ) def dns_cmd(args, out): """Check DNS entries and optionally generate dns zone file.""" - remote_data = dns.get_initial_remote_data(args) + sshexec = args.get_sshexec() + remote_data = dns.get_initial_remote_data(sshexec, args.config.mail_domain) if not remote_data: return 1 - retcode = dns.show_dns(args, out, remote_data) + + if not remote_data["acme_account_url"]: + out.red("could not get letsencrypt account url, please run 'cmdeploy run'") + return 1 + + if not remote_data["dkim_entry"]: + out.red("could not determine dkim_entry, please run 'cmdeploy run'") + return 1 + + zonefile = dns.get_filled_zone_file(remote_data) + + if args.zonefile: + args.zonefile.write_text(zonefile) + out.green(f"DNS records successfully written to: {args.zonefile}") + return 0 + + retcode = dns.check_full_zone( + sshexec, remote_data=remote_data, zonefile=zonefile, out=out + ) return retcode @@ -283,14 +306,9 @@ def main(args=None): if not hasattr(args, "func"): return parser.parse_args(["-h"]) - ssh_cache = [] - def get_sshexec(): - if not ssh_cache: - print(f"[ssh] login to {args.config.mail_domain}") - ssh = SSHExec(args.config.mail_domain, remote_funcs, verbose=args.verbose) - ssh_cache.append(ssh) - return ssh_cache[0] + print(f"[ssh] login to {args.config.mail_domain}") + return SSHExec(args.config.mail_domain, remote_funcs, verbose=args.verbose) args.get_sshexec = get_sshexec diff --git a/cmdeploy/src/cmdeploy/dns.py b/cmdeploy/src/cmdeploy/dns.py index 3540b0ee..4dd987ba 100644 --- a/cmdeploy/src/cmdeploy/dns.py +++ b/cmdeploy/src/cmdeploy/dns.py @@ -6,9 +6,7 @@ from . import remote_funcs -def get_initial_remote_data(args): - sshexec = args.get_sshexec() - mail_domain = args.config.mail_domain +def get_initial_remote_data(sshexec, mail_domain): return sshexec.logged( call=remote_funcs.perform_initial_checks, kwargs=dict(mail_domain=mail_domain) ) @@ -25,7 +23,7 @@ def check_initial_remote_data(remote_data, print=print): return remote_data -def get_filled_zone_file(remote_data, mail_domain): +def get_filled_zone_file(remote_data): sts_id = remote_data.get("sts_id") if not sts_id: sts_id = datetime.datetime.now().strftime("%Y%m%d%H%M") @@ -33,12 +31,12 @@ def get_filled_zone_file(remote_data, mail_domain): template = importlib.resources.files(__package__).joinpath("chatmail.zone.j2") content = template.read_text() zonefile = Template(content).render( - acme_account_url=remote_data.get("acme_account_url"), + acme_account_url=remote_data["acme_account_url"], dkim_entry=remote_data["dkim_entry"], - ipv4=remote_data["A"], - ipv6=remote_data["AAAA"], + A=remote_data["A"], + AAAA=remote_data["AAAA"], sts_id=sts_id, - chatmail_domain=mail_domain, + chatmail_domain=remote_data["mail_domain"], ) lines = [x.strip() for x in zonefile.split("\n") if x.strip()] lines.append("") @@ -46,28 +44,10 @@ def get_filled_zone_file(remote_data, mail_domain): return zonefile -def show_dns(args, out, remote_data) -> int: +def check_full_zone(sshexec, remote_data, out, zonefile) -> int: """Check existing DNS records, optionally write them to zone file and return (exitcode, remote_data) tuple.""" - sshexec = args.get_sshexec() - - if not remote_data["acme_account_url"]: - out.red("could not get letsencrypt account url, please run 'cmdeploy run'") - return 1 - - if not remote_data["dkim_entry"]: - out.red("could not determine dkim_entry, please run 'cmdeploy run'") - return 1 - - zonefile = get_filled_zone_file(remote_data, args.config.mail_domain) - - if getattr(args, "zonefile", None): - with open(args.zonefile, "w+") as zf: - zf.write(zonefile) - out.green(f"DNS records successfully written to: {args.zonefile}") - return 0 - required_diff, recommended_diff = sshexec.logged( remote_funcs.check_zonefile, kwargs=dict(zonefile=zonefile) ) diff --git a/cmdeploy/src/cmdeploy/remote_funcs.py b/cmdeploy/src/cmdeploy/remote_funcs.py index 365b4b48..c8423467 100644 --- a/cmdeploy/src/cmdeploy/remote_funcs.py +++ b/cmdeploy/src/cmdeploy/remote_funcs.py @@ -31,7 +31,7 @@ def get_systemd_running(): def perform_initial_checks(mail_domain): - """Collecting initial DNS zone content.""" + """Collecting initial DNS settings.""" assert mail_domain A = query_dns("A", mail_domain) AAAA = query_dns("AAAA", mail_domain) diff --git a/cmdeploy/src/cmdeploy/tests/test_dns.py b/cmdeploy/src/cmdeploy/tests/test_dns.py index f03bd311..f20e4e2c 100644 --- a/cmdeploy/src/cmdeploy/tests/test_dns.py +++ b/cmdeploy/src/cmdeploy/tests/test_dns.py @@ -1,7 +1,7 @@ import pytest from cmdeploy import remote_funcs -from cmdeploy.dns import check_initial_remote_data +from cmdeploy.dns import check_full_zone, check_initial_remote_data @pytest.fixture @@ -59,9 +59,13 @@ def test_perform_initial_checks_no_mta_sts(self, mockdns): assert len(l) == 2 -def parse_zonefile_into_dict(zonefile, mockdns_base): +def parse_zonefile_into_dict(zonefile, mockdns_base, only_required=False): for zf_line in zonefile.split("\n"): - if not zf_line.strip() or zf_line.startswith("#"): + if zf_line.startswith("#"): + if "Recommended" in zf_line and only_required: + return + continue + if not zf_line.strip(): continue zf_domain, zf_typ, zf_value = zf_line.split(maxsplit=2) zf_domain = zf_domain.rstrip(".") @@ -69,18 +73,45 @@ def parse_zonefile_into_dict(zonefile, mockdns_base): mockdns_base.setdefault(zf_typ, {})[zf_domain] = zf_value -def test_check_zonefile_all_ok(data, mockdns_base): - zonefile = data.get("zftest.zone") - parse_zonefile_into_dict(zonefile, mockdns_base) - required_diff, recommended_diff = remote_funcs.check_zonefile(zonefile) - assert not required_diff and not recommended_diff - - -def test_check_zonefile_recommended_not_set(data, mockdns_base): - zonefile = data.get("zftest.zone") - - zonefile_mocked = zonefile.split("# Recommended")[0] - parse_zonefile_into_dict(zonefile_mocked, mockdns_base) - required_diff, recommended_diff = remote_funcs.check_zonefile(zonefile) - assert not required_diff - assert len(recommended_diff) == 8 +class MockSSHExec: + def logged(self, func, kwargs): + return func(**kwargs) + + def call(self, func, kwargs): + return func(**kwargs) + + +class TestZonefileChecks: + def test_check_zonefile_all_ok(self, data, mockdns_base): + zonefile = data.get("zftest.zone") + parse_zonefile_into_dict(zonefile, mockdns_base) + required_diff, recommended_diff = remote_funcs.check_zonefile(zonefile) + assert not required_diff and not recommended_diff + + def test_check_zonefile_recommended_not_set(self, data, mockdns_base): + zonefile = data.get("zftest.zone") + zonefile_mocked = zonefile.split("# Recommended")[0] + parse_zonefile_into_dict(zonefile_mocked, mockdns_base) + required_diff, recommended_diff = remote_funcs.check_zonefile(zonefile) + assert not required_diff + assert len(recommended_diff) == 8 + + def test_check_zonefile_output_required_fine(self, data, mockdns_base, mockout): + zonefile = data.get("zftest.zone") + zonefile_mocked = zonefile.split("# Recommended")[0] + parse_zonefile_into_dict(zonefile_mocked, mockdns_base, only_required=True) + mssh = MockSSHExec() + res = check_full_zone(mssh, mockdns_base, out=mockout, zonefile=zonefile) + assert res == 0 + assert "WARNING" in mockout.captured_plain[0] + assert len(mockout.captured_plain) == 9 + + def test_check_zonefile_output_full(self, data, mockdns_base, mockout): + zonefile = data.get("zftest.zone") + parse_zonefile_into_dict(zonefile, mockdns_base) + mssh = MockSSHExec() + res = check_full_zone(mssh, mockdns_base, out=mockout, zonefile=zonefile) + assert res == 0 + assert not mockout.captured_red + assert "correct" in mockout.captured_green[0] + assert not mockout.captured_red