HEX
Server: LiteSpeed
System: Linux ip-172-31-76-142.ec2.internal 4.14.158-129.185.amzn2.x86_64 #1 SMP Tue Dec 24 03:15:32 UTC 2019 x86_64
User: 69b4844ae61d4e92bf26ad98af552775 (1065)
PHP: 7.2.27
Disabled: exec,passthru,shell_exec,system,eval
Upload Files
File: //lib/python2.7/site-packages/amazon_linux_extras/software_catalog.py
# Copyright 2017, 2018 Amazon.com, Inc. or its affiliates.

# This module is part of Amazon Linux Extras.
#
# Amazon Linux Extras is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License v2 as published
# by the Free Software Foundation.
#
# Amazon Linux Extras is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with Amazon Linux Extras.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function, unicode_literals

import ast
import os
import pwd
import sys
import json
import errno
from time import time
from tempfile import gettempdir, NamedTemporaryFile
import logging as loggingmod
import subprocess
import shutil

if sys.version_info.major == 2:
    from gettext import gettext as gettext_yields_encoded
    _ = lambda *args: gettext_yields_encoded(*args).decode("UTF-8")
else:
    from gettext import gettext as _

try:
    from urllib2 import urlopen
    from urllib2 import URLError
except ImportError:
    from urllib.request import urlopen
    from urllib.error import URLError


CATALOG_URL = os.environ.get("CATALOGURL", "http://amazonlinux.{awsregion}.{awsdomain}/{releasever}/extras-catalog-{basearch}.json")

IS_TESTING = os.environ.get("AMZN2DEBUGGING", False) and True

ACTUAL_UID = int(os.environ.get("SUDO_UID", os.getuid()))
VERSION_KEY = "_exact_version_"

logger = loggingmod.getLogger(__name__)


def get_cache_file_name(uid=ACTUAL_UID):
    """If there is no base user, actual UID is zero, then use /var/cache for
    storage. Else, look up the user's home and put the file in ~/.cache/ .

    normal user will go in ~normal/.cache
    sudo user will go in ~normal/.cache
    natural root will go in /var/cache """

    if IS_TESTING:
        return "extras-software-catalog-cache"

    if uid == 0:
        cache_container = "/var/cache"
    else:
        cache_container = os.path.join(pwd.getpwuid(uid).pw_dir, ".cache")
        assert "/root" not in cache_container

    cache_file_name = os.path.join(cache_container, "amzn2extras/catalog-cache-{0}".format(uid))

    for container in (cache_container, os.path.dirname(cache_file_name)):
        try:
            os.mkdir(container)
            # If we make the directory, chown and chmod it
            os.chown(container, uid, -1)
            os.chmod(container, 0o700)
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise

    return cache_file_name


class CatalogError(ValueError): """List of software is not usable."""


def make_catalog_lookups_easy(catalog):
    # a fast look-up by topic name to get topic data.
    catalog["topics-by-name"] = dict((topic["n"], topic) for topic in catalog.get("topics", []))
    # expand topic indexes into sets of names, for easy comparison.
    catalog["whitelists-by-names"] = [list(catalog["topics"][i]["n"] for i in wli) for wli in catalog["whitelists"]]
    return catalog


def get_yum_variables():
    """Query Yum subsystem for its variable storage, and then override those
    values when they're in environment."""

    # 'yum' is py2 only, and we might be py3, so we can't just import.
    yum_process = subprocess.Popen(["env", "-i", "python", "-c", "import yum; y=yum.YumBase(); y.doConfigSetup(init_plugins=False); print(y.conf.yumvar)"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    yum_output, _ = yum_process.communicate()
    try:
        discovered_vars = ast.literal_eval(yum_output.decode("UTF-8"))
    except Exception as exc:
        discovered_vars = {}

    yumvars = discovered_vars.copy()
    for name, default in (("awsdomain", "amazonaws.com"), ("awsregion", "default"), ("releasever", "2"), ("basearch", None)):
        override_value = os.environ.get(name.upper())
        val = override_value or yumvars.get(name) or default
        if val:
            yumvars[name] = val

    return yumvars


def fetch_new_catalog():
    """Get a data structure that represents what we know of available
    software."""

    yumvars = get_yum_variables()
    url = CATALOG_URL.format(**yumvars)
    try:
        request = urlopen(url)
        fetched_string = request.read().decode("UTF-8")
    except URLError:
        logger.error(_("Catalog is not reachable. Try again later."))
        logger.exception("catalog at %s", url)
        raise CatalogError("not reachable")
    catalog = json.loads(fetched_string)
    make_catalog_lookups_easy(catalog)

    return catalog


def get_catalog(insist_stable_ordinal=False):
    """Load the Bonus-software catalog from disk or network and check its sanity.

    Some user commands can take numbers as arguments, and when that happens and
    that command uses this fn to decide what the user meant, then we must refuse
    to renumber the items that the user saw. `insist_stable_ordinal` means we do
    not load something new, but what the user last saw.
    """
    catalog = load_cached_catalog()
    if not catalog or IS_TESTING:
        if insist_stable_ordinal and not IS_TESTING:
            logger.error(_("The catalog of Extras might have changed. Please \"list\" to refresh."))
            raise CatalogError("perhaps not consistent numbering")

        catalog = fetch_new_catalog()

    if catalog.get("motd"):
        print(catalog["motd"], file=sys.stderr)  # translations? :(

        if catalog.get("status", "ok") != "ok":
            # Maybe this is disabled.
            if not IS_TESTING:
                sys.exit(1)

    if catalog.get("version") != 1:
        logger.error(_("The catalog version is newer than this tool understands. Please upgrade."))
        logger.error("$ sudo yum upgrade amazon-linux-extras")
        raise CatalogError("catalog is too new")

    try:
        store_catalog_in_cache(catalog)
    except CatalogError:
        raise
    except Exception as exc:
        logger.warn(_("Couldn't write catalog to cache."))
        logger.warn(str(exc))
    return catalog


def load_cached_catalog():
    """Get a catalog off of disk. Return it if it is safe, but return False if something about it
    is wrong."""

    cache_file_name = get_cache_file_name()

    if os.path.islink(cache_file_name):
        return False

    if not os.access(cache_file_name, os.W_OK):
        # Unwritable caches are not trustable.
        return False

    if os.path.getmtime(cache_file_name) < (time()-(5*60)):  # five minutes.
        return False

    with open(cache_file_name) as cache_file:
        try:
            catalog = json.load(cache_file)
        except ValueError:
            return False

    if "version" not in catalog: return False
    if "topics" not in catalog: return False
    if "whitelists" not in catalog: return False
    if not catalog["topics"]: return False

    return catalog


def store_catalog_in_cache(catalog):
    """Preserve the catalog for immediate use again. Keep it writable by the
    real user, not an escalated role."""
    if not catalog:
        logger.debug("There is no catalog.")
        return

    with NamedTemporaryFile(mode="w+t", delete=False) as ntf:
        written_file_name = ntf.name
        try:
            json.dump(catalog, ntf)
        except Exception:
            raise CatalogError("can't store")  # Remap because we only care about this one.

    if os.geteuid() != ACTUAL_UID:
        logger.debug("Not running as normal user.")
        os.chown(written_file_name, ACTUAL_UID, -1)  # change ownership back to original user

    cache_file_name = get_cache_file_name()
    logger.debug("writing cache to %s", cache_file_name)
    shutil.move(ntf.name, cache_file_name)