File: //lib/python2.7/site-packages/cfnbootstrap/update_hooks.py
#==============================================================================
# Copyright 2011 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#==============================================================================
import threading
from cfnbootstrap import util
from cfnbootstrap.aws_client import AwsQueryError
from cfnbootstrap.cfn_client import CloudFormationClient
from cfnbootstrap.sqs_client import SQSClient
from cfnbootstrap.util import ProcessHelper
from threading import Timer
import cfnbootstrap.json_file_manager as JsonFileManager
import ConfigParser
import calendar
import datetime
import logging
import os
import random
import socket
import subprocess
import tempfile
import time
from util import Credentials
try:
import simplejson as json
except ImportError:
import json
log = logging.getLogger("cfn.hup")
def parse_config(config_path):
main_conf_path = os.path.join(config_path, 'cfn-hup.conf')
if not os.path.isfile(main_conf_path):
raise ValueError("Could not find main configuration at %s" % main_conf_path)
main_config = ConfigParser.SafeConfigParser()
main_config.read(main_conf_path)
if not main_config.has_option('main', 'stack'):
raise ValueError("[main] section must contain stack option")
stack = main_config.get('main', 'stack')
if main_config.has_option('main', 'role'):
credentials = util.RoleBasedCredentials(main_config.get('main', 'role'))
elif main_config.has_option('main', 'credential-file'):
try:
credentials = util.extract_credentials(main_config.get('main', 'credential-file'))
except IOError, e:
raise ValueError("Could not retrieve credentials from file:\n\t%s" % e.strerror)
else:
credentials = Credentials('', '')
additional_hooks_path = os.path.join(config_path, 'hooks.d')
additional_files = []
if os.path.isdir(additional_hooks_path):
for hook_file in os.listdir(additional_hooks_path):
if os.path.isfile(os.path.join(additional_hooks_path, hook_file)):
additional_files.append(os.path.join(additional_hooks_path, hook_file))
hooks_config = ConfigParser.SafeConfigParser()
files_read = hooks_config.read([os.path.join(config_path, 'hooks.conf')] + additional_files)
if not files_read:
raise ValueError("No hook configurations found at %s or %s." % (os.path.join(config_path, 'hooks.conf'), additional_hooks_path))
hooks = []
cmd_hooks = []
for section in hooks_config.sections():
if not hooks_config.has_option(section, 'triggers'):
logging.error("No triggers specified for hook %s", section)
continue
triggers = [s.strip() for s in hooks_config.get(section, 'triggers').split(',')]
if not hooks_config.has_option(section, 'path'):
logging.error("No path specified for hook %s", section)
continue
if not hooks_config.has_option(section, 'action'):
logging.error("No action specified for hook %s", section)
continue
runas = None
if hooks_config.has_option(section, 'runas'):
runas = hooks_config.get(section, 'runas').strip()
hook = Hook(section,
triggers,
hooks_config.get(section, 'path').strip(),
hooks_config.get(section, 'action'),
runas)
if hook.is_cmd_hook():
if hooks_config.has_option(section, 'singleton'):
hook.singleton = util.interpret_boolean(hooks_config.get(section, 'singleton'))
if hooks_config.has_option(section, 'send_result'):
hook.send_result = util.interpret_boolean(hooks_config.get(section, 'send_result'))
cmd_hooks.append(hook)
else:
hooks.append(hook)
if not hooks and not cmd_hooks:
raise ValueError("No valid hooks found")
region = 'us-east-1'
if main_config.has_option('main', 'region'):
region = main_config.get('main', 'region')
cfn_url = CloudFormationClient.endpointForRegion(region)
if main_config.has_option('main', 'url'):
cfn_url = main_config.get('main', 'url')
cfn_client = CloudFormationClient(credentials, cfn_url, region)
if main_config.has_option('main', 'multi-threaded'):
value = main_config.get('main', 'multi-threaded')
multi_threaded = util.interpret_boolean(value)
else:
multi_threaded = True
if hooks:
processor = HookProcessor(hooks, stack, cfn_client)
else:
processor = None
if cmd_hooks:
sqs_url = SQSClient.endpointForRegion(region)
if main_config.has_option('main', 'sqs_url'):
sqs_url = main_config.get('main', 'sqs_url')
sqs_client = SQSClient(credentials, sqs_url, region=region)
cmd_processor = CmdProcessor(stack, cmd_hooks, sqs_client,
CloudFormationClient(credentials, cfn_url, region),
multi_threaded)
else:
cmd_processor = None
return (main_config, processor, cmd_processor)
class FatalUpdateError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg
class InFlightStatusError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg
class Hook(object):
def __init__(self, name, triggers, path, action, runas):
self._triggers = triggers[:]
self._path = path
self._action = action
self._name = name
self._runas = runas
self.singleton = False
self.send_result = True
@property
def triggers(self):
return self._triggers
@property
def path(self):
return self._path
@property
def action(self):
return self._action
@property
def name(self):
return self._name
@property
def runas(self):
return self._runas
def is_cmd_hook(self):
return self._triggers == ['on.command']
class AutoRefreshingCredentialsProvider(object):
def __init__(self, cfn_client, stack_name, listener_id):
self._cfn_client = cfn_client
self._stack_name = stack_name
self._listener_id = listener_id
self._creds = None
self._last_timer = None
self.listener_expired = False
self._refresh_lock = threading.Lock()
def refresh(self):
with self._refresh_lock:
log.info("Refreshing listener credentials")
if self._last_timer:
self._last_timer.cancel()
try:
self._creds = self._cfn_client.get_listener_credentials(self._stack_name, self._listener_id)
self.listener_expired = False
except IOError, e:
if hasattr(e, 'error_code') and 'ListenerExpired' == e.error_code:
self.listener_expired = True
log.exception("Listener expired")
else:
self.listener_expired = False
log.exception("IOError caught while refreshing credentials")
except Exception:
self.listener_expired = False
log.exception("Exception refreshing credentials")
now = time.time()
expiration = calendar.timegm(self._creds.expiration.utctimetuple()) if self._creds else now
remaining = expiration - now
if remaining > 30 * 60:
next_refresh = min(2 * 60 * 60, remaining / 2)
else:
next_refresh = 60 * random.random()
log.info("Scheduling next credential refresh in %s seconds", next_refresh)
t = Timer(next_refresh, self.refresh)
t.daemon = True
t.start()
self._last_timer = t
def creds_expired(self):
return self._creds and self._creds.expiration < datetime.datetime.utcnow()
@property
def credentials(self):
for i in range(3):
if self._creds:
break
self.refresh()
if not self._creds:
raise ValueError('Could not retrieve listener credentials')
return self._creds
class CmdProcessor(object):
"""Processes CommandService hooks"""
def __init__(self, stack_name, hooks, sqs_client, cfn_client, multi_threaded):
"""Takes a list of Hook objects and processes them"""
self.stack_name = stack_name
self.hooks = self._hooks_by_path(hooks)
self.sqs_client = sqs_client
self.cfn_client = cfn_client
self.multi_threaded = multi_threaded
if not self.multi_threaded:
log.debug("Enabled single threading mode.");
if util.is_ec2():
self.listener_id = util.get_instance_id()
elif not cfn_client.using_instance_identity:
self.listener_id = socket.getfqdn()
else:
raise ValueError("Could not retrieve instance id")
self._creds_provider = AutoRefreshingCredentialsProvider(self.cfn_client, self.stack_name, self.listener_id)
self.queue_url = None
self._create_storage_dir()
self._runfile = os.path.join(self.storage_dir, 'commands_run.json')
self._commands_run = self._load_commands_run()
if not 'by_id' in self._commands_run:
self._commands_run['by_id'] = {}
if not 'by_day' in self._commands_run:
self._commands_run['by_day'] = {}
self._currently_running = set()
self._currently_running_lock = threading.RLock()
self._commands_run_lock = threading.RLock()
def _load_commands_run(self):
if os.path.isfile(self._runfile):
with file(self._runfile, 'r') as f:
try:
return json.load(f)
except Exception:
log.exception("Could not load previously run commands")
os.remove(self._runfile)
return {}
else:
return {}
def is_registered(self):
return self.queue_url is not None and not self._creds_provider.listener_expired
def creds_expired(self):
return self._creds_provider.creds_expired()
def register(self):
try:
self.queue_url = self.cfn_client.register_listener(self.stack_name, self.listener_id).queue_url
except AwsQueryError, e:
if e.error_code == 'ValidationError':
raise FatalUpdateError('Terminal failure registering listener: %s' % str(e))
raise
self._creds_provider.listener_expired = False
def _create_storage_dir(self):
if os.name == 'nt':
self.storage_dir = os.path.expandvars(r'${SystemDrive}\cfn\cfn-hup\data')
else:
self.storage_dir = '/var/lib/cfn-hup/data'
if not os.path.isdir(self.storage_dir):
log.debug("Creating %s", self.storage_dir)
try:
os.makedirs(self.storage_dir, 0700)
except OSError:
log.warn("Could not create %s; using temporary directory", self.storage_dir)
self.storage_dir = tempfile.mkdtemp()
def _command_completed(self, msg):
now = datetime.datetime.utcnow()
cmd_time = now.replace(hour=0, minute=0, second=0, microsecond=0).isoformat()
with self._commands_run_lock:
self._commands_run['by_id'][self._get_id(msg)] = True
if cmd_time not in self._commands_run['by_day']:
self._commands_run['by_day'][cmd_time] = []
self._commands_run['by_day'][cmd_time].append(self._get_id(msg))
try:
keys_to_delete = []
for key in self._commands_run['by_day'].iterkeys():
if now - datetime.datetime.strptime(key, '%Y-%m-%dT%H:%M:%S') > datetime.timedelta(days=2):
for cmd_id in self._commands_run['by_day'][key]:
del self._commands_run['by_id'][cmd_id]
keys_to_delete.append(key)
for key in keys_to_delete:
del self._commands_run['by_day'][key]
self._write_commands_run()
except Exception:
log.exception('Could not write runfile to %s', self._runfile)
def _write_commands_run(self):
with file(self._runfile, 'w') as f:
json.dump(self._commands_run, f)
def _delete_message(self, receipt_handle):
try:
self.sqs_client.delete_message(self.queue_url, receipt_handle, request_credentials = self._creds_provider.credentials)
except Exception:
log.exception("Could not delete message for handle %s", receipt_handle)
def _get_id(self, msg):
return msg['DispatcherId'] + '|' + msg['CommandName'] + '|' + msg['InvocationId']
def _already_run(self, msg):
with self._commands_run_lock:
return self._get_id(msg) in self._commands_run['by_id']
def _parse(self, msg):
try:
return json.loads(json.loads(msg.body)['Message'])
except Exception:
log.exception("Received invalid message")
return None
def process(self):
if self.queue_url is None:
raise FatalUpdateError("Cannot process command hooks before registering")
threads = []
try:
for msg in self.sqs_client.receive_message(self.queue_url, request_credentials = self._creds_provider.credentials, wait_time=20):
cmd_msg = self._parse(msg)
if not cmd_msg:
log.info("Invalid message, deleting: %s", msg)
self._delete_message(msg.receipt_handle)
continue
if self._already_run(cmd_msg):
log.info("Already ran %s, deleting", self._get_id(cmd_msg))
self._delete_message(msg.receipt_handle)
continue
with self._currently_running_lock:
if self._get_id(cmd_msg) not in self._currently_running and not self._already_run(cmd_msg):
self._currently_running.add(self._get_id(cmd_msg))
if self.multi_threaded:
thread = threading.Thread(target=self._process_msg, args=(cmd_msg, msg.receipt_handle))
thread.daemon = True
threads.append(thread)
thread.start()
else:
self._process_msg(cmd_msg, msg.receipt_handle)
except FatalUpdateError:
raise
except IOError, e:
if hasattr(e, 'error_code') and 'AWS.SimpleQueueService.NonExistentQueue' == e.error_code:
self.queue_url = None
log.exception("IOError caught while processing messages")
except Exception:
log.exception("Exception caught while processing messages")
return threads
def _process_msg(self, cmd_msg, receipt_handle):
log.debug("Processing message: %s", cmd_msg)
delete = False
try:
log.debug("Command message: %s", cmd_msg)
expiration = datetime.datetime.utcfromtimestamp(int(cmd_msg['Expiration']) / 1000)
if expiration < datetime.datetime.utcnow():
log.info("Invocation %s of command %s for stack %s expired at %s; skipping",
cmd_msg['InvocationId'], cmd_msg['CommandName'], cmd_msg['DispatcherId'],
expiration.isoformat())
delete = True
else:
log.info("Received command %s (invocation id: %s)", cmd_msg['CommandName'], cmd_msg['InvocationId'])
hook_to_run = self.hooks.get(cmd_msg['CommandName'])
if not hook_to_run or self._run_hook(hook_to_run, cmd_msg):
self._command_completed(cmd_msg)
delete = True
except (ValueError, KeyError):
log.exception("Invalid message received; deleting it")
delete = True
except Exception:
log.exception("Unexpected exception running command")
finally:
if delete:
self._delete_message(receipt_handle)
with self._currently_running_lock:
self._currently_running.remove(self._get_id(cmd_msg))
def _run_hook(self, hook, cmd_msg):
if hook.singleton:
log.info("Hook %s is configured to run as a singleton", hook.name)
leader = self.cfn_client.elect_command_leader(self.stack_name,
cmd_msg['CommandName'],
cmd_msg['InvocationId'],
self.listener_id)
if leader == self.listener_id:
log.info("This listener is the leader. Continuing with action")
else:
log.info("This listener is not the leader; %s is the leader.", leader)
return True
action_env = self._get_environment(cmd_msg)
result_queue = cmd_msg['ResultQueue']
log.info("Running action for %s", hook.name)
log.debug("Action environment: %s", action_env)
action = hook.action
if hook.runas:
if os.name == 'posix':
action = ['su', hook.runas, '-c', action]
else:
log.warn('runas is not supported on this operating system')
result = ProcessHelper(action, stderr=subprocess.PIPE, env=action_env).call()
log.debug("Action for %s output: %s", hook.name, result.stdout if result.stdout else '<None>')
if not hook.send_result:
return True
result_msg = { 'DispatcherId' : cmd_msg['DispatcherId'],
'InvocationId' : cmd_msg['InvocationId'],
'CommandName' : cmd_msg['CommandName'],
'Status' : "FAILURE" if result.returncode else "SUCCESS",
'ListenerId' : self.listener_id }
if result.returncode:
result_stderr = result.stderr.rstrip()
log.warn("Action for %s exited with %s, returning FAILURE", hook.name, result.returncode)
result_msg['Message'] = result_stderr if len(result_stderr) <= 1024 else result_stderr[0:500] + '...' + result_stderr[-500:]
else:
result_stdout = result.stdout.rstrip()
if len(result_stdout) > 1024:
log.error("stdout for %s was greater than 1024 in length, which is not allowed", hook.name)
result_msg['Status'] = 'FAILURE'
result_msg['Message'] = 'Result data was longer than 1024 bytes. Started with: ' + result_stdout[0:100]
else:
log.info("Action for %s succeeded, returning SUCCESS", hook.name)
result_msg['Data'] = result_stdout
try:
self.sqs_client.send_message(result_queue, json.dumps(result_msg), request_credentials=self._creds_provider.credentials)
except Exception:
log.exception('Error sending result; will leave message in queue')
return False
return True
def _hooks_by_path(self, hooks):
ret_hooks = {}
for hook in hooks:
if hook.path in ret_hooks:
raise FatalUpdateError("Multiple hooks for the same command (%s)" % hook.path)
ret_hooks[hook.path] = hook
return ret_hooks
def _get_environment(self, cmd_msg):
action_env = dict(os.environ)
if 'Data' in cmd_msg:
action_env['CMD_DATA'] = cmd_msg['Data']
action_env['INVOCATION_ID'] = cmd_msg['InvocationId']
action_env['DISPATCHER_ID'] = cmd_msg['DispatcherId']
action_env['CMD_NAME'] = cmd_msg['CommandName']
action_env['STACK_NAME'] = self.stack_name
action_env['LISTENER_ID'] = self.listener_id
action_env['RESULT_QUEUE'] = cmd_msg['ResultQueue']
if 'EventHandle' in cmd_msg:
action_env['EVENT_HANDLE'] = cmd_msg['EventHandle']
creds = self._creds_provider.credentials
action_env['RESULT_ACCESS_KEY'] = creds.access_key
action_env['RESULT_SECRET_KEY'] = creds.secret_key
action_env['RESULT_SESSION_TOKEN'] = creds.security_token
return action_env
class HookProcessor(object):
"""Processes update hooks"""
def __init__(self, hooks, stack_name, client):
"""Takes a list of Hook objects and processes them"""
self.hooks = hooks
if os.name == 'nt':
self.dir = os.path.expandvars(r'${SystemDrive}\cfn\cfn-hup\data')
else:
self.dir = '/var/lib/cfn-hup/data'
if not os.path.isdir(self.dir):
log.debug("Creating %s", self.dir)
try:
os.makedirs(self.dir, 0700)
except OSError:
log.warn("Could not create %s; using temporary directory", self.dir)
self.dir = tempfile.mkdtemp()
JsonFileManager.create(self.dir, 'metadata_db.json')
self.client = client
self.stack_name = stack_name
self._jsonConverter = JsonFileManager.Converter([])
def process(self):
metadata = JsonFileManager.read(self.dir, 'metadata_db.json')
self._resource_cache = {}
for hook in self.hooks:
try:
self._process_hook(hook, metadata)
except FatalUpdateError:
raise
except Exception:
log.exception("Exception caught while running hook %s", hook.name)
def _process_hook(self, hook, metadata):
try:
new_data = self._retrieve_path_data(hook.path)
except InFlightStatusError:
return
old_data = self._jsonConverter.deserialize(metadata.get(hook.name + "|" + hook.path, None))
if 'post.add' in hook.triggers and not old_data and new_data:
log.info("Previous state not found; action for %s will be run", hook.name)
elif 'post.remove' in hook.triggers and old_data and not new_data:
log.info('Path %s was removed; action for %s will be run', hook.path, hook.name)
elif 'post.update' in hook.triggers and old_data and new_data and old_data != new_data:
log.info("Data has changed from previous state; action for %s will be run", hook.name)
else:
log.debug("No change in path %s for hook %s", hook.path, hook.name)
metadata[hook.name + "|" + hook.path] = self._jsonConverter.serialize(new_data)
JsonFileManager.write(self.dir, 'metadata_db.json', metadata)
return
log.info("Running action for %s", hook.name)
action_env = dict(os.environ)
env_key = self._retrieve_env_key(hook.path)
if old_data:
action_env['CFN_OLD_%s' % env_key] = self._as_string(old_data)
if new_data:
action_env['CFN_NEW_%s' % env_key] = self._as_string(new_data)
action = hook.action
if hook.runas:
action = ['su', hook.runas, '-c', action]
result = ProcessHelper(action, env=action_env).call()
if result.returncode:
log.warn("Action for %s exited with %s; will retry on next iteration", hook.name, result.returncode)
else:
metadata[hook.name + '|' + hook.path] = self._jsonConverter.serialize(new_data)
JsonFileManager.write(self.dir, 'metadata_db.json', metadata)
log.debug("Action for %s output: %s", hook.name, result.stdout if result.stdout else '<None>')
def _as_string(self, obj):
if isinstance(obj, basestring):
return obj
elif isinstance(obj, datetime.datetime):
return obj.isoformat()
return json.dumps(obj)
def _retrieve_env_key(self, path):
"""Given a hook path, return the key to append to environment variables for old/new data"""
parts = path.split('.', 3)
if len(parts) < 3:
return 'LAST_UPDATED'
elif parts[2].lower() == 'metadata':
return 'METADATA'
elif parts[2].lower() == 'physicalresourceid':
return 'PHYSICAL_RESOURCE_ID'
def _retrieve_path_data(self, path):
parts = path.split('.', 3)
if len(parts) < 2:
raise FatalUpdateError("Unsupported path: paths must be in the form Resources.<LogicalResourceId>(.Metadata|PhysicalResourceId)(.<optional Metadata subkey>). Input: %s" % path)
if parts[0].lower() != 'resources':
raise FatalUpdateError('Unsupported path: only changes to Resources are supported (path: %s)' % path)
if len(parts) == 2:
resourcePart = None
elif parts[2].lower() not in ['metadata', 'physicalresourceid']:
raise FatalUpdateError("Unsupported path: only Metadata or PhysicalResourceId can be specified after LogicalResourceId (path: %s)" % path)
else:
resourcePart = parts[2].lower()
logical_id = parts[1]
subpath = ('' if len(parts) < 4 else parts[3])
if logical_id not in self._resource_cache:
self._resource_cache[logical_id] = self.client.describe_stack_resource(logical_id, self.stack_name)
resource = self._resource_cache[logical_id]
status = resource.resourceStatus
if status and status.endswith('_IN_PROGRESS'):
log.debug("Skipping resource %s in %s as it is in status %s", logical_id, self.stack_name, status)
raise InFlightStatusError('%s in %s is in status %s' % (logical_id, self.stack_name, status))
if resourcePart == 'metadata':
if not resource.metadata:
log.warn("No metadata for %s in %s", logical_id, self.stack_name)
return None
return util.extract_value(resource.metadata, subpath)
elif 'DELETE_COMPLETE' == status:
return None
elif resourcePart == 'physicalresourceid':
return resource.physicalResourceId
else:
return resource.lastUpdated