HEX
Server: LiteSpeed
System: Linux ip-172-31-76-142.ec2.internal 4.14.158-129.185.amzn2.x86_64 #1 SMP Tue Dec 24 03:15:32 UTC 2019 x86_64
User: 69b4844ae61d4e92bf26ad98af552775 (1065)
PHP: 7.2.27
Disabled: exec,passthru,shell_exec,system,eval
Upload Files
File: //lib/python2.7/site-packages/cfnbootstrap/rpm_tools.py
#==============================================================================
# Copyright 2011 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#==============================================================================
from cfnbootstrap.construction_errors import ToolError
from cfnbootstrap.util import ProcessHelper, LoggingProcessHelper
import logging
import re
import subprocess

log = logging.getLogger("cfn.init")

class YumTool(object):
    """
    Installs packages via Yum

    """

    def apply(self, action, auth_config=None):
        """
        Install a set of packages via yum, returning the packages actually installed or updated.

        Arguments:
        action -- a dict of package name to version; version can be empty, a single string or a list of strings

        Exceptions:
        ToolError -- on expected failures (such as a non-zero exit code)
        """

        pkgs_changed = []

        if not action.keys():
            log.debug("No packages specified for yum")
            return pkgs_changed

        cache_result = ProcessHelper(['yum', '-y', 'makecache']).call()

        if cache_result.returncode:
            log.error("Yum makecache failed. Output: %s", cache_result.stdout)
            raise ToolError("Could not create yum cache", cache_result.returncode)

        pkg_specs_to_upgrade = []
        pkg_specs_to_downgrade = []

        for pkg_name in action:
            if action[pkg_name]:
                if isinstance(action[pkg_name], basestring):
                    pkg_ver = action[pkg_name]
                else:
                    # Yum only cares about one version anyway... so take the max specified version in the list
                    pkg_ver = RpmTool.max_version(action[pkg_name])
            else:
                pkg_ver = None

            pkg_spec = '%s-%s' % (pkg_name, pkg_ver) if pkg_ver else pkg_name

            if self._pkg_installed(pkg_spec):
                # If the EXACT requested spec is installed, don't do anything
                log.debug("%s will not be installed as it is already present", pkg_spec)
            elif not self._pkg_available(pkg_spec):
                # If the requested spec is not available, blow up
                log.error("%s is not available to be installed", pkg_spec)
                raise ToolError("Yum does not have %s available for installation" % pkg_spec)
            elif not pkg_ver:
                # If they didn't request a specific version, always upgrade
                pkg_specs_to_upgrade.append(pkg_spec)
                pkgs_changed.append(pkg_name)
            else:
                # They've requested a specific version that's available but not installed.
                # Figure out if it's an upgrade or a downgrade
                installed_version = RpmTool.get_package_version(pkg_name, False)[1]
                if self._should_upgrade(pkg_ver, installed_version):
                    pkg_specs_to_upgrade.append(pkg_spec)
                    pkgs_changed.append(pkg_name)
                else:
                    log.debug("Downgrading to %s from installed version %s", pkg_spec, installed_version)
                    pkg_specs_to_downgrade.append(pkg_spec)
                    pkgs_changed.append(pkg_name)


        if not pkgs_changed:
            log.debug("All yum packages were already installed")
            return []

        if pkg_specs_to_upgrade:
            log.debug("Installing/updating %s via yum", pkg_specs_to_upgrade)

            result = LoggingProcessHelper(['yum', '-y', 'install'] + pkg_specs_to_upgrade, name=u'yum install %s' % ' '.join(pkg_specs_to_upgrade)).call()

            if result.returncode:
                log.error("Yum failed. Output: %s", result.stdout)
                raise ToolError("Could not successfully install/update yum packages", result.returncode)

        if pkg_specs_to_downgrade:
            log.debug("Downgrading %s via yum", pkg_specs_to_downgrade)

            result = LoggingProcessHelper(['yum', '-y', 'downgrade'] + pkg_specs_to_downgrade, name=u'yum downgrade %s' % ' '.join(pkg_specs_to_downgrade)).call()

            if result.returncode:
                log.error("Yum failed. Output: %s", result.stdout)
                raise ToolError("Could not successfully downgrade yum packages", result.returncode)


        log.info("Yum installed %s", pkgs_changed)

        return pkgs_changed


    def _should_upgrade(self, requested_ver, installed_version):
        # If they haven't requested a version, always install
        if not requested_ver:
            return True
        #Now we need to detect whether or not we need to upgrade
        ver_cmp = RpmTool.compare_rpm_versions(requested_ver, installed_version)
        if ver_cmp > 0:
            log.debug("Requested version %s is greater than installed version %s, so we will upgrade", requested_ver, installed_version)
            return True
        else:
            log.debug("Requested version %s is NOT greater than installed version %s, so we will NOT upgrade", requested_ver, installed_version)
            return False

    def _pkg_installed(self, pkg):
        result = ProcessHelper(['yum', '-C', '-y', 'list', 'installed', pkg]).call()

        return result.returncode == 0

    def _pkg_available(self, pkg):
        # --showduplicates seems to be required to see downgradable versions when running yum non-interactively
        # but not when running interactively -- but we rarely run interactively
        result = ProcessHelper(['yum', '-C', '-y', '--showduplicates', 'list', 'available', pkg]).call()

        return result.returncode == 0

class RpmTool(object):

    def apply(self, action, auth_config=None):
        """
        Install a set of packages via RPM, returning the packages actually installed or updated.

        Arguments:
        action -- a dict of package name to version; version can be empty, a single string or a list of strings

        Exceptions:
        ToolError -- on expected failures (such as a non-zero exit code)
        """

        pkgs_changed = []

        if not action.keys():
            log.debug("No packages installed for RPM")
            return pkgs_changed

        pkgs = []

        for pkg_name, loc in action.iteritems():
            pkgs_to_process = ([loc] if isinstance(loc, basestring) else loc)
            pkgs_filtered = [pkg_key for pkg_key in pkgs_to_process if self._package_filter(pkg_key)]
            if pkgs_filtered:
                pkgs.extend(pkgs_filtered)
                pkgs_changed.append(pkg_name)


        if not pkgs:
            log.info("All RPMs were already installed")
            return []

        log.debug("Installing %s via RPM", pkgs)

        result = ProcessHelper(['rpm', '-U', '--quiet', '--nosignature', '--replacepkgs'] + pkgs).call()

        if result.returncode:
            log.error("RPM failed. Output: %s", result.stdout)
            raise ToolError("Could not successfully install rpm packages", result.returncode)
        else:
            log.debug("RPM output: %s", result.stdout)

        return pkgs_changed

    def _package_filter(self, pkg):
        if not pkg:
            log.warn("RPM specified with no location")
            return False

        if self._is_installed(pkg):
            log.debug("Skipping RPM at %s as it is already installed", pkg)
            return False

        return True

    @classmethod
    def get_package_version(cls, pkg, is_file=True):
        """
        Given the name of an installed package or package location, return a tuple of (name, version-release)
        of either the installed package or the specified package location

        Parameters:
            - pkg: the package name/location
            - is_file : if True, pkg refers to a package location; if False, the name of an installed package
        """

        query_mode = '-qp' if is_file else '-qa'

        log.debug("Querying for version of package %s", pkg)

        query_result = ProcessHelper(['rpm', query_mode, '--queryformat', '%{NAME}|%{VERSION}-%{RELEASE}', '--nosignature', pkg], stderr=subprocess.PIPE).call()

        log.debug("RPM stdout: %s", query_result.stdout)
        log.debug("RPM stderr: %s", query_result.stderr)

        if query_result.returncode:
            log.error("Could not determine package contained by rpm at %s", pkg)
            return (None, None)

        # The output from the command is just name|version-release
        name, sep, version = query_result.stdout.strip().partition('|')

        return (name, version)

    @classmethod
    def order_versions(cls, pkg_vers):
        return sorted(pkg_vers, cmp=cls.compare_rpm_versions)

    @classmethod
    def max_version(cls, versions):
        max_ver = None
        for ver in versions:
            if cls.compare_rpm_versions(max_ver, ver) < 0:
                max_ver = ver

        return max_ver

    @classmethod
    def compare_rpm_versions(cls, first_pkg, second_pkg):
        """
        Given two package versions in form VERSION-RELEASE, (-RELEASE optional), compare them
        based on "newness" (where "greater than" equals "newer")
        """

        # Partition the RPM version strings into (VERSION, RELEASE)
        first_fields = first_pkg.split('-', 1) if first_pkg else ()
        second_fields = second_pkg.split('-', 1) if second_pkg else ()

        # Compare VERSION and then RELEASE
        for i in range(2):
            # Build a list of wholly-alpha and wholly-numeric fields; treat non-alphanumeric sequences as separators
            first_chars = re.findall('[a-zA-Z]+|[0-9]+', first_fields[i]) if i < len(first_fields) else []
            second_chars = re.findall('[a-zA-Z]+|[0-9]+', second_fields[i]) if i < len(second_fields) else []

            # Compare position by position
            for j in range(min(len(first_chars), len(second_chars))):
                c1 = first_chars[j]
                c2 = second_chars[j]
                if c1.isdigit():
                    if c2.isdigit():
                        # If both fields are numeric, compare based on int values
                        int_cmp = cmp(int(c1), int(c2))
                        if int_cmp:
                            return int_cmp
                    else:
                        # If one is alpha and one is numeric, then numeric is "greater"
                        return 1
                elif c2.isdigit():
                    # If one is alpha and one is numeric, then numeric is "greater"
                    return -1
                else:
                    # If they're both strings, just compare lexicographically
                    str_cmp = cmp(c1, c2)
                    if str_cmp:
                        return str_cmp

            # If all of the intersecting fields match, the longer string is newer
            len_cmp = cmp(len(first_chars), len(second_chars))
            if len_cmp:
                return len_cmp

        # If both VERSION and RELEASE match for both RPMs, ignoring non-alphanumeric chars, they are equal
        return 0

    def _is_installed(self, pkg):
        pkg_with_version = RpmTool.get_package_version(pkg)

        if not pkg_with_version or not pkg_with_version[0]:
            # If there's an error retrieving the version, assume we have to install it (a failure there will be terminal)
            return True

        pkg_spec = '-'.join(pkg_with_version) if pkg_with_version[1] else pkg_with_version[0]

        # rpm -q will try to find the specific RPM in the local system
        # --quiet will reduce this command to just an exit code
        test_result = ProcessHelper(['rpm', '-q', '--quiet', pkg_spec]).call()

        # if rpm -q returns 0, that means the package exists
        return test_result.returncode == 0