Source code for swagger_spec_compatibility.cli.run

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import json
import sys
import typing

from six import iteritems

from swagger_spec_compatibility.cli.common import add_rules_arguments
from swagger_spec_compatibility.cli.common import CLIProtocol
from swagger_spec_compatibility.cli.common import rules
from swagger_spec_compatibility.cli.common import uri
from swagger_spec_compatibility.rules import compatibility_status
from swagger_spec_compatibility.rules import ValidationMessage
from swagger_spec_compatibility.rules.common import Level
from swagger_spec_compatibility.rules.common import RuleProtocol
from swagger_spec_compatibility.spec_utils import load_spec_from_uri


class _Namespace(CLIProtocol):
    blacklist_rules = None  # type: typing.Iterable[typing.Text]
    rules = None  # type: typing.Iterable[typing.Text]
    strict = None  # type: bool
    json_output = None  # type: bool
    old_spec = None  # type: typing.Text
    new_spec = None  # type: typing.Text


def _extract_rules_with_given_message_level(
    rules_to_messages_mapping,  # type: typing.Mapping[typing.Type[RuleProtocol], typing.Iterable[ValidationMessage]]
    level,  # type: Level
):
    # type: (...) -> typing.List[ValidationMessage]
    return [
        message
        for rule, messages in iteritems(rules_to_messages_mapping)
        for message in messages
        if message.level is level
    ]


def _print_raw_messages(messages_by_level):
    # type: (typing.Mapping[Level, typing.Iterable[ValidationMessage]]) -> None
    for level, messages in iteritems(messages_by_level):
        messages = list(messages)
        if messages:
            print(
                '{} rules:\n\t{}'.format(
                    level.name,
                    '\n\t'.join(
                        message.string_representation()
                        for message in messages
                    ),
                ),
            )


def _print_json_messages(messages_by_level):
    # type: (typing.Mapping[Level, typing.Iterable[ValidationMessage]]) -> None
    json_output = {}
    for level, messages in iteritems(messages_by_level):
        level_output = [
            message.json_representation()
            for message in messages
        ]
        if level_output:
            json_output[level.name] = level_output
    json.dump(json_output, sys.stdout)


[docs]def execute(cli_args): # type: (_Namespace) -> int rules_to_messages_mapping = compatibility_status( old_spec=load_spec_from_uri(cli_args.old_spec), new_spec=load_spec_from_uri(cli_args.new_spec), rules=rules(cli_args), ) messages_by_level = { level: _extract_rules_with_given_message_level(rules_to_messages_mapping, level) for level in Level } if cli_args.json_output: _print_json_messages(messages_by_level) else: _print_raw_messages(messages_by_level) if cli_args.strict: return 1 if any(messages_by_level.values()) else 0 else: return 1 if any(messages_by_level[Level.ERROR]) else 0
[docs]def add_sub_parser(subparsers): # type: (argparse._SubParsersAction) -> argparse.ArgumentParser run_detection_parser = subparsers.add_parser('run', help='run backward compatibility detection') add_rules_arguments(run_detection_parser) run_detection_parser.add_argument( '--strict', action='store_true', help='Convert warnings to errors', ) run_detection_parser.add_argument( '--json-output', action='store_true', help='Return machine readable json output', ) run_detection_parser.add_argument( 'old_spec', type=uri, help='Path/URI of the "old" version of the Swagger spec', ) run_detection_parser.add_argument( 'new_spec', type=uri, help='Path/URI of the "new" version of the Swagger spec', ) return run_detection_parser