--- qontract_reconcile-0.10.2.dev658/tools/app_interface_reporter.py (excerpt) ---
import contextlib
import logging
import os
import textwrap
from collections.abc import Mapping, MutableMapping
from datetime import (
datetime,
)
import click
import requests
import yaml
from dateutil.relativedelta import relativedelta
from prometheus_client.parser import text_string_to_metric_families
from sretoolbox.utils.threaded import run
from reconcile import (
jenkins_base,
mr_client_gateway,
queries,
)
from reconcile.cli import (
config_file,
dry_run,
gitlab_project_id,
log_level,
threaded,
)
from reconcile.jenkins_job_builder import init_jjb
from reconcile.utils.constants import DEFAULT_THREAD_POOL_SIZE
from reconcile.utils.datetime_util import ensure_utc, utc_now
from reconcile.utils.mr import CreateAppInterfaceReporter
from reconcile.utils.runtime.environment import init_env
from reconcile.utils.secret_reader import SecretReader
CONTENT_FORMAT_VERSION = "1.0.0"
DASHDOTDB_SECRET = os.environ.get(
"DASHDOTDB_SECRET", "app-sre/dashdot/auth-proxy-production"
)
class Report:
def __init__(self, app: Mapping, date: datetime) -> None:
settings = queries.get_app_interface_settings()
self.secret_reader = SecretReader(settings=settings)
self.app = app
# standard date format
self.date = date.strftime("%Y-%m-%d")
self.report_sections: dict = {}
# promotions
self.add_report_section("promotions", self.app.get("promotions"))
# merge activities
self.add_
--- qontract_reconcile-0.10.2.dev658/tools/qontract_cli.py (excerpt) ---
#!/usr/bin/env python3
# ruff: noqa: PLC0415 - `import` should be at the top-level of a file
from __future__ import annotations
import base64
import json
import logging
import os
import re
import sys
import tempfile
import textwrap
from collections import defaultdict
from datetime import (
datetime,
timedelta,
)
from operator import itemgetter
from pathlib import Path
from statistics import median
from textwrap import dedent
from typing import TYPE_CHECKING, Any, cast
import boto3
import click
import click.core
import requests
import yaml
from gitlab.const import PipelineStatus
from rich import box
from rich import print as rich_print
from rich.console import Console, Group
from rich.prompt import Confirm
from rich.table import Table
from rich.tree import Tree
import reconcile.aus.base as aus
import reconcile.change_owners.change_log_tracking as cl
import reconcile.openshift_base as ob
import reconcile.openshift_resources_base as orb
import reconcile.prometheus_rules_tester.integration as ptr
import reconcile.terraform_resources as tfr
import reconcile.terraform_users as tfu
import reconcile.terraform_vpc_peerings as tfvpc
from reconcile import queries
from reconcile.aus.base import (
AbstractUpgradePolicy,
AdvancedUpgradeSchedulerBaseIntegration,
AdvancedUpgradeSchedulerBaseIntegrationParams,
addon_upgrade_policy_soonest_next_run,
init_addon_service_version,
)
from reconcile.aus.models import OrganizationUpgradeSpec
from reconcile.change_owner
--- qontract_reconcile-0.10.2.dev658/tools/cli_commands/erv2.py (excerpt) ---
from __future__ import annotations
import json
import os
import sys
from contextlib import contextmanager, suppress
from difflib import get_close_matches
from enum import Enum
from subprocess import CalledProcessError, run
from typing import TYPE_CHECKING, Any, Protocol
from pydantic import BaseModel
from rich import print as rich_print
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.prompt import Confirm, IntPrompt
from reconcile.external_resources.integration import get_aws_api
from reconcile.external_resources.manager import setup_factories
from reconcile.external_resources.meta import FLAG_RESOURCE_MANAGED_BY_ERV2
from reconcile.external_resources.model import (
ExternalResourceKey,
ExternalResourceModuleConfiguration,
ExternalResourcesInventory,
load_module_inventory,
)
from reconcile.external_resources.state import (
ExternalResourcesStateDynamoDB,
ResourceStatus,
)
from reconcile.typed_queries.external_resources import (
get_modules,
get_namespaces,
get_settings,
)
from reconcile.utils import gql
from reconcile.utils.exceptions import FetchResourceError
if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path
from reconcile.utils.secret_reader import SecretReaderBase
UP = "\x1b[1A"
CLEAR = "\x1b[2K"
def progress_spinner() -> Progress:
"""Display shiny progress spinner."""
console = Console(record=True)
reimport contextlib
import logging
import os
import textwrap
from collections.abc import Mapping, MutableMapping
from datetime import (
datetime,
)
import click
import requests
import yaml
from dateutil.relativedelta import relativedelta
from prometheus_client.parser import text_string_to_metric_families
from sretoolbox.utils.threaded import run
from reconcile import (
jenkins_base,
mr_client_gateway,
queries,
)
from reconcile.cli import (
config_file,
dry_run,
gitlab_project_id,
log_level,
threaded,
)
from reconcile.jenkins_job_builder import init_jjb
from reconcile.utils.constants import DEFAULT_THREAD_POOL_SIZE
from reconcile.utils.datetime_util import ensure_utc, utc_now
from reconcile.utils.mr import CreateAppInterfaceReporter
from reconcile.utils.runtime.environment import init_env
from reconcile.utils.secret_reader import SecretReader
CONTENT_FORMAT_VERSION = "1.0.0"
DASHDOTDB_SECRET = os.environ.get(
"DASHDOTDB_SECRET", "app-sre/dashdot/auth-proxy-production"
)
class Report:
def __init__(self, app: Mapping, date: datetime) -> None:
settings = queries.get_app_interface_settings()
self.secret_reader = SecretReader(settings=settings)
self.app = app
# standard date format
self.date = date.strftime("%Y-%m-%d")
self.report_sections: dict = {}
# promotions
self.add_report_section("promotions", self.app.get("promotions"))
# merge activities
self.add_
#!/usr/bin/env python3
# ruff: noqa: PLC0415 - `import` should be at the top-level of a file
from __future__ import annotations
import base64
import json
import logging
import os
import re
import sys
import tempfile
import textwrap
from collections import defaultdict
from datetime import (
datetime,
timedelta,
)
from operator import itemgetter
from pathlib import Path
from statistics import median
from textwrap import dedent
from typing import TYPE_CHECKING, Any, cast
import boto3
import click
import click.core
import requests
import yaml
from gitlab.const import PipelineStatus
from rich import box
from rich import print as rich_print
from rich.console import Console, Group
from rich.prompt import Confirm
from rich.table import Table
from rich.tree import Tree
import reconcile.aus.base as aus
import reconcile.change_owners.change_log_tracking as cl
import reconcile.openshift_base as ob
import reconcile.openshift_resources_base as orb
import reconcile.prometheus_rules_tester.integration as ptr
import reconcile.terraform_resources as tfr
import reconcile.terraform_users as tfu
import reconcile.terraform_vpc_peerings as tfvpc
from reconcile import queries
from reconcile.aus.base import (
AbstractUpgradePolicy,
AdvancedUpgradeSchedulerBaseIntegration,
AdvancedUpgradeSchedulerBaseIntegrationParams,
addon_upgrade_policy_soonest_next_run,
init_addon_service_version,
)
from reconcile.aus.models import OrganizationUpgradeSpec
from reconcile.change_owner
from __future__ import annotations
import json
import os
import sys
from contextlib import contextmanager, suppress
from difflib import get_close_matches
from enum import Enum
from subprocess import CalledProcessError, run
from typing import TYPE_CHECKING, Any, Protocol
from pydantic import BaseModel
from rich import print as rich_print
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.prompt import Confirm, IntPrompt
from reconcile.external_resources.integration import get_aws_api
from reconcile.external_resources.manager import setup_factories
from reconcile.external_resources.meta import FLAG_RESOURCE_MANAGED_BY_ERV2
from reconcile.external_resources.model import (
ExternalResourceKey,
ExternalResourceModuleConfiguration,
ExternalResourcesInventory,
load_module_inventory,
)
from reconcile.external_resources.state import (
ExternalResourcesStateDynamoDB,
ResourceStatus,
)
from reconcile.typed_queries.external_resources import (
get_modules,
get_namespaces,
get_settings,
)
from reconcile.utils import gql
from reconcile.utils.exceptions import FetchResourceError
if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path
from reconcile.utils.secret_reader import SecretReaderBase
UP = "\x1b[1A"
CLEAR = "\x1b[2K"
def progress_spinner() -> Progress:
"""Display shiny progress spinner."""
console = Console(record=True)
re