File: //usr/lib/python2.7/site-packages/amazon_linux_extras/software_catalog.py
# Copyright 2017, 2018 Amazon.com, Inc. or its affiliates.
# This module is part of Amazon Linux Extras.
#
# Amazon Linux Extras is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License v2 as published
# by the Free Software Foundation.
#
# Amazon Linux Extras is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with Amazon Linux Extras. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function, unicode_literals
import ast
import os
import pwd
import sys
import json
import errno
from time import time
from tempfile import gettempdir, NamedTemporaryFile
import logging as loggingmod
import subprocess
import shutil
if sys.version_info.major == 2:
from gettext import gettext as gettext_yields_encoded
_ = lambda *args: gettext_yields_encoded(*args).decode("UTF-8")
else:
from gettext import gettext as _
try:
from urllib2 import urlopen
from urllib2 import URLError
except ImportError:
from urllib.request import urlopen
from urllib.error import URLError
# URL template of the catalog. The {awsregion}, {awsdomain}, {releasever} and
# {basearch} placeholders are filled in with str.format() by
# fetch_new_catalog(). The CATALOGURL environment variable replaces the whole
# template.
CATALOG_URL = os.environ.get("CATALOGURL", "http://amazonlinux.{awsregion}.{awsdomain}/{releasever}/extras-catalog-{basearch}.json")
# True when AMZN2DEBUGGING is set to a non-empty value ("and True" collapses
# any non-empty string to True); a falsy value (False or "") otherwise.
IS_TESTING = os.environ.get("AMZN2DEBUGGING", False) and True
# UID of the invoking user: SUDO_UID when it is present in the environment,
# otherwise the UID of this process.
ACTUAL_UID = int(os.environ.get("SUDO_UID", os.getuid()))
# NOTE(review): not referenced in the visible part of this module; presumably
# a dictionary key consumed by other modules -- confirm against callers.
VERSION_KEY = "_exact_version_"
# Module-level logger, named after this module.
logger = loggingmod.getLogger(__name__)
def get_cache_file_name(uid=ACTUAL_UID):
    """Return the path of the catalog cache file for `uid`.

    Where the cache lives:
      * testing mode   -> a fixed relative file name, nothing is created
      * uid 0          -> /var/cache/amzn2extras/catalog-cache-0
      * any other uid  -> <home of uid>/.cache/amzn2extras/catalog-cache-<uid>

    A user who escalated through sudo therefore keeps the cache in their own
    home directory, because the default `uid` is the invoking user's UID.

    Both the cache root and the tool's sub-directory are created when they
    are missing. A directory created here is handed to `uid` and restricted
    to mode 0700; a directory that already exists is left alone.

    Raises OSError for any failure other than "already exists".
    """
    if IS_TESTING:
        return "extras-software-catalog-cache"

    if uid != 0:
        home_directory = pwd.getpwuid(uid).pw_dir
        cache_root = os.path.join(home_directory, ".cache")
        # A non-root user must never resolve to something under /root.
        assert "/root" not in cache_root
    else:
        cache_root = "/var/cache"

    relative_name = "amzn2extras/catalog-cache-{0}".format(uid)
    catalog_path = os.path.join(cache_root, relative_name)
    tool_directory = os.path.dirname(catalog_path)

    for directory in (cache_root, tool_directory):
        try:
            os.mkdir(directory)
            # Only reached when we made the directory ourselves: give it to
            # the real user and keep everybody else out.
            os.chown(directory, uid, -1)
            os.chmod(directory, 0o700)
        except OSError as failure:
            if failure.errno == errno.EEXIST:
                continue
            raise

    return catalog_path
class CatalogError(ValueError):
    """The catalog of available software cannot be used."""
def make_catalog_lookups_easy(catalog):
    """Add derived look-up tables to `catalog` in place and return it.

    Two keys are added:
      "topics-by-name"       dict mapping each topic's "n" value to the topic
      "whitelists-by-names"  one list per whitelist, with the topic indexes
                             replaced by the corresponding topic names

    Raises KeyError when "whitelists" is missing (or when a whitelist is
    non-empty and "topics" is missing).
    """
    # Index the topics by their name. A missing "topics" key gives an empty
    # index rather than an error.
    name_index = {}
    for entry in catalog.get("topics", []):
        name_index[entry["n"]] = entry
    catalog["topics-by-name"] = name_index

    # Translate every whitelist from topic positions to topic names.
    named_whitelists = []
    for index_group in catalog["whitelists"]:
        group_names = []
        for position in index_group:
            group_names.append(catalog["topics"][position]["n"])
        named_whitelists.append(group_names)
    catalog["whitelists-by-names"] = named_whitelists

    return catalog
def get_yum_variables():
    """Return Yum's variables as a dict, with environment overrides applied.

    Yum's variable store is read by running a helper Python process, because
    the 'yum' module is Python 2 only and this code may be running under
    Python 3. The helper prints the variable dict and it is parsed here with
    ast.literal_eval().

    Discovery is best-effort: if the helper cannot be started, prints nothing
    parseable, or prints something that is not a dict, discovery yields no
    variables and only the overrides and defaults below apply.

    For each of awsdomain, awsregion, releasever and basearch the value is
    chosen in this order: the upper-cased environment variable (for example
    AWSREGION), the value Yum reported, then a built-in default. "basearch"
    has no default, so it may be absent from the result.
    """
    # 'yum' is py2 only, and we might be py3, so we can't just import.
    yum_command = [
        "env", "-i", "python", "-c",
        "import yum; y=yum.YumBase(); y.doConfigSetup(init_plugins=False); print(y.conf.yumvar)",
    ]
    try:
        yum_process = subprocess.Popen(yum_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Do not unpack stderr into "_": that name is the module's gettext
        # alias and must not be shadowed.
        yum_output, unused_stderr = yum_process.communicate()
        discovered_vars = ast.literal_eval(yum_output.decode("UTF-8"))
    except Exception:
        # Deliberately broad: any failure to obtain Yum's view is survivable,
        # because the overrides and defaults below still apply.
        discovered_vars = {}
    if not isinstance(discovered_vars, dict):
        # The helper printed a valid literal that is not a variable mapping.
        discovered_vars = {}

    yumvars = dict(discovered_vars)
    for name, default in (("awsdomain", "amazonaws.com"), ("awsregion", "default"), ("releasever", "2"), ("basearch", None)):
        override_value = os.environ.get(name.upper())
        val = override_value or yumvars.get(name) or default
        if val:
            yumvars[name] = val
    return yumvars
def fetch_new_catalog():
    """Download the catalog of available software and return it as a dict.

    The catalog URL is CATALOG_URL with Yum's variables (and their
    environment overrides) substituted in. The downloaded JSON document is
    decoded and then extended with the look-up tables added by
    make_catalog_lookups_easy().

    Raises:
        CatalogError: a variable needed by the URL template is unknown, or
            the catalog cannot be downloaded.
        ValueError: the downloaded document is not valid UTF-8 JSON.
    """
    yumvars = get_yum_variables()
    try:
        url = CATALOG_URL.format(**yumvars)
    except (KeyError, IndexError) as exc:
        # For example "basearch" has no default, so it can be missing when
        # Yum could not be queried.
        logger.error(_("Catalog is not reachable. Try again later."))
        logger.error("catalog URL %s needs unknown variable %s", CATALOG_URL, exc)
        raise CatalogError("not reachable")

    try:
        response = urlopen(url)
        try:
            fetched_string = response.read().decode("UTF-8")
        finally:
            # Release the connection whether or not reading succeeded.
            response.close()
    except URLError:
        logger.error(_("Catalog is not reachable. Try again later."))
        logger.exception("catalog at %s", url)
        raise CatalogError("not reachable")

    catalog = json.loads(fetched_string)
    make_catalog_lookups_easy(catalog)
    return catalog
def get_catalog(insist_stable_ordinal=False):
    """Load the Bonus-software catalog from disk or network and check its sanity.

    Some user commands can take numbers as arguments, and when that happens and
    that command uses this fn to decide what the user meant, then we must refuse
    to renumber the items that the user saw. `insist_stable_ordinal` means we do
    not load something new, but what the user last saw.

    A usable cached catalog is returned as it is. Otherwise (or always, in
    testing mode) a new catalog is fetched, its message of the day is printed
    to stderr, its status and version are checked, and it is written to the
    cache. Failing to write the cache is only a warning.

    Raises:
        CatalogError: there is no usable cache and `insist_stable_ordinal` is
            set; the catalog cannot be fetched; the catalog version is not 1;
            or the catalog cannot be serialised for the cache.
        SystemExit: the fetched catalog reports a status other than "ok"
            (not in testing mode).
    """
    catalog = load_cached_catalog()
    if not catalog or IS_TESTING:
        if insist_stable_ordinal and not IS_TESTING:
            logger.error(_("The catalog of Extras might have changed. Please \"list\" to refresh."))
            raise CatalogError("perhaps not consistent numbering")

        catalog = fetch_new_catalog()

        if catalog.get("motd"):
            print(catalog["motd"], file=sys.stderr) # translations? :(

        if catalog.get("status", "ok") != "ok":
            # Maybe this is disabled.
            if not IS_TESTING:
                sys.exit(1)

        if catalog.get("version") != 1:
            logger.error(_("The catalog version is newer than this tool understands. Please upgrade."))
            logger.error("$ sudo yum upgrade amazon-linux-extras")
            raise CatalogError("catalog is too new")

        try:
            store_catalog_in_cache(catalog)
        except CatalogError:
            raise
        except Exception as exc:
            # Losing the cache only costs a re-download next time.
            # Logger.warning() is used because Logger.warn() is a deprecated
            # alias that newer Python versions no longer provide.
            logger.warning(_("Couldn't write catalog to cache."))
            logger.warning(str(exc))

    return catalog
def load_cached_catalog():
    """Get a catalog off of disk. Return it if it is safe, but return False if
    something about it is wrong.

    The cache is rejected when any of these holds:
      * the cache file is a symbolic link
      * the cache file is missing or not writable by this process
      * the cache file was last modified more than five minutes ago
      * the file cannot be read or does not contain valid JSON
      * the JSON value is not an object, lacks "version", "topics" or
        "whitelists", or has an empty "topics"

    Returns the catalog dict, or False.
    """
    cache_file_name = get_cache_file_name()
    if os.path.islink(cache_file_name):
        return False
    if not os.access(cache_file_name, os.W_OK):
        # Unwritable caches are not trustable.
        return False

    try:
        if os.path.getmtime(cache_file_name) < (time() - (5 * 60)):  # five minutes.
            return False
        with open(cache_file_name) as cache_file:
            catalog = json.load(cache_file)
    except (EnvironmentError, ValueError):
        # EnvironmentError: the file vanished or became unreadable after the
        # checks above. ValueError: the content is not valid JSON.
        return False

    if not isinstance(catalog, dict):
        # Valid JSON such as null or a number is still not a catalog; the
        # membership tests below would fail on it.
        return False
    for required_key in ("version", "topics", "whitelists"):
        if required_key not in catalog:
            return False
    if not catalog["topics"]:
        return False
    return catalog
def store_catalog_in_cache(catalog):
    """Preserve the catalog for immediate use again. Keep it writable by the
    real user, not an escalated role.

    The catalog is serialised as JSON into a temporary file, which is then
    moved to the cache location. When the effective UID differs from the
    invoking user's UID, the file is first handed to the invoking user. A
    falsy catalog is not stored.

    The temporary file is removed again if any step fails, so a failed
    attempt leaves nothing behind.

    Raises:
        CatalogError: the catalog cannot be serialised.
        OSError / IOError: changing ownership, preparing the cache directory
            or moving the file failed.
    """
    if not catalog:
        logger.debug("There is no catalog.")
        return

    ntf = NamedTemporaryFile(mode="w+t", delete=False)
    written_file_name = ntf.name
    stored = False
    try:
        with ntf:
            try:
                json.dump(catalog, ntf)
            except Exception:
                raise CatalogError("can't store") # Remap because we only care about this one.

        if os.geteuid() != ACTUAL_UID:
            logger.debug("Not running as normal user.")
            os.chown(written_file_name, ACTUAL_UID, -1) # change ownership back to original user

        cache_file_name = get_cache_file_name()
        logger.debug("writing cache to %s", cache_file_name)
        shutil.move(written_file_name, cache_file_name)
        stored = True
    finally:
        if not stored:
            # delete=False means nobody else removes the temporary file.
            try:
                os.unlink(written_file_name)
            except OSError:
                pass  # Best-effort clean-up; the original error matters more.