HEX
Server: LiteSpeed
System: Linux ip-172-31-76-142.ec2.internal 4.14.158-129.185.amzn2.x86_64 #1 SMP Tue Dec 24 03:15:32 UTC 2019 x86_64
User: 69b4844ae61d4e92bf26ad98af552775 (1065)
PHP: 7.2.27
Disabled: exec,passthru,shell_exec,system,eval
Upload Files
File: //usr/lib/python2.7/site-packages/amazon_linux_extras/repo.py
# Copyright 2017, 2018 Amazon.com, Inc. or its affiliates.

# This module is part of Amazon Linux Extras.
#
# Amazon Linux Extras is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License v2 as published
# by the Free Software Foundation.
#
# Amazon Linux Extras is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with Amazon Linux Extras.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function, unicode_literals

from .software_catalog import VERSION_KEY

import re
import os
import sys
import shutil
from tempfile import NamedTemporaryFile
from string import Formatter
try:
    import ConfigParser as configparser
except ImportError:
    import configparser
import logging as loggingmod

# Python 2's gettext() is treated here as yielding an encoded byte string, so
# it is wrapped to decode the translation from UTF-8.  On any other major
# version gettext() is used directly.
if sys.version_info[0] != 2:
    from gettext import gettext as _
else:
    from gettext import gettext as gettext_yields_encoded

    def _(*args):
        """Translate through gettext() and decode the result from UTF-8."""
        return gettext_yields_encoded(*args).decode("UTF-8")

class UrlFormatHelper(Formatter):
    """string.Formatter that understands an extra ``!u`` conversion.

    ``{field!u}`` renders the field as ``str(value).upper()``.  Every other
    conversion is delegated to the standard Formatter implementation.
    """

    def convert_field(self, value, conversion):
        """Apply ``conversion`` to ``value``, adding "u" for uppercasing.

        Conversions other than "u" are handled by the parent class.
        """
        if conversion != "u":
            return super(UrlFormatHelper, self).convert_field(value, conversion)
        return str(value).upper()

# Recognises a default-format repo URL and captures the extra's name
# ("extraname") and its exact version ("exactver").
CONFIG_URL_MATCHER = re.compile(
    r".*?/extras/(?P<extraname>[^/]+)/(?P<exactver>[^/]+)/\$basearch/mirror\.list$"
)

# The repo file this module manages.  TESTROOT, when present in the
# environment, replaces the /etc/yum.repos.d/ directory.
YUMCONFIG_FILE_NAME = os.path.join(
    os.environ.get("TESTROOT", "/etc/yum.repos.d/"),
    "amzn2-extras.repo",
)
# Written as the "priority" option of every section write_configuration() manages.
YUM_AMZN2_PRIORITY = 10

# Prepended to a quoted extra name to form its section name.
YUMCONFIG_SECTION_EXTRAPREFIX = "amzn2extra-"

# Template for repo URLs; {extraname}, {exactver} and {sect} are filled in by
# write_configuration().  URL_FORMAT in the environment overrides it.
URL_FMT = os.environ.get(
    "URL_FORMAT",
    "http://amazonlinux.$awsregion.$awsdomain/$releasever/extras/{extraname}/{exactver}/{sect}/mirror.list",
)
# True when USE_MIRRORLIST is unset or is one of yes/true/1/on (any case).
USE_MIRRORLIST = os.environ.get("USE_MIRRORLIST", "yes").lower() in ("yes", "true", "1", "on")

logger = loggingmod.getLogger(__name__)


def yum_ini_unquote(quoted_string):
    """Turn a section name back into the raw name it was quoted from.

    Each ``_XX_`` run (exactly two hex digits between underscores) is
    replaced by the character with that code.  After decoding, one leading
    YUMCONFIG_SECTION_EXTRAPREFIX is stripped if it is present.

    >>> yum_ini_unquote("abcdef_21__40__23__24_")
    'abcdef!@#$'
    """
    hex_decoded = re.sub(
        "_([0-9a-fA-F]{2})_",
        lambda hit: chr(int(hit.group(1), 16)),
        quoted_string,
    )
    # The prefix group is optional, so names without it pass through as-is.
    optional_prefix = "^(?:" + re.escape(YUMCONFIG_SECTION_EXTRAPREFIX) + ")?"
    return re.sub(optional_prefix, r"", hex_decoded)


def yum_ini_quote(raw_string):
    """Build the section name for a raw extra name.

    Every character other than ASCII letters, digits, "-" and "." is
    replaced by ``_XX_``, where XX is its code in lowercase hex (at least two
    digits).  YUMCONFIG_SECTION_EXTRAPREFIX is then prepended.

    >>> yum_ini_quote("test1.1+other2")
    'amzn2extra-test1.1_2b_other2'
    >>> yum_ini_quote("abcdef!@#$")
    'amzn2extra-abcdef_21__40__23__24_'
    """
    # "_" must itself be escaped (important to match _): it delimits the hex
    # escapes, so leaving it bare would make the quoting ambiguous.
    escaped = re.sub(
        "([^-A-Za-z0-9.])",
        lambda hit: "_%02x_" % (ord(hit.group(1)),),
        raw_string,
    )
    return YUMCONFIG_SECTION_EXTRAPREFIX + escaped


def read_configuration():
    """Read the YUM configuration file we manage.

    Parses YUMCONFIG_FILE_NAME (when it exists) and returns a dict keyed by
    the unquoted name of every section that does not end in "-source" or
    "-debuginfo".  Each value is a dict of that section's options, where:

    * "enabled" is normalised to 1 for "true"/"1"/"yes" and to 0 for
      "false"/"0"/"no" (case-insensitive, surrounding whitespace ignored);
      any other value is logged as a warning and kept unchanged.
    * VERSION_KEY holds the exact version parsed from the section's
      "mirrorlist"/"baseurl" URL, or "latest" when no version was parsed
      and the section's "enabled" value is truthy.

    Raises ValueError when URL_FORMAT is not set in the environment and a
    "mirrorlist"/"baseurl" value does not match CONFIG_URL_MATCHER.
    """
    config = configparser.RawConfigParser()
    if os.path.isfile(YUMCONFIG_FILE_NAME):
        config.read(YUMCONFIG_FILE_NAME)

    # For now, we assume any override url passed in URL_FORMAT points to
    # latest, so versions are only parsed out of default-format URLs.
    parse_version_from_url = "URL_FORMAT" not in os.environ

    state = {}
    for section in config.sections():

        # The source/debuginfo companions are derived from the main section.
        if section.endswith(("-source", "-debuginfo")):
            continue

        extra_name = yum_ini_unquote(section)
        settings = state[extra_name] = {}
        for key, value in config.items(section):
            if parse_version_from_url and key in ("mirrorlist", "baseurl"):
                match = CONFIG_URL_MATCHER.match(value)
                if not match:
                    logger.error("Malformed url in %s section %s  %r", YUMCONFIG_FILE_NAME, section, value)
                    raise ValueError(value)
                settings[VERSION_KEY] = match.group("exactver")

            if key == "enabled":
                if hasattr(value, "lower") and value.lower().strip() in ("true", "1", "yes"):
                    settings[key] = 1
                elif hasattr(value, "lower") and value.lower().strip() in ("false", "0", "no"):
                    settings[key] = 0
                elif value in (0, 1):
                    settings[key] = value
                else:
                    # Logger.warn() is a deprecated alias; warning() is the
                    # supported spelling on both Python 2.7 and 3.
                    logger.warning("Unexpected value for %s %r is %r", section, key, value)
                    settings[key] = value
            else:
                settings[key] = value

        if VERSION_KEY not in settings and settings.get("enabled"):
            settings[VERSION_KEY] = "latest"
    return state


def write_configuration(state):
    """Write the probably-mutated YUM configuration file we manage.

    ``state`` maps each extra's name to a dict of its settings.  Each dict
    must contain "enabled" and may contain VERSION_KEY (treated as "latest"
    when absent).  For every extra, three sections are created or updated:
    the main repo, its "-source" repo and its "-debuginfo" repo.  Sections
    already present in YUMCONFIG_FILE_NAME are read first and written back.

    The result is written to a temporary file, made world-readable, and
    then moved over YUMCONFIG_FILE_NAME.  The temporary file is removed if
    any of those steps fails.

    Raises KeyError when an extra's settings lack "enabled"; AssertionError
    when URL_FORMAT is not set in the environment and a generated URL does
    not match CONFIG_URL_MATCHER; IOError (re-raised after logging) when the
    file cannot be moved into place.
    """

    src_suffix = "-source"
    dbg_suffix = "-debuginfo"

    config = configparser.RawConfigParser()
    config.read(YUMCONFIG_FILE_NAME)

    # None of these depend on the extra being written, so compute them once.
    url_key = "mirrorlist" if USE_MIRRORLIST else "baseurl"
    # Get alternates suffixes for srpms and debuginfo if needed
    srpm_suffix = os.environ.get("SRC_SUFFIX", "SRPMS")
    debuginfo_suffix = os.environ.get("DEBUGINFO_SUFFIX", "debuginfo/$basearch")
    check_generated_url = "URL_FORMAT" not in os.environ
    url_format_helper = UrlFormatHelper()
    gpgkey = "file:///etc/pki/rpm-gpg/RPM-GPG-KEY-amazon-linux-2"

    for key in state:
        settings = state[key]
        ini_section = yum_ini_quote(key)
        ini_section_dbg = ini_section + dbg_suffix
        ini_section_src = ini_section + src_suffix

        # New sections are appended in this order: source, debuginfo, main.
        for section in (ini_section_src, ini_section_dbg, ini_section):
            if not config.has_section(section):
                config.add_section(section)

        # defaults
        config.set(ini_section, "enabled", settings["enabled"])

        # Companion repos keep whatever "enabled" they already had and
        # otherwise start out disabled.
        for section in (ini_section_dbg, ini_section_src):
            config.set(section, "enabled",
                       config.get(section, "enabled") if config.has_option(section, "enabled") else "0")

        # user settings
        for k, v in sorted(settings.items()):
            if k == VERSION_KEY:
                continue
            config.set(ini_section, k, v)

        # overrides of user settings
        exactver = settings.get(VERSION_KEY, "latest")
        managed_sections = (
            (ini_section, "Amazon Extras repo for ", "$basearch"),
            (ini_section_src, "Amazon Extras source repo for ", srpm_suffix),
            (ini_section_dbg, "Amazon Extras debuginfo repo for ", debuginfo_suffix),
        )
        for section, name_prefix, sect in managed_sections:
            config.set(section, "name", name_prefix + key)
            config.set(section, url_key,
                       url_format_helper.format(URL_FMT, extraname=key,
                                                exactver=exactver,
                                                sect=sect))

        if check_generated_url:
            generated_url = config.get(ini_section, url_key)
            if not CONFIG_URL_MATCHER.match(generated_url):
                # Raised explicitly so the check survives "python -O";
                # AssertionError is kept so callers see the same type.
                raise AssertionError(
                    "Generated url does not match the expected format: %r" % (generated_url,))

        for section, name_prefix, sect in managed_sections:
            config.set(section, "gpgcheck", 1)
            config.set(section, "gpgkey", gpgkey)
            config.set(section, "priority", YUM_AMZN2_PRIORITY)

    ntf = NamedTemporaryFile(mode="w+t", delete=False)
    written_file_name = ntf.name
    try:
        with ntf:
            ntf.write("\n\n\n\n### This file is managed with amazon-linux-extras. Please manage with that tool.\n")
            ntf.write("\n"*20)  # Scroll the good stuff below impulsive attentions.
            config.write(ntf)
        os.chmod(written_file_name, 0o644)  # Make world-readable, owner writable

        try:
            shutil.move(written_file_name, YUMCONFIG_FILE_NAME)
        except IOError:
            logger.error(_("You lack permissions to write to system configuration.") + "  " + YUMCONFIG_FILE_NAME)
            raise
    finally:
        # delete=False means nothing else removes the temporary file.  After
        # a successful move it is already gone; after a failure, drop it
        # instead of leaving it behind.
        if os.path.exists(written_file_name):
            try:
                os.remove(written_file_name)
            except OSError:
                # Best-effort cleanup: never mask the original error.
                logger.debug("Could not remove temporary file %s", written_file_name)