2019-07-15 12:14:27 -07:00
#!/usr/bin/env python3
# coding=utf-8
""" MILC - A CLI Framework
PYTHON_ARGCOMPLETE_OK
MILC is an opinionated framework for writing CLI apps. It optimizes for the
most common unix tool pattern - small tools that are run from the command
line but generally do not feature any user interaction while they run.

For more details see the MILC documentation:

    <https://github.com/clueboard/milc/tree/master/docs>
"""
from __future__ import division , print_function , unicode_literals
import argparse
import logging
import os
import re
2019-09-22 13:25:33 -07:00
import shlex
2019-07-15 12:14:27 -07:00
import sys
from decimal import Decimal
2019-11-18 14:54:50 -08:00
from pathlib import Path
2019-07-15 12:14:27 -07:00
from tempfile import NamedTemporaryFile
from time import sleep
try :
from ConfigParser import RawConfigParser
except ImportError :
from configparser import RawConfigParser
try :
import thread
import threading
except ImportError :
thread = None
import argcomplete
import colorama
2019-09-22 13:25:33 -07:00
from appdirs import user_config_dir
# Disable logging until we can configure it how the user wants.
# A NullHandler is used because `os.devnull` is a path string, not a writable
# stream: handing it to a StreamHandler makes every early log call fail
# inside the handler instead of being silently discarded.
logging.basicConfig(handlers=[logging.NullHandler()])
2019-07-15 12:14:27 -07:00
# Log Level Representations
# Maps a logging level name to the decorated glyph shown in its place.
# Keys must be the exact names the logging module puts in
# `record.levelname`; values are templates whose `{...}` placeholders are
# filled in with ANSI color codes via str.format().
EMOJI_LOGLEVELS = {
    'CRITICAL': '{bg_red}{fg_white}¬_¬{style_reset_all}',
    'ERROR': '{fg_red}☒{style_reset_all}',
    'WARNING': '{fg_yellow}⚠{style_reset_all}',
    'INFO': '{fg_blue}ℹ{style_reset_all}',
    'DEBUG': '{fg_cyan}☐{style_reset_all}',
    'NOTSET': '{style_reset_all}¯\\_(o_o)_/¯',
}
# The logging module also knows these level names as aliases.
EMOJI_LOGLEVELS['FATAL'] = EMOJI_LOGLEVELS['CRITICAL']
EMOJI_LOGLEVELS['WARN'] = EMOJI_LOGLEVELS['WARNING']
2019-09-22 13:25:33 -07:00
# True when stdout advertises a UTF encoding. `sys.stdout.encoding` can be
# None (or the attribute can be missing when stdout has been replaced), so
# fall back to an empty string rather than failing at import time.
UNICODE_SUPPORT = (getattr(sys.stdout, 'encoding', None) or '').lower().startswith('utf')
2019-07-15 12:14:27 -07:00
# ANSI Color setup
# Regex was gratefully borrowed from kfir on stackoverflow:
# https://stackoverflow.com/a/45448194
# One alternative per family of terminal escape sequences; every alternative
# is anchored on the leading ESC (\x1b) character.
ansi_regex = (
    r'\x1b('
    r'(\[\??\d+[hl])|'
    r'([=<>a-kzNM78])|'
    r'([\(\)][a-b0-2])|'
    r'(\[\d{0,2}[ma-dgkjqi])|'
    r'(\[\d+;\d+[hfy]?)|'
    r'(\[;?[hf])|'
    r'(#[3-68])|'
    r'([01356]n)|'
    r'(O[mlnp-z]?)|'
    r'(/Z)|'
    r'(\d+)|'
    r'(\[\?\d;\d0c)|'
    r'(\d;\dR))'
)
# Compiled once at import time; used to strip escape sequences from text.
ansi_escape = re.compile(ansi_regex, flags=re.IGNORECASE)
# Pairs a placeholder prefix with the colorama object holding that family of
# ANSI codes.
ansi_styles = (
    ('fg', colorama.ansi.AnsiFore()),
    ('bg', colorama.ansi.AnsiBack()),
    ('style', colorama.ansi.AnsiStyle()),
)
# Placeholder name -> ANSI escape sequence, e.g. `fg_red` or
# `style_reset_all`. Names are `<prefix>_<lowercased attribute name>`.
ansi_colors = {}

for prefix, obj in ansi_styles:
    # Only the public instance attributes are color codes.
    for color in [x for x in obj.__dict__ if not x.startswith('_')]:
        ansi_colors[prefix + '_' + color.lower()] = getattr(obj, color)
def format_ansi(text):
    """Return a copy of text with color placeholders replaced with ANSI codes.

    Every `{name}` token whose name is a key of `ansi_colors` is replaced by
    the matching escape sequence, and the `style_reset_all` sequence is
    appended so the styling does not leak past the returned string.
    """
    # Avoid .format() so we don't have to worry about the log content
    for color in ansi_colors:
        text = text.replace('{%s}' % color, ansi_colors[color])

    return text + ansi_colors['style_reset_all']
class ANSIFormatter(logging.Formatter):
    """Log formatter that runs the formatted message through format_ansi().
    """
    def format(self, record):
        # Let the stock formatter build the message first, then post-process it.
        rendered = super().format(record)
        return format_ansi(rendered)
class ANSIEmojiLoglevelFormatter(ANSIFormatter):
    """A log formatter that makes the loglevel an emoji on UTF capable terminals.
    """
    def format(self, record):
        """Format record with its level name swapped for the matching emoji.

        The swap only happens when UNICODE_SUPPORT is true and the level name
        has an entry in EMOJI_LOGLEVELS; otherwise the record is formatted
        unchanged. The record's original level name is always restored
        afterwards, so the same record can be formatted again.
        """
        if not UNICODE_SUPPORT or record.levelname not in EMOJI_LOGLEVELS:
            return super(ANSIEmojiLoglevelFormatter, self).format(record)

        original_levelname = record.levelname
        record.levelname = EMOJI_LOGLEVELS[original_levelname].format(**ansi_colors)
        try:
            return super(ANSIEmojiLoglevelFormatter, self).format(record)
        finally:
            # The record object is shared between handlers; don't leak the emoji.
            record.levelname = original_levelname
class ANSIStrippingFormatter(ANSIFormatter):
    """A log formatter that strips ANSI.
    """
    def format(self, record):
        """Format record, then remove every sequence matched by ansi_escape."""
        msg = super(ANSIStrippingFormatter, self).format(record)
        # Escapes are deleted outright so the visible text is left untouched.
        return ansi_escape.sub('', msg)
class Configuration(object):
    """Represents the running configuration.

    Sections are stored in `self._config` and mirrored into the instance
    `__dict__` so they can be reached both as `config['section']` and as
    `config.section`. Looking up a section that does not exist yet creates
    an empty ConfigurationSection instead of raising.
    """
    def __init__(self, *args, **kwargs):
        self._config = {}

    def __contains__(self, key):
        return self._config.__contains__(key)

    def __iter__(self):
        return self._config.__iter__()

    def __len__(self):
        return self._config.__len__()

    def __repr__(self):
        return self._config.__repr__()

    def keys(self):
        return self._config.keys()

    def items(self):
        return self._config.items()

    def values(self):
        return self._config.values()

    def __getattr__(self, key):
        """Resolve `config.section` for names not found by normal lookup.

        Raises AttributeError for private and dunder names: those are
        protocol probes (copy, pickle, hasattr) rather than sections, and
        answering them would recurse forever while `_config` is not set yet.
        """
        if key.startswith('_'):
            raise AttributeError(key)

        return self.__getitem__(key)

    def __getitem__(self, key):
        """Returns a config section, creating it if it doesn't exist yet.
        """
        if key not in self._config:
            self.__dict__[key] = self._config[key] = ConfigurationSection(self)

        return self._config[key]

    def __setitem__(self, key, value):
        self.__dict__[key] = value
        self._config[key] = value

    def __delitem__(self, key):
        # Never remove private attributes such as `_config` itself.
        if key in self.__dict__ and not key.startswith('_'):
            del self.__dict__[key]
        if key in self._config:
            del self._config[key]
2019-09-22 13:25:33 -07:00
class ConfigurationSection(Configuration):
    """A single section of a Configuration.

    Options missing from this section (or set to None) fall back to the
    option of the same name in the parent's `user` section.
    """
    def __init__(self, parent, *args, **kwargs):
        super(ConfigurationSection, self).__init__(*args, **kwargs)
        self.parent = parent

    def __getitem__(self, key):
        """Returns a config value, pulling from the `user` section as a fallback.

        This is called when the attribute is accessed either via the get
        method or through [ ] index. Returns None when neither this section
        nor the `user` section has the key.
        """
        value = self._config.get(key)
        if value is not None:
            return value

        if key in self.parent.user:
            return self.parent.user[key]

        return None

    def __getattr__(self, key):
        """Returns the config value from the `user` section.

        This is called when the attribute is accessed via dot notation but
        does not exist. Private names and `parent` itself raise
        AttributeError: they are never options, and resolving them here
        would recurse when the instance has not been initialized.
        """
        if key.startswith('_') or key == 'parent':
            raise AttributeError(key)

        if key in self.parent.user:
            return self.parent.user[key]

        return None
2019-07-15 12:14:27 -07:00
def handle_store_boolean(self, *args, **kwargs):
    """Does the add_argument for action='store_boolean'.

    Registers two arguments on `self`: the flags given in `args` with
    action `store_true`, and a `--no-<flag>` counterpart with action
    `store_false` that writes to the same dest.

    Returns the tuple (args, kwargs, disabled_args, disabled_kwargs) that
    was passed to the two add_argument calls.

    Raises ValueError when none of the flags is a long (`--`) option, since
    the disabling flag is derived from the first long option.
    """
    long_flags = [flag for flag in args if flag[:2] == '--']
    if not long_flags:
        raise ValueError('store_boolean arguments need a long (--) flag, got: %s' % (args,))

    disabled_args = ('--no-' + long_flags[0][2:],)
    help_text = kwargs.get('help')

    disabled_kwargs = kwargs.copy()
    disabled_kwargs['action'] = 'store_false'
    # Both flags must write to the same destination.
    disabled_kwargs['dest'] = self.get_argument_name(*args, **kwargs)
    kwargs['action'] = 'store_true'

    if help_text is not None:
        disabled_kwargs['help'] = 'Disable ' + help_text
        kwargs['help'] = 'Enable ' + help_text

    self.add_argument(*args, **kwargs)
    self.add_argument(*disabled_args, **disabled_kwargs)

    return (args, kwargs, disabled_args, disabled_kwargs)
class SubparserWrapper(object):
    """Wrap subparsers so we can track what options the user passed.
    """
    def __init__(self, cli, submodule, subparser):
        self.cli = cli
        self.submodule = submodule
        self.subparser = subparser

        # Expose everything the subparser offers that this wrapper does not
        # define itself.
        for attr in dir(subparser):
            if not hasattr(self, attr):
                setattr(self, attr, getattr(subparser, attr))

    def completer(self, completer):
        """Add an argcomplete completer to this subcommand.
        """
        self.subparser.completer = completer

    def get_argument_name(self, *args, **kwargs):
        """Return the dest name for the given argparse arguments.

        Delegates to the owning cli object; handle_store_boolean() calls
        this on whichever object it is registering arguments for.
        """
        return self.cli.get_argument_name(*args, **kwargs)

    def add_argument(self, *args, **kwargs):
        """Add an argument for this subcommand.

        This also stores the default for the argument in
        `self.cli.default_arguments`.
        """
        if 'action' in kwargs and kwargs['action'] == 'store_boolean':
            # Store boolean will call us again with the enable/disable flag arguments
            return handle_store_boolean(self, *args, **kwargs)

        self.cli.acquire_lock()
        try:
            self.subparser.add_argument(*args, **kwargs)

            if self.submodule not in self.cli.default_arguments:
                self.cli.default_arguments[self.submodule] = {}
            self.cli.default_arguments[self.submodule][self.cli.get_argument_name(*args, **kwargs)] = kwargs.get('default')
        finally:
            # Release even when argparse rejects the argument.
            self.cli.release_lock()
class MILC(object):
    """MILC - An Opinionated Batteries Included Framework

    Combines an argparse parser, an INI config file and the logging setup.
    Used as a context manager (or simply called): entering parses the
    command line, merges the arguments into `self.config` and installs the
    log handlers; calling the object runs the selected entrypoint.
    """
    def __init__(self):
        """Initialize the MILC object.

        `self.version` starts out as 'unknown'; it is the version string
        handed to the `--version` argument.
        """
        # Setup a lock for thread safety
        self._lock = threading.RLock() if thread else None

        # Define some basic info
        self.acquire_lock()
        self._description = None
        self._entrypoint = None
        self._inside_context_manager = False
        self.ansi = ansi_colors
        self.arg_only = []
        self.config = self.config_source = None
        self.config_file = None
        self.default_arguments = {}
        self.version = 'unknown'
        self.release_lock()

        # Figure out our program name
        self.prog_name = sys.argv[0][:-3] if sys.argv[0].endswith('.py') else sys.argv[0]
        self.prog_name = self.prog_name.split('/')[-1]

        # Initialize all the things
        self.read_config_file()
        self.initialize_argparse()
        self.initialize_logging()

    @property
    def description(self):
        return self._description

    @description.setter
    def description(self, value):
        # Keep the argparse parser's description in sync with ours.
        self._description = self._arg_parser.description = value

    def echo(self, text, *args, **kwargs):
        """Print colorized text to stdout.

        ANSI color strings (such as {fg_blue}) will be converted into ANSI
        escape sequences, and the ANSI reset sequence will be added to all
        strings.

        If *args or **kwargs are passed they will be used to %-format the strings.

        Raises RuntimeError when both *args and **kwargs are given.
        """
        if args and kwargs:
            raise RuntimeError('You can only specify *args or **kwargs, not both!')

        args = args or kwargs
        text = format_ansi(text)

        print(text % args)

    def initialize_argparse(self):
        """Prepare to process arguments from sys.argv.
        """
        kwargs = {
            'fromfile_prefix_chars': '@',
            'conflict_handler': 'resolve',
        }

        self.acquire_lock()
        try:
            self.subcommands = {}
            self._subparsers = None
            self.argwarn = argcomplete.warn
            self.args = None
            self._arg_parser = argparse.ArgumentParser(**kwargs)
            self.set_defaults = self._arg_parser.set_defaults
            self.print_usage = self._arg_parser.print_usage
            self.print_help = self._arg_parser.print_help
        finally:
            self.release_lock()

    def completer(self, completer):
        """Add an argcomplete completer to this subcommand.
        """
        self._arg_parser.completer = completer

    def add_argument(self, *args, **kwargs):
        """Wrapper to add arguments and track whether they were passed on the command line.

        The argument's default is recorded under the `general` section of
        `self.default_arguments`.
        """
        if 'action' in kwargs and kwargs['action'] == 'store_boolean':
            # handle_store_boolean() calls back into us for each flag.
            return handle_store_boolean(self, *args, **kwargs)

        self.acquire_lock()
        try:
            self._arg_parser.add_argument(*args, **kwargs)

            if 'general' not in self.default_arguments:
                self.default_arguments['general'] = {}
            self.default_arguments['general'][self.get_argument_name(*args, **kwargs)] = kwargs.get('default')
        finally:
            # Release even when argparse rejects the argument.
            self.release_lock()

    def initialize_logging(self):
        """Prepare the defaults for the logging infrastructure.
        """
        self.acquire_lock()
        try:
            self.log_file = None
            self.log_file_mode = 'a'
            self.log_file_handler = None
            self.log_print = True
            self.log_print_to = sys.stderr
            self.log_print_level = logging.INFO
            self.log_file_level = logging.DEBUG
            self.log_level = logging.INFO
            self.log = logging.getLogger(self.__class__.__name__)
            self.log.setLevel(logging.DEBUG)
            logging.root.setLevel(logging.DEBUG)
        finally:
            self.release_lock()

        self.add_argument('-V', '--version', version=self.version, action='version', help='Display the version and exit')
        self.add_argument('-v', '--verbose', action='store_true', help='Make the logging more verbose')
        self.add_argument('--datetime-fmt', default='%Y-%m-%d %H:%M:%S', help='Format string for datetimes')
        self.add_argument('--log-fmt', default='%(levelname)s %(message)s', help='Format string for printed log output')
        self.add_argument('--log-file-fmt', default='[%(levelname)s] [%(asctime)s] [file:%(pathname)s] [line:%(lineno)d] %(message)s', help='Format string for log file.')
        self.add_argument('--log-file', help='File to write log messages to')
        self.add_argument('--color', action='store_boolean', default=True, help='color in output')
        self.add_argument('--config-file', help='The location for the configuration file')
        # The config file location must never be written into the config file.
        self.arg_only.append('config_file')

    def add_subparsers(self, title='Sub-commands', **kwargs):
        """Create the subparsers object that subcommands are attached to.

        Raises RuntimeError when called inside the context manager.
        """
        if self._inside_context_manager:
            raise RuntimeError('You must run this before the with statement!')

        self.acquire_lock()
        try:
            self._subparsers = self._arg_parser.add_subparsers(title=title, dest='subparsers', **kwargs)
        finally:
            self.release_lock()

    def acquire_lock(self):
        """Acquire the MILC lock for exclusive access to properties.
        """
        if self._lock:
            self._lock.acquire()

    def release_lock(self):
        """Release the MILC lock.
        """
        if self._lock:
            self._lock.release()

    def find_config_file(self):
        """Locate the config file.

        Order of precedence: an already known `self.config_file`, a
        `--config-file PATH` or `--config-file=PATH` option in sys.argv,
        and finally `<prog_name>.ini` in the user config directory.
        """
        if self.config_file:
            return self.config_file

        # argparse has not run yet, so look at sys.argv directly.
        for index, arg in enumerate(sys.argv):
            if arg == '--config-file':
                if index + 1 < len(sys.argv):
                    return Path(sys.argv[index + 1]).expanduser().resolve()
                # No value follows the flag; argparse reports that later.
                break
            if arg.startswith('--config-file='):
                return Path(arg.split('=', 1)[1]).expanduser().resolve()

        filedir = user_config_dir(appname='qmk', appauthor='QMK')
        filename = '%s.ini' % self.prog_name
        return Path(filedir) / filename

    def get_argument_name(self, *args, **kwargs):
        """Takes argparse arguments and returns the dest name.
        """
        try:
            return self._arg_parser._get_optional_kwargs(*args, **kwargs)['dest']
        except ValueError:
            # Not an optional argument, so treat it as a positional one.
            return self._arg_parser._get_positional_kwargs(*args, **kwargs)['dest']

    def argument(self, *args, **kwargs):
        """Decorator to call self.add_argument or self.<subcommand>.add_argument.

        Accepts the extra keyword `arg_only`; when true the argument is
        recorded in `self.arg_only` and never merged into the config.

        Raises RuntimeError when used inside the context manager or when
        the decorated function is neither the entrypoint nor a subcommand.
        """
        if self._inside_context_manager:
            raise RuntimeError('You must run this before the with statement!')

        def argument_function(handler):
            # Work on a copy so `arg_only` never reaches argparse, whatever
            # its value, and the decorator's own kwargs stay untouched.
            arg_kwargs = dict(kwargs)
            if arg_kwargs.pop('arg_only', False):
                self.arg_only.append(self.get_argument_name(*args, **arg_kwargs))

            name = handler.__name__.replace("_", "-")
            if handler is self._entrypoint:
                self.add_argument(*args, **arg_kwargs)
            elif name in self.subcommands:
                self.subcommands[name].add_argument(*args, **arg_kwargs)
            else:
                raise RuntimeError('Decorated function is not entrypoint or subcommand!')

            return handler

        return argument_function

    def arg_passed(self, arg):
        """Returns True if arg was passed on the command line.

        The parsed value is compared with the default of the parser that
        owns the argument, so passing a value equal to the default cannot
        be told apart from not passing it. Returns False when arguments
        have not been parsed yet or `arg` is not a parsed argument.
        """
        if self.args is None or not hasattr(self.args, arg):
            return False

        parser = self._arg_parser
        if self._entrypoint is not None:
            name = self._entrypoint.__name__.replace("_", "-")
            subcommand = self.subcommands.get(name)
            # Prefer the subcommand's parser when it registered the argument.
            if subcommand is not None and arg in self.default_arguments.get(name, {}):
                parser = subcommand.subparser

        return getattr(self.args, arg) != parser.get_default(arg)

    def parse_args(self):
        """Parse the CLI args.
        """
        if self.args:
            self.log.debug('Warning: Arguments have already been parsed, ignoring duplicate attempt!')
            return

        argcomplete.autocomplete(self._arg_parser)

        self.acquire_lock()
        try:
            self.args = self._arg_parser.parse_args()

            # Subcommands publish their handler through set_defaults().
            if 'entrypoint' in self.args:
                self._entrypoint = self.args.entrypoint
        finally:
            self.release_lock()

    def read_config_file(self):
        """Read in the configuration file and store it in self.config.

        Values are coerced: yes/true/on/1 and no/false/off/0 become
        booleans, plain numbers become int or Decimal, and options set to
        `none` are skipped. `self.config_source` records `config_file` for
        every option that was read.
        """
        self.acquire_lock()
        try:
            self.config = Configuration()
            self.config_source = Configuration()
            self.config_file = self.find_config_file()

            if self.config_file and self.config_file.exists():
                config = RawConfigParser()
                config.read(str(self.config_file))

                # Iterate over the config file options and write them into self.config
                for section in config.sections():
                    for option in config.options(section):
                        value = config.get(section, option)

                        # Coerce values into useful datatypes
                        if value.lower() in ['1', 'yes', 'true', 'on']:
                            value = True
                        elif value.lower() in ['0', 'no', 'false', 'off']:
                            value = False
                        elif value.lower() in ['none']:
                            continue
                        elif value.count('.') <= 1 and value.replace('.', '').isdigit():
                            # More than one dot (e.g. a version string) is
                            # not a number and is kept as text.
                            if '.' in value:
                                value = Decimal(value)
                            else:
                                value = int(value)

                        self.config[section][option] = value
                        self.config_source[section][option] = 'config_file'
        finally:
            self.release_lock()

    def merge_args_into_config(self):
        """Merge CLI arguments into self.config to create the runtime configuration.

        Arguments listed in `self.arg_only` are skipped. Every other
        argument whose value is not None is written to the `general`
        section when it was registered there, otherwise to the section
        named after the entrypoint function.

        Raises RuntimeError when an argument was registered in neither place.
        """
        self.acquire_lock()
        try:
            # Underscores in command's names are converted to dashes during initialization.
            # TODO(Erovia) Find a better solution
            entrypoint_name = None
            if self._entrypoint is not None:
                entrypoint_name = self._entrypoint.__name__.replace("_", "-")

            general_arguments = self.default_arguments.get('general', {})
            entrypoint_arguments = self.default_arguments.get(entrypoint_name, {})

            for argument in vars(self.args):
                if argument in ('subparsers', 'entrypoint'):
                    continue

                if argument in self.arg_only:
                    continue

                # Find the argument's section; `general` wins over the entrypoint.
                if argument in general_arguments:
                    section = 'general'
                elif argument in entrypoint_arguments:
                    section = self._entrypoint.__name__
                else:
                    raise RuntimeError('Could not find argument in `self.default_arguments`. This should be impossible!')

                # Merge this argument into self.config
                arg_value = getattr(self.args, argument)
                if arg_value is not None:
                    self.config[section][argument] = arg_value
                    self.config_source[section][argument] = 'argument'
        finally:
            self.release_lock()

    def save_config(self):
        """Save the current configuration to the config file.

        The file is written to a temporary file in the same directory and
        then moved into place. Options whose value is None and the
        `config_file` option of the `general` section are not written.
        """
        self.log.debug("Saving config file to '%s'", str(self.config_file))

        if not self.config_file:
            self.log.warning('%s.config_file file not set, not saving config!', self.__class__.__name__)
            return

        saved = False
        self.acquire_lock()
        try:
            # Generate a sanitized version of our running configuration
            config = RawConfigParser()
            for section_name, section in self.config._config.items():
                config.add_section(section_name)
                for option_name, value in section.items():
                    if section_name == 'general':
                        if option_name in ['config_file']:
                            continue
                    if value is not None:
                        config.set(section_name, option_name, str(value))

            # Write out the config file
            config_dir = self.config_file.parent
            if not config_dir.exists():
                config_dir.mkdir(parents=True, exist_ok=True)

            with NamedTemporaryFile(mode='w', dir=str(config_dir), delete=False) as tmpfile:
                config.write(tmpfile)

            # Move the new config file into place atomically
            if os.path.getsize(tmpfile.name) > 0:
                os.replace(tmpfile.name, str(self.config_file))
                saved = True
            else:
                self.log.warning('Config file saving failed, not replacing %s with %s.', str(self.config_file), tmpfile.name)
        finally:
            # Housekeeping
            self.release_lock()

        if saved:
            self.log.info('Wrote configuration to %s', shlex.quote(str(self.config_file)))

    def __call__(self):
        """Execute the entrypoint function.

        Raises RuntimeError when no entrypoint has been provided.
        """
        if not self._inside_context_manager:
            # If they didn't use the context manager use it ourselves
            with self:
                return self.__call__()

        if not self._entrypoint:
            raise RuntimeError('No entrypoint provided!')

        return self._entrypoint(self)

    def entrypoint(self, description):
        """Set the entrypoint for when no subcommand is provided.
        """
        if self._inside_context_manager:
            raise RuntimeError('You must run this before cli()!')

        self.acquire_lock()
        try:
            self.description = description
        finally:
            self.release_lock()

        def entrypoint_func(handler):
            self.acquire_lock()
            self._entrypoint = handler
            self.release_lock()

            return handler

        return entrypoint_func

    def add_subcommand(self, handler, description, name=None, hidden=False, **kwargs):
        """Register a subcommand.

        If name is not provided we use `handler.__name__` with underscores
        replaced by dashes. Hidden subcommands are left out of the metavar
        list and get no help text.
        """
        if self._inside_context_manager:
            raise RuntimeError('You must run this before the with statement!')

        if self._subparsers is None:
            self.add_subparsers(metavar="")

        if not name:
            name = handler.__name__.replace("_", "-")

        self.acquire_lock()
        try:
            if not hidden:
                # The metavar is the `{a,b,c}` list shown in the usage line.
                current = self._subparsers.metavar
                if current:
                    self._subparsers.metavar = "{%s,%s}" % (current[1:-1], name)
                else:
                    self._subparsers.metavar = "{%s}" % name
                kwargs['help'] = description

            self.subcommands[name] = SubparserWrapper(self, name, self._subparsers.add_parser(name, **kwargs))
            self.subcommands[name].set_defaults(entrypoint=handler)
        finally:
            self.release_lock()

        return handler

    def subcommand(self, description, hidden=False, **kwargs):
        """Decorator to register a subcommand.
        """
        def subcommand_function(handler):
            return self.add_subcommand(handler, description, hidden=hidden, **kwargs)

        return subcommand_function

    def setup_logging(self):
        """Called by __enter__() to setup the logging configuration.
        """
        if len(logging.root.handlers) != 0:
            # MILC is the only thing that should have root log handlers
            logging.root.handlers = []

        self.acquire_lock()
        try:
            general = self.config['general']

            if general['verbose']:
                self.log_print_level = logging.DEBUG

            self.log_file = general['log_file'] or self.log_file
            self.log_file_format = ANSIStrippingFormatter(general['log_file_fmt'], general['datetime_fmt'])

            # Colors (and emoji level names) only when the user wants color.
            if general['color']:
                self.log_format = ANSIEmojiLoglevelFormatter(general['log_fmt'], general['datetime_fmt'])
            else:
                self.log_format = ANSIStrippingFormatter(general['log_fmt'], general['datetime_fmt'])

            if self.log_file:
                self.log_file_handler = logging.FileHandler(self.log_file, self.log_file_mode)
                self.log_file_handler.setLevel(self.log_file_level)
                self.log_file_handler.setFormatter(self.log_file_format)
                logging.root.addHandler(self.log_file_handler)

            if self.log_print:
                self.log_print_handler = logging.StreamHandler(self.log_print_to)
                self.log_print_handler.setLevel(self.log_print_level)
                self.log_print_handler.setFormatter(self.log_format)
                logging.root.addHandler(self.log_print_handler)
        finally:
            self.release_lock()

    def __enter__(self):
        if self._inside_context_manager:
            self.log.debug('Warning: context manager was entered again. This usually means that self.__call__() was called before the with statement. You probably do not want to do that.')
            return

        self.acquire_lock()
        self._inside_context_manager = True
        self.release_lock()

        colorama.init()
        self.parse_args()
        self.merge_args_into_config()
        self.setup_logging()

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.acquire_lock()
        self._inside_context_manager = False
        self.release_lock()

        # SystemExit (and its subclasses) is a normal way to leave; anything
        # else is reported and turned into exit status 255.
        if exc_type is not None and not issubclass(exc_type, SystemExit):
            print(exc_type)
            logging.error(exc_val, exc_info=(exc_type, exc_val, exc_tb))
            sys.exit(255)
# The module-level MILC instance that scripts import and decorate.
cli = MILC()

if __name__ == '__main__':
    # Small demonstration of the entrypoint, subcommand and argument APIs.

    @cli.argument('-c', '--comma', help='comma in output', default=True, action='store_boolean')
    @cli.entrypoint('My useful CLI tool with subcommands.')
    def main(cli):
        comma = ',' if cli.config.general.comma else ''
        cli.log.info('{bg_green}{fg_red}Hello%s World!', comma)

    @cli.argument('-n', '--name', help='Name to greet', default='World')
    @cli.subcommand('Description of hello subcommand here.')
    def hello(cli):
        comma = ',' if cli.config.general.comma else ''
        cli.log.info('{fg_blue}Hello%s %s!', comma, cli.config.hello.name)

    def goodbye(cli):
        comma = ',' if cli.config.general.comma else ''
        cli.log.info('{bg_red}Goodbye%s %s!', comma, cli.config.goodbye.name)

    @cli.argument('-n', '--name', help='Name to greet', default='World')
    @cli.subcommand('Think a bit before greeting the user.')
    def thinking(cli):
        comma = ',' if cli.config.general.comma else ''
        # MILC has no spinner attribute, so each pause is announced with echo().
        cli.echo('Just a moment...')
        sleep(2)
        cli.echo('Almost there!')
        sleep(2)

        cli.log.info('{fg_cyan}Hello%s %s!', comma, cli.config.thinking.name)

    @cli.subcommand('Show off our ANSI colors.')
    def pride(cli):
        cli.echo('{bg_red}                    ')
        cli.echo('{bg_lightred_ex}                    ')
        cli.echo('{bg_lightyellow_ex}                    ')
        cli.echo('{bg_green}                    ')
        cli.echo('{bg_blue}                    ')
        cli.echo('{bg_magenta}                    ')

    # You can register subcommands using decorators as seen above, or using functions like this:
    cli.add_subcommand(goodbye, 'This will show up in --help output.')
    # Registered subcommands are reached through `cli.subcommands`.
    cli.subcommands['goodbye'].add_argument('-n', '--name', help='Name to bid farewell to', default='World')

    cli()  # Automatically picks between main(), hello() and goodbye()