HEX
Server: LiteSpeed
System: Linux ip-172-31-76-142.ec2.internal 4.14.158-129.185.amzn2.x86_64 #1 SMP Tue Dec 24 03:15:32 UTC 2019 x86_64
User: 69b4844ae61d4e92bf26ad98af552775 (1065)
PHP: 7.2.27
Disabled: exec,passthru,shell_exec,system,eval
Upload Files
File: //lib/python2.7/site-packages/cwlogs/filter.py
#  Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#  Licensed under the Amazon Software License (the "License").
#  You may not use this file except in compliance with the License.
#  A copy of the License is located at
#
#  http://aws.amazon.com/asl/
#
#  or in the "license" file accompanying this file. This file is distributed
#  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
#  express or implied. See the License for the specific language governing
#  permissions and limitations under the License.

from threading import Event
from datetime import datetime
import logging
from dateutil.tz import tzlocal
from six.moves import queue as Queue

from awscli.errorhandler import ServerError
from awscli.customizations.commands import BasicCommand
import cwlogs
from cwlogs import utils
from cwlogs.utils import print_stdout
from cwlogs.utils import print_stderr
from cwlogs.retry import ExponentialBackoff
from cwlogs.threads import BaseThread, ExitChecker
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

def initialize(cli):
    """Entry point for the CloudWatch Logs ``filter`` customization.

    Registers :func:`inject_commands` so it runs while the ``logs``
    command table is being built.
    """
    cli.register('building-command-table.logs', inject_commands)


def inject_commands(command_table, session, **kwargs):
    """Handler for the ``building-command-table.logs`` event.

    Adds the high level ``filter`` command to the logs command table.
    The name must not collide with any existing low-level API
    operation name in the table.
    """
    command_table['filter'] = LogsFilterCommand(session)


class LogsFilterCommand(BasicCommand):
    """High level ``logs filter`` command.

    Pulls log events matching a filter pattern from a log group using a
    background puller thread and renders them to stdout with a renderer
    thread, until the service reports no more pages or the user
    interrupts.
    """
    NAME = 'filter'
    DESCRIPTION = 'Filters log events that match a filter pattern.'
    SYNOPSIS = ''
    EXAMPLES = BasicCommand.FROM_FILE('logs', 'filter.rst', root_module=cwlogs)

    ARG_TABLE = [
        {'name': 'log-group-name', 'required': True,
         'help_text': 'Specifies the log group.'},
        {'name': 'log-stream-names',
         'schema': {'type': 'array', 'items': {'type': 'string'}},
         'help_text': 'Specifies the log streams to search within the group. '
                       'All the streams in the group are searched by default. '
                       'Syntax is: "streamName1" "streamName2" "streamName3" ...'},
        {'name': 'filter-pattern',
         'help_text': 'The filter pattern to filter for.'},
        {'name': 'interleaved',
         'action': 'store_true',
         'help_text': 'Whether or not these events are interleaved.'},
        {'name': 'start-time',
         'help_text': 'Optional value to specify start time of log events. '
                      'The value has to be in ISO8601 format '
                      '(YYYY-MM-DDThh:mm:ssZ). e.g. 2013-12-23T14:01:00Z'},
        {'name': 'end-time',
         'help_text': 'Optional value to specify end time of log events. '
                      'The value has to be in ISO8601 format '
                      '(YYYY-MM-DDThh:mm:ssZ). e.g. 2013-12-23T14:01:00Z'},
        {'name': 'output-format', 'default': '{logStreamName} {timestamp} {message}',
         'help_text': 'Optional value to specify the output format '
                      'of log events. Defaults to "{logStreamName} {timestamp} {message}". '
                      'Valid keys are logStreamName, timestamp, message and ingestionTime.'},
    ]

    UPDATE = False

    # Max number of event batches buffered between puller and renderer.
    QUEUE_SIZE = 10

    def _run_main(self, args, parsed_globals):
        """Configure logging, build the logs client, and run the filter.

        Returns 0 on completion (success reported via RC).
        """
        logging.basicConfig(level=logging.INFO)
        for handler in logging.root.handlers:
            handler.addFilter(logging.Filter(__name__))

        client_args = {
            'region_name': None,
            'verify': None
        }
        if parsed_globals.region is not None:
            client_args['region_name'] = parsed_globals.region
        if parsed_globals.verify_ssl is not None:
            client_args['verify'] = parsed_globals.verify_ssl
        if parsed_globals.endpoint_url is not None:
            client_args['endpoint_url'] = parsed_globals.endpoint_url
        # Initialize services and append cwlogs version to user agent
        self._session.user_agent_extra += 'cwlogs/' + cwlogs.__version__
        self.logs = self._session.create_client('logs', **client_args)
        # This unregister call will go away once the client switchover
        # is done, but for now we're relying on Logs catching a ClientError
        # when we check if a bucket exists, so we need to ensure the
        # botocore ClientError is raised instead of the CLI's error handler.
        self.logs.meta.events.unregister('after-call', unique_id='awscli-error-handler')
        # Run the command and report success
        self._call(args, parsed_globals)

        return 0

    def _call(self, options, parsed_globals):
        """Validate time arguments, then start the puller/renderer threads
        and block until both finish or the user interrupts.

        :raises ValueError: if --start-time or --end-time is not ISO8601.
        """
        if options.start_time:
            try:
                start_time_in_ms = utils.iso8601_to_epoch(options.start_time)
            except ValueError as e:
                raise ValueError('%s. You must pass a valid start time to '
                                 ' --start-time' % e)
        else:
            start_time_in_ms = None

        if options.end_time:
            try:
                end_time_in_ms = utils.iso8601_to_epoch(options.end_time)
            except ValueError as e:
                raise ValueError('%s. You must pass a valid end time to '
                                 ' --end-time' % e)
        else:
            end_time_in_ms = None

        # store_true flag may be None when absent; coerce to a plain bool.
        # (Equivalent to the original 'is not None and is True' chain.)
        interleaved = options.interleaved is True

        logger.debug('Going to start event puller...')
        threads = []
        stop_flag = Event()
        queue = Queue.Queue(self.QUEUE_SIZE)
        puller = EventsPuller(stop_flag, queue,
                              self.logs,
                              options.log_group_name,
                              options.log_stream_names,
                              options.filter_pattern,
                              start_time_in_ms,
                              end_time_in_ms,
                              interleaved)
        puller.start()
        threads.append(puller)
        logger.debug('Going to start event renderer...')
        renderer = EventsRenderer(stop_flag, queue,
                                  options.output_format)
        renderer.start()
        threads.append(renderer)

        self._wait_on_exit(stop_flag)
        for thread in threads:
            try:
                thread.join()
            except Exception:
                # Best-effort shutdown: a failed join must not mask the
                # command's completion. (Was a bare except, which also
                # swallowed SystemExit/KeyboardInterrupt.)
                pass

    def _wait_on_exit(self, stop_flag):
        """Block until the exit checker stops or Ctrl-C, then set the
        stop flag so both worker threads wind down.
        """
        exit_checker = ExitChecker(stop_flag)
        exit_checker.start()
        try:
            while exit_checker.is_alive() and not stop_flag.is_set():
                exit_checker.join(5)
        except KeyboardInterrupt:
            pass
        logger.debug('Shutting down...')
        stop_flag.set()
        exit_checker.join()


class EventsPuller(BaseThread):
    """Background thread that pages through FilterLogEvents results and
    enqueues each page as an :class:`EventBatch` for the renderer.
    """

    def __init__(self, stop_flag, queue, logs_service, log_group_name,
                 log_stream_names, filter_pattern, start_time, end_time, interleaved):
        super(EventsPuller, self).__init__(stop_flag)
        self.queue = queue
        self.logs_service = logs_service
        self.log_group_name = log_group_name
        self.log_stream_names = log_stream_names
        self.filter_pattern = filter_pattern
        self.start_time = start_time      # epoch millis or None
        self.end_time = end_time          # epoch millis or None
        self.next_token = None            # pagination token across calls
        self.interleaved = interleaved

    def on_run_failed(self, exception):
        # Signal the renderer too; otherwise the process would hang
        # waiting on a puller that has already died.
        self.stop_flag.set()
        logger.error("FilterLogEvents request failed")

    @ExponentialBackoff(logger=logger, exception=ClientError)
    def _run(self):
        """Poll filter_log_events until no nextToken is returned or the
        stop flag is set, pushing each page onto the shared queue.
        """
        params = dict(logGroupName=self.log_group_name,
                      interleaved=self.interleaved)

        if self.log_stream_names:
            params['logStreamNames'] = self.log_stream_names

        if self.filter_pattern is not None:
            params['filterPattern'] = self.filter_pattern

        while True:
            if self.stop_flag.is_set():
                logger.debug('Puller is leaving...')
                break

            logger.debug('Filtering log events with [%s] [%s] [%s] [%s]' %
                         (self.log_group_name,
                          self.log_stream_names,
                          self.filter_pattern,
                          self.next_token))

            if self.next_token:
                params['nextToken'] = self.next_token
            if self.start_time:
                params['startTime'] = self.start_time
            if self.end_time:
                params['endTime'] = self.end_time
            log_events_response = self.logs_service.filter_log_events(**params)
            if log_events_response:
                # Default to [] so a missing 'events' key cannot crash
                # len() here or iteration in the renderer.
                events = log_events_response.get('events') or []
                logger.debug('Adding %d log events to the queue' %
                             len(events))
                self.next_token = log_events_response.get('nextToken')
                self.queue.put(EventBatch(self.log_group_name,
                                          self.log_stream_names,
                                          events,
                                          log_events_response.get('searchedLogStreams')))
            else:
                # Defensive: treat an empty response as end of results.
                self.next_token = None
            # BUG FIX: check the stored token rather than re-reading it
            # from the response object, which raised AttributeError when
            # the response was falsy (the branch above guards for that).
            if self.next_token is None:
                logger.debug('Filter complete.')
                self.stop_flag.set()
            else:
                logger.debug('Polling again.')
                self.stop_flag.wait(1 / 1000.0)


class EventsRenderer(BaseThread):
    """Background thread that drains :class:`EventBatch` objects from the
    queue, prints each formatted event to stdout, and prints a summary of
    searched streams to stderr on shutdown.
    """

    def __init__(self, stop_flag, queue, output_format):
        super(EventsRenderer, self).__init__(stop_flag)
        self.queue = queue
        # join() of a str over u'' returns the same text; this just
        # coerces the format to unicode on Python 2.
        self.output_format = u''.join(output_format)

    def _run(self):
        total_searched_streams = set()
        # BUG FIX: keep the group name in a pre-initialized local so the
        # shutdown summary below cannot hit an unbound 'event_batch' when
        # the stop flag is set before any batch was ever dequeued (e.g.
        # the puller's first request failed).
        log_group_name = None
        while True:
            try:
                event_batch = self.queue.get(False)
                log_group_name = event_batch.log_group_name
                for event in event_batch.events:
                    revised_event = event.copy()
                    # Timestamps arrive as epoch millis; convert to
                    # timezone-aware local datetimes for display.
                    if event.get('timestamp') is not None:
                        revised_event['timestamp'] = \
                            datetime.fromtimestamp(event['timestamp'] / 1000.0,
                                                   tzlocal())
                    if event.get('ingestionTime') is not None:
                        revised_event['ingestionTime'] = \
                            datetime.fromtimestamp(
                                event['ingestionTime'] / 1000.0,
                                tzlocal())
                    print_stdout(self.output_format.format(**revised_event))
                if event_batch.searched_log_streams:
                    for searched_log_stream in event_batch.searched_log_streams:
                        total_searched_streams.add(
                            searched_log_stream.get('logStreamName'))
                print_stderr("Searching for more data...")
            except Queue.Empty:
                if self.stop_flag.is_set():
                    print_stderr('{0} log streams searched in log group {1}:'.format(
                        len(total_searched_streams), log_group_name))
                    print_stderr(', '.join(sorted(total_searched_streams)))
                    logger.debug('Renderer is leaving...')
                    break
                else:
                    self.stop_flag.wait(0.1)


class EventBatch(object):
    """One page of FilterLogEvents output, bundled with the group/stream
    context it was pulled from, for handoff from puller to renderer.
    """

    def __init__(self, log_group_name, log_stream_names, events,
                 searched_log_streams):
        (self.log_group_name,
         self.log_stream_names,
         self.events,
         self.searched_log_streams) = (log_group_name,
                                       log_stream_names,
                                       events,
                                       searched_log_streams)