HEX
Server: LiteSpeed
System: Linux ip-172-31-76-142.ec2.internal 4.14.158-129.185.amzn2.x86_64 #1 SMP Tue Dec 24 03:15:32 UTC 2019 x86_64
User: 69b4844ae61d4e92bf26ad98af552775 (1065)
PHP: 7.2.27
Disabled: exec,passthru,shell_exec,system,eval
Upload Files
File: //usr/lib/python2.7/site-packages/amazon_linux_extras/cli.py
# encoding: UTF-8
# Copyright 2017, 2018 Amazon.com, Inc. or its affiliates.

# This module is part of Amazon Linux Extras.
#
# Amazon Linux Extras is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License v2 as published
# by the Free Software Foundation.
#
# Amazon Linux Extras is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with Amazon Linux Extras.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function, unicode_literals, absolute_import

from .software_catalog import get_catalog, fetch_new_catalog, VERSION_KEY, CatalogError
from .repo import read_configuration, write_configuration


import sys
import logging as loggingmod
import os
import re
import textwrap
import subprocess
from datetime import datetime

# Bind "_" to a translation function that always returns text (unicode).
if sys.version_info.major == 2:
    # On Python 2 the gettext result is decoded from UTF-8 before use.
    from gettext import gettext as gettext_yields_encoded
    _ = lambda *args: gettext_yields_encoded(*args).decode("UTF-8")
else:
    from gettext import gettext as _


# True when the AMZN2DEBUGGING environment variable is set to a non-empty value;
# otherwise a falsy value (False when unset, "" when set but empty).
IS_TESTING = os.environ.get("AMZN2DEBUGGING", False) and True

# Module-level logger shared by every subcommand in this file.
logger = loggingmod.getLogger(__name__)


def host_is_sane():
    """Report whether this host looks like a machine the tool should run on.

    Returns True immediately when IS_TESTING is set. Otherwise the host is
    accepted only when the first 50 characters of /etc/system-release contain
    "Amazon"; a release file that cannot be opened or read (IOError) counts
    as an unsupported host.
    """
    if IS_TESTING:
        return True

    try:
        with open("/etc/system-release") as release_file:
            banner = release_file.read(50)
    except IOError:
        return False

    # Historical checks, still switched off:
    #if not os.path.exists("/etc/yum/vars/awsdomain"): return False
    #if not os.path.exists("/etc/yum/vars/awsregion"): return False
    return "Amazon" in banner


def topic_combination_was_tested(enabled_topics, catalog, additional_topic_name):
    """Tell whether a set of topics is covered by a known-good whitelist.

    The proposed set is every name in enabled_topics plus
    additional_topic_name (when one is given). It counts as tested when at
    least one entry of catalog["whitelists-by-names"] equals it or is a
    superset of it.

    Raises KeyError(additional_topic_name) when a name is given that is not
    present in catalog["topics-by-name"].
    """
    candidate = set(enabled_topics)
    if additional_topic_name:
        candidate.add(additional_topic_name)
        if additional_topic_name not in catalog["topics-by-name"]:
            raise KeyError(additional_topic_name)

    for known_good in catalog["whitelists-by-names"]:
        if candidate.issubset(known_good):
            return True
    return False


def topic_info(topic_dict, machine_state):
    """Summarize one catalog topic against the local configuration.

    Returns a 3-tuple (label, is_enabled, expiry):
      label      -- "name=version" when the topic is enabled, else just the name
      is_enabled -- the "enabled" value stored for the topic (None when absent)
      expiry     -- the topic's "deprecated-at" value as a date, or None when
                    it is missing or not in YYYY-MM-DD form
    """
    topic_name = topic_dict["n"]
    topic_state = machine_state.get(topic_name, {})
    is_enabled = topic_state.get("enabled")
    version = topic_state.get(VERSION_KEY, "")

    expiry = None
    if "deprecated-at" in topic_dict:
        try:
            expiry = datetime.strptime(topic_dict["deprecated-at"], "%Y-%m-%d").date()
        except ValueError:
            # An unparseable date is treated the same as no date at all.
            expiry = None

    if is_enabled:
        label = "{0}={1}".format(topic_name, version)
    else:
        label = topic_name
    return label, is_enabled, expiry


def cmd_list(args):
    """Lists topics in the catalog. Some may be enabled.

    Prints one row per catalog topic: its ordinal (or "_" when adding the topic
    to the currently enabled set is not a known-tested combination), an
    end-of-support marker, the topic name (with "=version" when enabled), its
    status and the versions the catalog offers. Topics past their
    end-of-support date are shown only when they are enabled.

    args is accepted so every subcommand shares one signature; it is not used.
    Exits with status 2 when the catalog cannot be loaded.
    """

    try:
        catalog = get_catalog()
    except CatalogError:
        # get_catalog emits its own messages.
        sys.exit(2)

    # Only emit ANSI escapes when writing to a terminal.
    use_color = sys.stdout.isatty()
    emphasis_color = "\033[94m" if use_color else ""  # ANSI blue foreground
    warning_color = "\033[93m" if use_color else ""  # ANSI yellow foreground
    reset_color = "\033[0m" if use_color else ""  # ANSI colors reset
    machine_state = read_configuration()
    topics = catalog["topics"]

    today = datetime.utcnow().date()
    # max() raises ValueError on an empty sequence, so an empty catalog gets a zero width.
    name_lengths = [len(topic_info(topic, machine_state)[0]) for topic in topics]
    name_width = max(name_lengths) if name_lengths else 0
    status_width = max([len(_("enabled")), len(_("available"))]) + 2
    # The enabled set does not change while listing, so build it once.
    enabled_names = [k for k, v in machine_state.items() if v.get("enabled")]
    contains_expired = False
    contains_expiring = False
    for i, topic in enumerate(topics):
        name, is_enabled, expiry = topic_info(topic, machine_state)

        status = _("enabled") if is_enabled else _("available")

        expired_note = " "
        if expiry:
            if today > expiry:
                if not is_enabled:
                    continue  # hide expired topics that are not enabled
                expired_note = warning_color + "*" + reset_color
                contains_expired = True
            else:
                expired_note = "†"
                contains_expiring = True

        if topic_combination_was_tested(enabled_names, catalog, topic["n"]):
            ord_desc = "{0: 3d}".format(i)
        else:
            # Untested in combination with what is already enabled: show no ordinal.
            ord_desc = "{0:>3s}".format(u"_")

        versions_display = "  ".join(["=" + v for v in topic.get("versions", [])])
        if versions_display:
            versions_display = "[ " + versions_display + " ]"

        optionally_emphasize = emphasis_color if is_enabled else ""

        row = "{ord_desc:3s} {expired_note}{optionally_emphasize}{name:{name_width}}  {status:{status_width}}{reset_color}  ".format(
            ord_desc=ord_desc,
            expired_note=expired_note,
            optionally_emphasize=optionally_emphasize,
            name=name,
            name_width=name_width,
            status=status,
            status_width=status_width,
            reset_color=reset_color,
        )
        print(row, end="")
        if name_width + status_width + len(versions_display) > 60:  # typical width minus padding
            # Too wide for one line: continue the versions on wrapped, indented lines.
            print("\\")
            print(textwrap.fill(versions_display, initial_indent=" "*8, subsequent_indent=" "*10, break_on_hyphens=False))
        else:
            print(versions_display)

    if contains_expired:
        print("{warning_color}*{reset_color} ".format(warning_color=warning_color, reset_color=reset_color) + _("Extra topic has reached end of support."))
    if contains_expiring:
        print("† " + _("Note on end-of-support. Use 'info' subcommand."))


def actually_enable(args, catalog, machine_state, fail_on_untested_combination=True):
    """Validate the requested topics, mark them enabled and save the configuration.

    args holds topic requests: a topic name or a catalog ordinal, optionally
    followed by "=version". Arguments starting with "-" are skipped. A request
    without a version is recorded as "latest".

    machine_state is updated in place (including its per-topic dicts) and then
    written with write_configuration. fail_on_untested_combination is accepted
    for interface compatibility; this function does not consult it.

    Returns the list of (topic_name, version) tuples that were requested.

    Exits the process on failure:
      3 -- ordinal out of range
      4 -- topic name not in the catalog
      5 -- requested version not offered by the topic
      6 -- a requested topic is untested with the currently enabled set
      7 -- the complete resulting set is untested
      8 -- the configuration could not be written
    """
    wanted = []
    for arg in args:
        if arg.startswith("-"):
            continue  # options are not topic requests

        topic_ref, _separator, exactver = arg.partition("=")
        version = exactver or "latest"

        if re.match(r"^[0-9]+$", topic_ref):
            try:
                wanted.append((catalog["topics"][int(topic_ref)]["n"], version))
            except IndexError:
                logger.error(_("Topic %s is not found."), topic_ref)
                sys.exit(3)
        else:
            wanted.append((topic_ref, version))

    # machine_state is not modified until validation is finished, so this list is stable.
    enabled_names = [k for k, d in machine_state.items() if d.get("enabled")]

    failures = []
    for name, exact_version in wanted:
        if machine_state.get(name, {}).get("enabled", 0):
            continue  # Ignore already-installed.

        try:
            if not topic_combination_was_tested(enabled_names, catalog, name):
                failures.append(name)
        except KeyError:
            logger.error(_("Topic %s is not found."), name)
            sys.exit(4)

        topic = catalog["topics-by-name"][name]
        # Only the expiry date (third element) is needed here.
        expiry = topic_info(topic, machine_state)[2]
        if expiry:
            logger.warning(_("Topic {0} has end-of-support date of {1:%Y-%m-%d}").format(name, expiry))

        if exact_version != "latest" and exact_version not in topic.get("versions", []):
            logger.error(_("Topic %s is not found."), "%s=%s" % (name, exact_version))
            sys.exit(5)

    if failures:
        for failure in failures:
            logger.error(_("Refusing because %s could cause an invalid combination."), failure)
        sys.exit(6)

    # Only the keys matter for the combination check. Use .get so an enabled entry
    # without a stored version does not abort with KeyError.
    proposed_enabled_items = dict((k, d.get(VERSION_KEY)) for k, d in machine_state.items() if d.get("enabled"))
    proposed_enabled_items.update(wanted)
    if not topic_combination_was_tested(proposed_enabled_items, catalog, None):
        logger.error(_("Refusing because %s could cause an invalid combination."), "+".join(proposed_enabled_items.keys()))
        sys.exit(7)

    for name, exact_version in wanted:
        if name not in machine_state:
            machine_state[name] = {}
        machine_state[name]["enabled"] = 1
        machine_state[name][VERSION_KEY] = exact_version
        visible = catalog["topics-by-name"][name].get("visible")
        if visible:
            # Restrict the repository to the packages the catalog marks as visible.
            machine_state[name]["includepkgs"] = " ".join(visible)

    try:
        write_configuration(machine_state)
    except IOError:
        sys.exit(8)

    return wanted


def cmd_enable(args):
    # Enables topics specified by name or number, prints the refreshed listing,
    # then prints install hints for each newly enabled topic that has an "inst" list.
    # Kept as a comment: a docstring here would make cmd_help list this subcommand.
    if not args:
        logger.error(_("Specify a topic name or number."))
        sys.exit(9)

    uses_ordinals = any(re.match(r"^[0-9]+$", arg) for arg in args)
    try:
        catalog = get_catalog(insist_stable_ordinal=uses_ordinals)
    except CatalogError:
        # get_catalog emits its own messages.
        sys.exit(10)

    machine_state = read_configuration()
    currently_enabled = [k for k, d in machine_state.items() if d.get("enabled")]
    # Result is not used afterwards; the call is kept as in the original flow.
    is_valid_before_enabling = topic_combination_was_tested(currently_enabled, catalog, None)
    newly_enabled = actually_enable(args, catalog, machine_state)

    cmd_list([])

    print()
    for topic, version in sorted(newly_enabled):
        if not catalog["topics-by-name"][topic].get("inst"):
            continue
        # Prefer dnf when an executable named "dnf" is found on PATH.
        search_path = os.environ.get("PATH", "").split(":")
        has_dnf = any(os.access(os.path.join(p, "dnf"), os.X_OK) for p in search_path)
        cmd = "dnf" if has_dnf else "yum"
        print(_("Now you can install:"))
        print(" # {0} clean metadata".format(cmd))
        print(" # {0} install {1}".format(cmd, " ".join(catalog["topics-by-name"][topic]["inst"])))


def cmd_install(args):
    """Enables specified topics and installs their packages.

    The topics named in args (by name or catalog ordinal, optionally with
    "=version") are enabled through actually_enable. The packages each topic
    lists under "inst" are then installed with dnf when it is found on PATH,
    otherwise with yum. "-y" is passed to the package manager when stdin is
    not a terminal or "-y" appears in args.

    If installation fails, the configuration read at the start is written
    back. Exits with status 11 when the catalog cannot be loaded, 12 when a
    topic cannot be resolved and 13 when the package manager fails.
    """
    import copy  # function-scope import; needed only for the snapshot below

    previous_machine_state = read_configuration()

    try:
        catalog = get_catalog(insist_stable_ordinal=any(re.match(r"^[0-9]+$", arg) for arg in args))
    except CatalogError:
        # get_catalog emits its own messages.
        sys.exit(11)

    # actually_enable edits the per-topic dicts in place. A shallow .copy() would share
    # those dicts with previous_machine_state, so the rollback below could write back the
    # already-enabled state. Hand it a deep copy to keep the snapshot pristine.
    actually_enable(args, catalog, copy.deepcopy(previous_machine_state))
    try:

        install_args_excluding_options = [arg for arg in args if not arg.startswith("-")]
        packages = set()
        for name in install_args_excluding_options:
            topic = re.sub(r"=.*", "", name)  # drop any "=version" suffix
            try:
                integer_match = re.match(r"^([0-9]+)$", topic)
                if integer_match:
                    i = int(integer_match.group(1))
                    topic = catalog["topics"][i]["n"]
                    recommendations = catalog["topics"][i].get("inst", [])
                else:
                    recommendations = catalog["topics-by-name"][topic].get("inst", [])
            except (IndexError, KeyError):
                logger.error(_("Topic %s is not found."), topic)
                sys.exit(12)

            if not recommendations:
                logger.warning(_("Topic %s does not install packages automatically."), topic)
                logger.warning(_("Use \"repoquery --repoid=%s --query --all\" to list packages."), topic)
            else:
                packages.update(set(recommendations))

        automation = ["-y"] if not sys.stdin.isatty() or "-y" in args else []
        verbosity = ["-v"] if IS_TESTING or logger.isEnabledFor(loggingmod.INFO) else []
        try:
            if packages:
                # Same order as the install command line below.
                ordered_packages = sorted(packages)
                print(_("Installing {0}").format(", ".join(ordered_packages)))
                cmd = "dnf" if any(os.access(os.path.join(p, "dnf"), os.X_OK) for p in os.environ.get("PATH", "").split(":")) else "yum"
                subprocess.check_call([cmd, "clean", "metadata"])
                subprocess.check_call([cmd, "install"] + verbosity + automation + ordered_packages)
        except subprocess.CalledProcessError:
            # The package manager reported failure: restore the configuration read at the start.
            write_configuration(previous_machine_state)
            logger.error(_("Installation failed. Check that you have permissions to install."))
            sys.exit(13)

    except Exception:
        # Any other error also rolls the configuration back before propagating.
        write_configuration(previous_machine_state)
        raise

    cmd_list([])


def cmd_disable(args):
    # Disable specified software topics, save the configuration and print the listing.
    # Kept as a comment: a docstring here would make cmd_help list this subcommand.
    #
    # Topics must be given by name; a missing or numeric argument exits with status 14.
    # A failure to write the configuration exits with status 15.

    if not args or any(re.match(r"^[0-9]+$", arg) for arg in args):
        logger.error(_("Specify a topic name."))
        sys.exit(14)

    machine_state = read_configuration()
    # logger.warning replaces the deprecated logger.warn alias, as used elsewhere in this file.
    logger.warning(_("Beware that disabling topics is not supported after they are installed."))
    for item in args:
        if not machine_state.get(item, {}).get("enabled", 0):
            logger.error(_("%r was not enabled. Ignoring."), item)
            continue
        machine_state[item]["enabled"] = 0
    try:
        write_configuration(machine_state)
    except IOError:
        sys.exit(15)

    cmd_list([])


def cmd_info(args):
    """See details of a specific package.

    For each distinct topic named in args, prints its end-of-support date
    (when the catalog gives one) and one line per recommended package with
    the install command to use. Unknown topics are reported and skipped.

    Exits with status 16 when no argument is given or any argument is purely
    numeric, 17 when the catalog cannot be loaded, and 127 when at least one
    listed topic has an end-of-support date.
    """

    ordinal_pattern = r"^[0-9]+$"

    if not args or any(re.match(ordinal_pattern, arg) for arg in args):
        logger.error(_("Specify a topic name."))
        sys.exit(16)

    try:
        catalog = get_catalog(insist_stable_ordinal=any(re.match(ordinal_pattern, arg) for arg in args))
    except CatalogError:
        # get_catalog emits its own messages.
        sys.exit(17)
    machine_state = read_configuration()

    # Prefer dnf when an executable named "dnf" is found on PATH.
    search_path = os.environ.get("PATH", "").split(":")
    has_dnf = any(os.access(os.path.join(p, "dnf"), os.X_OK) for p in search_path)
    cmd = "dnf" if has_dnf else "yum"

    wanted = []
    for arg in args:
        if arg.startswith("-"):
            continue  # options are not topics
        if re.match(ordinal_pattern, arg):
            wanted.append(catalog["topics"][int(arg)]["n"])
        else:
            wanted.append(arg)

    seen = set()
    expiration_warning = False
    for topic in wanted:
        if topic in seen:
            continue  # report each topic once
        seen.add(topic)

        try:
            topic_entry = catalog["topics-by-name"][topic]
            recommendations = topic_entry.get("inst", [])
        except KeyError:
            logger.error(_("Topic %s is not found."), topic)
            continue

        # Only the expiry date (third element) is needed here.
        expiry = topic_info(topic_entry, machine_state)[2]

        if expiry:
            expiration_warning = True
            print(_("{0} has end-of-support date of {1:%Y-%m-%d}").format(topic, expiry))

        for rec in recommendations:
            line_template = _("{0} recommends {1:25s}") + "  # {2} install {1}"
            print(line_template.format(topic, rec, cmd))

    if expiration_warning:
        sys.exit(127)


def cmd_suggest(args):
    # Suggestions are defined in the spec but are not collected at present, so this
    # subcommand only says so and exits with status 18.
    # Kept as a comment: a docstring here would make cmd_help list this subcommand.
    #
    # The statements that used to follow sys.exit() could never run and were removed.
    # Intended behaviour once suggestions are accepted: with no arguments, ask the user
    # what they hoped to find; otherwise record the suggestion and thank them.
    logger.warning(_("We don't receive suggestions at present. Thank you for trying."))
    sys.exit(18)


def cmd_help(args):
    """See list of commands.

    Lists, in name order, every module-level cmd_* function that has a
    docstring, showing the subcommand name and the first docstring line, and
    then prints two paragraphs of general guidance. args is not used.
    """
    namespace = globals()
    for name in sorted(namespace):
        if not name.startswith("cmd_"):
            continue
        docstring = namespace[name].__doc__
        if not docstring:
            continue  # undocumented subcommands stay hidden
        summary = docstring.split("\n")[0].strip()
        if summary:
            print("  {0:9s} {1}".format(name[4:], summary))  # translations? :(

    intro = _("Amazon Linux Extras software topics give you access to the most-recent "
              "stable software you specifically choose, without the uncertainty of "
              "a wholly new environment.")
    print()
    print(textwrap.fill(intro))

    advice = _("Best practice is to enable only one or two topics. More than that, and "
               "you lose the benefits of working in a stable environment.")
    print()
    print(textwrap.fill(advice))


def cmd_system_motd(args):
    # Prints the catalog's "system-motd" entry when it is present and non-empty.
    # Best effort by design: any Exception raised while fetching or printing is swallowed.
    # Kept as a comment: a docstring here would make cmd_help list this subcommand.
    try:
        # bypass checks a user could address and only get data
        message = fetch_new_catalog().get("system-motd")
        if not message:
            return
        print(message)
    except Exception:
        # Do not complain!
        pass


def main(args):
    """Dispatch a command line to the matching cmd_* function.

    args is the argument list without the program name; its first element
    selects the subcommand and is removed from the list before the handler
    is called with the remainder. An empty list runs "list".

    Returns 1 when the host is unsupported or the subcommand is unknown,
    otherwise 0 after the handler returns. Handlers may exit the process
    themselves through sys.exit.
    """
    if not host_is_sane():
        logger.error(_("This OS is not supported."))
        return 1

    if not args:
        args = ["list"]

    handlers = globals()
    action_function_name = "cmd_" + args.pop(0)
    if action_function_name not in handlers:
        logger.error(_("That is not a valid option. Try \"help\"."))
        return 1

    handlers[action_function_name](args)
    return 0


# vi: set expandtab autoindent shiftwidth=4 softtabstop=4 fileencoding=utf-8 filetype=python :