diff --git a/tools/send-updates.py b/tools/send-updates.py new file mode 100755 index 00000000..454e7aaa --- /dev/null +++ b/tools/send-updates.py @@ -0,0 +1,534 @@ +#!/usr/bin/env python3 +import configparser +import getpass +import itertools +import email.message +import enum +import os +import smtplib +import sys +import textwrap + +from datetime import datetime + +import xml.etree.ElementTree as etree + + +DESCRIPTION = """\ +Send email updates for XEP changes based on the difference between two \ +xeplist files.""" + +EPILOG = """\ +Configuration file contents: + +[smtp] +host= +port=587 +user= +password= +from=
+ +If user is omitted, anonymous mail sending is attempted. + +If options are missing from the configuration file and the standard input and \ +standard output are a terminal, the script interactively asks for the option \ +values. If no terminal is connected, the script exits with an error instead.""" + + +class Status(enum.Enum): + PROTO = 'ProtoXEP' + EXPERIMENTAL = 'Experimental' + PROPOSED = 'Proposed' + DRAFT = 'Draft' + ACTIVE = 'Active' + FINAL = 'Final' + RETRACTED = 'Retracted' + OBSOLETE = 'Obsolete' + DEFERRED = 'Deferred' + REJECTED = 'Rejected' + DEPRECATED = 'Deprecated' + + @classmethod + def fromstr(cls, s): + if s == "Proto" or s.lower() == "protoxep": + s = "ProtoXEP" + return cls(s) + + +class Action(enum.Enum): + PROTO = "Proposed XMPP Extension" + NEW = "NEW" + DRAFT = "DRAFT" + ACTIVE = "ACTIVE" + FINAL = "FINAL" + RETRACT = "RETRACTED" + OBSOLETE = "OBSOLETED" + DEFER = "DEFERRED" + UPDATE = "UPDATED" + + @classmethod + def fromstatus(cls, status): + return { + Status.EXPERIMENTAL: cls.NEW, + Status.DRAFT: cls.DRAFT, + Status.ACTIVE: cls.ACTIVE, + Status.FINAL: cls.FINAL, + Status.RETRACTED: cls.RETRACT, + Status.OBSOLETED: cls.OBSOLETE, + Status.DEPRECATED: cls.DEPRECATE, + Status.DEFERRED: cls.DEFERRED, + } + + +XEP_URL_PREFIX = "https://xmpp.org/extensions/" + + +MAIL_PROTO_TEMPLATE = """\ +The XMPP Extensions Editor has received a proposal for a new XEP. + +Title: {info[title]} +Abstract: +{info[abstract]} + +URL: {url} + +The {approver} will decide in the next two weeks whether to accept this \ +proposal as an official XEP.""" + + +SUBJECT_PROTO_TEMPLATE = "Proposed XMPP Extension: {info[title]}" + + +MAIL_NONPROTO_TEMPLATE = """\ +Version {info[last_revision][version]} of XEP-{info[number]:04d} \ +({info[title]}) has been released. + +Abstract: +{info[abstract]} + +Changelog: +{changelog} + +URL: {url}""" + + +SUBJECT_NONPROTO_TEMPLATE = \ + "{action.value}: XEP-{info[number]:04d} ({info[title]})" + + +def load_xepinfo(el): + accepted = el.get("accepted").lower() == "true" + + info = { + "title": el.find("title").text, + "abstract": el.find("abstract").text, + "type": el.find("type").text, + "status": Status.fromstr(el.find("status").text), + "approver": el.find("approver").text, + "accepted": accepted, + } + + last_revision_el = el.find("last-revision") + if last_revision_el is not None: + last_revision = { + "version": last_revision_el.find("version").text, + "date": last_revision_el.find("date").text, + "initials": None, + "remark": None, + } + + initials_el = last_revision_el.find("initials") + if initials_el is not None: + last_revision["initials"] = initials_el.text + + remark_el = last_revision_el.find("remark") + if remark_el is not None: + last_revision["remark"] = remark_el.text + + info["last_revision"] = last_revision + + sig = el.find("sig") + if sig is not None: + info["sig"] = sig.text + + if accepted: + info["number"] = int(el.find("number").text) + else: + info["protoname"] = el.find("proto-name").text + + return info + + +def load_xepinfos(tree): + accepted, protos = {}, {} + for info_el in tree.getroot(): + info = load_xepinfo(info_el) + if info["accepted"]: + accepted[info["number"]] = info + else: + protos[info["protoname"]] = info + + return accepted, protos + + +def dummy_info(number): + return { + "status": None, + "accepted": False, + "number": number, + } + + +def diff_infos(old, new): + if old["status"] != new["status"]: + if new["status"] == Status.PROTO: + return Action.PROTO + elif old["status"] is None: + return Action.NEW + else: + return Action.fromstatus(new["status"]) + + old_version = old.get("last_revision", {}).get("version") + new_version = new.get("last_revision", {}).get("version") + + if old_version != new_version: + return Action.UPDATE + + return None + + +def wraptext(text): + return "\n".join( + itertools.chain( + *[textwrap.wrap(line) if line else [line] for line in text.split("\n")] + ) + ) + + +def make_proto_mail(info): + kwargs = { + "info": info, + "approver": info["approver"], + "url": "{}inbox/{}.html".format( + XEP_URL_PREFIX, + info["protoname"], + ), + } + + mail = email.message.EmailMessage() + mail["Subject"] = SUBJECT_PROTO_TEMPLATE.format(**kwargs) + mail["XSF-XEP-Action"] = "PROTO" + mail["XSF-XEP-Title"] = info["title"] + mail["XSF-XEP-Type"] = info["type"] + mail["XSF-XEP-Status"] = info["status"].value + mail["XSF-XEP-Url"] = kwargs["url"] + mail["XSF-XEP-Approver"] = kwargs["approver"] + mail.set_content( + wraptext(MAIL_PROTO_TEMPLATE.format(**kwargs)), + "plain", + "utf-8", + ) + + return mail + + +def make_nonproto_mail(action, info): + last_revision = info.get("last_revision") + changelog = "(see in-document revision history)" + if last_revision is not None: + remark = last_revision.get("remark") + initials = last_revision.get("initials") + if remark and initials: + changelog = "{} ({})".format(remark, initials) + + kwargs = { + "info": info, + "changelog": changelog, + "action": action, + "url": "{}xep-{:04d}.html".format( + XEP_URL_PREFIX, + info["number"], + ), + } + + mail = email.message.EmailMessage() + mail["Subject"] = SUBJECT_NONPROTO_TEMPLATE.format(**kwargs) + mail["XSF-XEP-Action"] = action.value + mail["XSF-XEP-Title"] = info["title"] + mail["XSF-XEP-Type"] = info["type"] + mail["XSF-XEP-Status"] = info["status"].value + mail["XSF-XEP-Number"] = "{:04d}".format(info["number"]) + mail["XSF-XEP-Url"] = kwargs["url"] + mail.set_content( + wraptext(MAIL_NONPROTO_TEMPLATE.format(**kwargs)), + "plain", + "utf-8", + ) + + return mail + + +def get_or_ask(config, section, name, prompt): + try: + return config.get(section, name) + except (configparser.NoSectionError, + configparser.NoOptionError): + return input(prompt) + + +def interactively_extend_smtp_config(config): + try: + host = config.get("smtp", "host") + except (configparser.NoSectionError, + configparser.NoOptionError): + host = input("SMTP server: ").strip() + port = int(input("SMTP port (blank for 587): ").strip() or "587") + user = input( + "SMTP user (leave blank for anon): " + ).strip() or None + if user: + password = getpass.getpass() + else: + password = None + else: + port = config.getint("smtp", "port", fallback=587) + user = config.get("smtp", "user", fallback=None) + password = config.get("smtp", "password", fallback=None) + + try: + from_ = config.get("smtp", "from") + except (configparser.NoSectionError, + configparser.NoOptionError): + from_ = input("From address: ").strip() + + if not config.has_section("smtp"): + config.add_section("smtp") + config.set("smtp", "host", host) + config.set("smtp", "port", str(port)) + if user: + config.set("smtp", "user", user) + if password is None: + password = getpass.getpass() + config.set("smtp", "password", password) + config.set("smtp", "from", from_) + + +def choose(prompt, options, *, + eof=EOFError, + keyboard_interrupt=KeyboardInterrupt): + while True: + try: + choice = input(prompt).strip() + except EOFError: + if eof is EOFError: + raise + return eof + except KeyboardInterrupt: + if keyboard_interrupt is KeyboardInterrupt: + raise + return keyboard_interrupt + + if choice not in options: + print("invalid choice. please enter one of: {}".format( + ", ".join(map(str, options)) + )) + continue + + return choice + + +def make_smtpconn(config): + host = config.get("smtp", "host") + port = config.getint("smtp", "port") + user = config.get("smtp", "user", fallback=None) + password = config.get("smtp", "password", fallback=None) + + conn = smtplib.SMTP(host, port) + conn.starttls() + if user is not None: + conn.login(user, password) + + return conn + + +def make_fake_smtpconn(): + class Fake: + def send_message(self, mail): + print("---8<---") + print(mail.as_string()) + print("--->8---") + + def close(self): + pass + + return Fake() + + +def main(): + import argparse + + parser = argparse.ArgumentParser( + description=wraptext(DESCRIPTION), + epilog=wraptext(EPILOG), + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument( + "-c", "--config", + metavar="FILE", + type=argparse.FileType("r"), + help="Configuration file", + ) + parser.add_argument( + "-y", + dest="ask_confirmation", + default=True, + action="store_false", + help="'I trust this script to do the right thing and send emails" + "without asking for confirmation.'" + ) + parser.add_argument( + "--no-proto", + dest="process_proto", + default=True, + action="store_false", + help="Disable processing of ProtoXEPs.", + ) + parser.add_argument( + "-n", "--dry-run", + dest="dry_run", + action="store_true", + default=False, + help="Instead of sending emails, print them to stdout (implies -y)", + ) + + parser.add_argument( + "old", + type=argparse.FileType("rb"), + help="Old xep-infos XML file", + ) + parser.add_argument( + "new", + type=argparse.FileType("rb"), + help="New xep-infos XML file", + ) + + parser.add_argument( + "to", + nargs="+", + help="The mail addresses to send the update mails to." + ) + + args = parser.parse_args() + + can_be_interactive = ( + os.isatty(sys.stdin.fileno()) and + os.isatty(sys.stdout.fileno()) + ) + + if args.dry_run: + args.ask_confirmation = False + + if args.ask_confirmation and not can_be_interactive: + print("Cannot ask for confirmation (stdio is not a TTY), but -y is", + "not given either. Aborting.", sep="\n", file=sys.stderr) + sys.exit(2) + + config = configparser.ConfigParser() + if args.config is not None: + config.read_file(args.config) + + with args.old as f: + tree = etree.parse(f) + old_accepted, old_proto = load_xepinfos(tree) + + with args.new as f: + tree = etree.parse(f) + new_accepted, new_proto = load_xepinfos(tree) + + old_xeps = set(old_accepted.keys()) + new_xeps = set(new_accepted.keys()) + + common_xeps = old_xeps & new_xeps + added_xeps = new_xeps - old_xeps + + added_protos = set(new_proto.keys()) - set(old_proto.keys()) + + updates = [] + + for common_xep in common_xeps: + old_info = old_accepted[common_xep] + new_info = new_accepted[common_xep] + + action = diff_infos(old_info, new_info) + if action is not None: + updates.append((common_xep, action, new_info)) + + for added_xep in added_xeps: + old_info = dummy_info(added_xep) + new_info = new_accepted[common_xep] + + action = diff_infos(old_info, new_info) + if action is not None: + updates.append((added_xep, action, new_info)) + + if args.process_proto: + for added_proto in added_protos: + old_info = dummy_info('xxxx') + new_info = new_proto[added_proto] + + action = diff_infos(old_info, new_info) + if action is not None: + updates.append((added_proto, action, new_info)) + + if args.dry_run: + smtpconn = make_fake_smtpconn() + else: + if can_be_interactive: + interactively_extend_smtp_config(config) + + try: + smtpconn = make_smtpconn(config) + except (configparser.NoSectionError, + configparser.NoOptionError) as exc: + print("Missing configuration: {}".format(exc), + file=sys.stderr) + print("(cannot ask for configuration on stdio because it is " + "not a TTY)", file=sys.stderr) + sys.exit(3) + + try: + for id_, action, info in updates: + if action == Action.PROTO: + mail = make_proto_mail(info) + else: + mail = make_nonproto_mail(action, info) + mail["Date"] = datetime.utcnow() + mail["From"] = config.get("smtp", "from") + mail["To"] = args.to + + if args.ask_confirmation: + print() + print("---8<---") + print(mail.as_string()) + print("--->8---") + print() + choice = choose( + "Send this email? [y]es, [n]o, [a]bort: ", + "yna", + eof="a", + ) + + if choice == "n": + continue + elif choice == "a": + print("Exiting on user request.", file=sys.stderr) + sys.exit(4) + + smtpconn.send_message(mail) + finally: + smtpconn.close() + + +if __name__ == "__main__": + main()