1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """Classes to handle advanced configuration in simple to complex applications.
19
Loads the configuration from a file or from command-line options,
generates a sample configuration file and displays the program's
usage. Fills the gap between optik/optparse and ConfigParser
by adding data types (which are also available as a standalone optik
extension in the `optik_ext` module).
25
26
27 Quick start: simplest usage
28 ---------------------------
29
30 .. python ::
31
32 >>> import sys
33 >>> from logilab.common.configuration import Configuration
34 >>> options = [('dothis', {'type':'yn', 'default': True, 'metavar': '<y or n>'}),
35 ... ('value', {'type': 'string', 'metavar': '<string>'}),
36 ... ('multiple', {'type': 'csv', 'default': ('yop',),
37 ... 'metavar': '<comma separated values>',
38 ... 'help': 'you can also document the option'}),
39 ... ('number', {'type': 'int', 'default':2, 'metavar':'<int>'}),
40 ... ]
41 >>> config = Configuration(options=options, name='My config')
42 >>> print config['dothis']
43 True
44 >>> print config['value']
45 None
46 >>> print config['multiple']
47 ('yop',)
48 >>> print config['number']
49 2
50 >>> print config.help()
51 Usage: [options]
52
53 Options:
54 -h, --help show this help message and exit
55 --dothis=<y or n>
56 --value=<string>
57 --multiple=<comma separated values>
58 you can also document the option [current: none]
59 --number=<int>
60
61 >>> f = open('myconfig.ini', 'w')
62 >>> f.write('''[MY CONFIG]
63 ... number = 3
64 ... dothis = no
65 ... multiple = 1,2,3
66 ... ''')
67 >>> f.close()
68 >>> config.load_file_configuration('myconfig.ini')
69 >>> print config['dothis']
70 False
71 >>> print config['value']
72 None
73 >>> print config['multiple']
74 ['1', '2', '3']
75 >>> print config['number']
76 3
77 >>> sys.argv = ['mon prog', '--value', 'bacon', '--multiple', '4,5,6',
78 ... 'nonoptionargument']
79 >>> print config.load_command_line_configuration()
80 ['nonoptionargument']
81 >>> print config['value']
82 bacon
83 >>> config.generate_config()
84 # class for simple configurations which don't need the
85 # manager / providers model and prefer delegation to inheritance
86 #
87 # configuration values are accessible through a dict like interface
88 #
89 [MY CONFIG]
90
91 dothis=no
92
93 value=bacon
94
95 # you can also document the option
96 multiple=4,5,6
97
98 number=3
99 >>>
100 """
101 __docformat__ = "restructuredtext en"
102
103 __all__ = ('OptionsManagerMixIn', 'OptionsProviderMixIn',
104 'ConfigurationMixIn', 'Configuration',
105 'OptionsManager2ConfigurationAdapter')
106
107 import os
108 import sys
109 import re
110 from os.path import exists, expanduser
111 from copy import copy
112 from configparser import ConfigParser, NoOptionError, NoSectionError, \
113 DuplicateSectionError
114 from warnings import warn
115
116 from logilab.common.compat import callable, raw_input, str_encode as _encode
117 from logilab.common.deprecation import deprecated
118 from logilab.common.textutils import normalize_text, unquote
119 from logilab.common import optik_ext
120 import collections
121
# alias of optik_ext's OptionError, exposed under this module's namespace
OptionError = optik_ext.OptionError

# sentinel default value: it is compared by identity (`default is REQUIRED`)
# when defaults are loaded, and an option whose default is this very object
# is skipped instead of being set
REQUIRED = []
125
127 """raised by set_option when it doesn't know what to do for an action"""
128
129
131 encoding = encoding or getattr(stream, 'encoding', None)
132 if not encoding:
133 import locale
134 encoding = locale.getpreferredencoding()
135 return encoding
136
137
138
139
140
141
142
144 """validate and return a converted value for option of type 'choice'
145 """
146 if not value in optdict['choices']:
147 msg = "option %s: invalid value: %r, should be in %s"
148 raise optik_ext.OptionValueError(msg % (name, value, optdict['choices']))
149 return value
150
"""validate and return the converted values for option of type 'multiple_choice'
"""
154 choices = optdict['choices']
155 values = optik_ext.check_csv(None, name, value)
156 for value in values:
157 if not value in choices:
158 msg = "option %s: invalid value: %r, should be in %s"
159 raise optik_ext.OptionValueError(msg % (name, value, choices))
160 return values
161
163 """validate and return a converted value for option of type 'csv'
164 """
165 return optik_ext.check_csv(None, name, value)
166
168 """validate and return a converted value for option of type 'yn'
169 """
170 return optik_ext.check_yn(None, name, value)
171
173 """validate and return a converted value for option of type 'named'
174 """
175 return optik_ext.check_named(None, name, value)
176
178 """validate and return a filepath for option of type 'file'"""
179 return optik_ext.check_file(None, name, value)
180
182 """validate and return a valid color for option of type 'color'"""
183 return optik_ext.check_color(None, name, value)
184
186 """validate and return a string for option of type 'password'"""
187 return optik_ext.check_password(None, name, value)
188
190 """validate and return a mx DateTime object for option of type 'date'"""
191 return optik_ext.check_date(None, name, value)
192
194 """validate and return a time object for option of type 'time'"""
195 return optik_ext.check_time(None, name, value)
196
198 """validate and return an integer for option of type 'bytes'"""
199 return optik_ext.check_bytes(None, name, value)
200
201
# Maps an option type name to the callable that validates/converts a raw
# value of that type. The code consuming this table first calls the entry as
# f(optdict, option, value) and, when that raises TypeError, retries it as
# f(value); this is why plain converters such as `int` or `re.compile` can
# sit next to the three-argument *_validator functions.
VALIDATORS = {'string': unquote,
              'int': int,
              'float': float,
              'file': file_validator,
              'font': unquote,  # same converter as 'string'
              'color': color_validator,
              'regexp': re.compile,
              'csv': csv_validator,
              'yn': yn_validator,
              'bool': yn_validator,  # same validator as 'yn'
              'named': named_validator,
              'password': password_validator,
              'date': date_validator,
              'time': time_validator,
              'bytes': bytes_validator,
              'choice': choice_validator,
              'multiple_choice': multiple_choice_validator,
              }
220
222 if opttype not in VALIDATORS:
223 raise Exception('Unsupported type "%s"' % opttype)
224 try:
225 return VALIDATORS[opttype](optdict, option, value)
226 except TypeError:
227 try:
228 return VALIDATORS[opttype](value)
229 except optik_ext.OptionValueError:
230 raise
231 except:
232 raise optik_ext.OptionValueError('%s value (%r) should be of type %s' %
233 (option, value, opttype))
234
235
236
237
238
239
240
249
253
265 return input_validator
266
# Interactive input function per option type. 'string' and 'password' have
# dedicated functions; every other type known to VALIDATORS gets one built by
# _make_input_function.
INPUT_FUNCTIONS = dict(string=input_string, password=input_password)

# setdefault keeps the two explicit entries above; the factory is still
# invoked once for every validator type, exactly as before.
for opttype in list(VALIDATORS):
    INPUT_FUNCTIONS.setdefault(opttype, _make_input_function(opttype))
274
275
276
278 """monkey patch OptionParser.expand_default since we have a particular
279 way to handle defaults to avoid overriding values in the configuration
280 file
281 """
282 if self.parser is None or not self.default_tag:
283 return option.help
284 optname = option._long_opts[0][2:]
285 try:
286 provider = self.parser.options_manager._all_options[optname]
287 except KeyError:
288 value = None
289 else:
290 optdict = provider.get_option_def(optname)
291 optname = provider.option_attrname(optname, optdict)
292 value = getattr(provider.config, optname, optdict)
293 value = format_option_value(optdict, value)
294 if value is optik_ext.NO_DEFAULT or not value:
295 value = self.NO_DEFAULT_VALUE
296 return option.help.replace(self.default_tag, str(value))
297
298
300 """return a validated value for an option according to its type
301
302 optional argument name is only used for error message formatting
303 """
304 try:
305 _type = optdict['type']
306 except KeyError:
307
308 return value
309 return _call_validator(_type, optdict, name, value)
310 convert = deprecated('[0.60] convert() was renamed _validate()')(_validate)
311
312
313
318
335
350
369
377
394
395 format_section = ini_format_section
396
416
417
418
420 """MixIn to handle a configuration from both a configuration file and
421 command line options
422 """
423
def __init__(self, usage, config_file=None, version=None, quiet=0):
    """set up the parsers and the (initially empty) option registries

    `usage` and `version` are forwarded to `reset_parsers`; `config_file`
    and `quiet` are stored unchanged on the instance.
    """
    self.config_file = config_file
    self.reset_parsers(usage, version=version)
    # options providers registered on this manager
    self.options_providers = []
    # lookup tables filled while options are registered:
    # option name -> provider, short option -> option name
    self._all_options, self._short_options = {}, {}
    # provider -> option having an explicit action, group name -> option group
    self._nocallback_options, self._mygroups = {}, {}
    # `quiet` silences the "no config file" notice; `_maxlevel` tracks the
    # highest option level registered so far
    self.quiet, self._maxlevel = quiet, 0
437
439
440 self.cfgfile_parser = ConfigParser()
441
442 self.cmdline_parser = optik_ext.OptionParser(usage=usage, version=version)
443 self.cmdline_parser.options_manager = self
444 self._optik_option_attrs = set(self.cmdline_parser.option_class.ATTRS)
445
447 """register an options provider"""
448 assert provider.priority <= 0, "provider's priority can't be >= 0"
449 for i in range(len(self.options_providers)):
450 if provider.priority > self.options_providers[i].priority:
451 self.options_providers.insert(i, provider)
452 break
453 else:
454 self.options_providers.append(provider)
455 non_group_spec_options = [option for option in provider.options
456 if 'group' not in option[1]]
457 groups = getattr(provider, 'option_groups', ())
458 if own_group and non_group_spec_options:
459 self.add_option_group(provider.name.upper(), provider.__doc__,
460 non_group_spec_options, provider)
461 else:
462 for opt, optdict in non_group_spec_options:
463 self.add_optik_option(provider, self.cmdline_parser, opt, optdict)
464 for gname, gdoc in groups:
465 gname = gname.upper()
466 goptions = [option for option in provider.options
467 if option[1].get('group', '').upper() == gname]
468 self.add_option_group(gname, gdoc, goptions, provider)
469
471 """add an option group including the listed options
472 """
473 assert options
474
475 if group_name in self._mygroups:
476 group = self._mygroups[group_name]
477 else:
478 group = optik_ext.OptionGroup(self.cmdline_parser,
479 title=group_name.capitalize())
480 self.cmdline_parser.add_option_group(group)
481 group.level = provider.level
482 self._mygroups[group_name] = group
483
484 if group_name != "DEFAULT":
485 self.cfgfile_parser.add_section(group_name)
486
487 for opt, optdict in options:
488 self.add_optik_option(provider, group, opt, optdict)
489
491 if 'inputlevel' in optdict:
492 warn('[0.50] "inputlevel" in option dictionary for %s is deprecated,'
493 ' use "level"' % opt, DeprecationWarning)
494 optdict['level'] = optdict.pop('inputlevel')
495 args, optdict = self.optik_option(provider, opt, optdict)
496 option = optikcontainer.add_option(*args, **optdict)
497 self._all_options[opt] = provider
498 self._maxlevel = max(self._maxlevel, option.level or 0)
499
501 """get our personal option definition and return a suitable form for
502 use with optik/optparse
503 """
504 optdict = copy(optdict)
505 others = {}
506 if 'action' in optdict:
507 self._nocallback_options[provider] = opt
508 else:
509 optdict['action'] = 'callback'
510 optdict['callback'] = self.cb_set_provider_option
511
512
513 if 'default' in optdict:
514 if ('help' in optdict
515 and optdict.get('default') is not None
516 and not optdict['action'] in ('store_true', 'store_false')):
517 optdict['help'] += ' [current: %default]'
518 del optdict['default']
519 args = ['--' + str(opt)]
520 if 'short' in optdict:
521 self._short_options[optdict['short']] = opt
522 args.append('-' + optdict['short'])
523 del optdict['short']
524
525 for key in list(optdict.keys()):
526 if not key in self._optik_option_attrs:
527 optdict.pop(key)
528 return args, optdict
529
531 """optik callback for option setting"""
532 if opt.startswith('--'):
533
534 opt = opt[2:]
535 else:
536
537 opt = self._short_options[opt[1:]]
538
539 if value is None:
540 value = 1
541 self.global_set_option(opt, value)
542
544 """set option on the correct option provider"""
545 self._all_options[opt].set_option(opt, value)
546
548 """write a configuration file according to the current configuration
549 into the given stream or stdout
550 """
551 options_by_section = {}
552 sections = []
553 for provider in self.options_providers:
554 for section, options in provider.options_by_section():
555 if section is None:
556 section = provider.name
557 if section in skipsections:
558 continue
559 options = [(n, d, v) for (n, d, v) in options
560 if d.get('type') is not None]
561 if not options:
562 continue
563 if not section in sections:
564 sections.append(section)
565 alloptions = options_by_section.setdefault(section, [])
566 alloptions += options
567 stream = stream or sys.stdout
568 encoding = _get_encoding(encoding, stream)
569 printed = False
570 for section in sections:
571 if printed:
572 print('\n', file=stream)
573 format_section(stream, section.upper(), options_by_section[section],
574 encoding)
575 printed = True
576
def generate_manpage(self, pkginfo, section=1, stream=None):
    """render a man page for the current command line options

    The page is written to `stream`, or to `sys.stdout` when `stream` is
    falsy. The `_monkeypatch_expand_default` / `_unmonkeypatch_expand_default`
    pair brackets the rendering, the latter being called even on error.
    """
    self._monkeypatch_expand_default()
    try:
        target = stream or sys.stdout
        optik_ext.generate_manpage(
            self.cmdline_parser, pkginfo, section,
            stream=target, level=self._maxlevel)
    finally:
        self._unmonkeypatch_expand_default()
588
589
590
592 """initialize configuration using default values"""
593 for provider in self.options_providers:
594 provider.load_defaults()
595
600
602 """read the configuration file but do not load it (i.e. dispatching
603 values to each options provider)
604 """
605 helplevel = 1
606 while helplevel <= self._maxlevel:
607 opt = '-'.join(['long'] * helplevel) + '-help'
608 if opt in self._all_options:
609 break
610 def helpfunc(option, opt, val, p, level=helplevel):
611 print(self.help(level))
612 sys.exit(0)
613 helpmsg = '%s verbose help.' % ' '.join(['more'] * helplevel)
614 optdict = {'action' : 'callback', 'callback' : helpfunc,
615 'help' : helpmsg}
616 provider = self.options_providers[0]
617 self.add_optik_option(provider, self.cmdline_parser, opt, optdict)
618 provider.options += ( (opt, optdict), )
619 helplevel += 1
620 if config_file is None:
621 config_file = self.config_file
622 if config_file is not None:
623 config_file = expanduser(config_file)
624 if config_file and exists(config_file):
625 parser = self.cfgfile_parser
626 parser.read([config_file])
627
628 for sect, values in list(parser._sections.items()):
629 if not sect.isupper() and values:
630 parser._sections[sect.upper()] = values
631 elif not self.quiet:
632 msg = 'No config file found, using default configuration'
633 print(msg, file=sys.stderr)
634 return
635
653
655 """dispatch values previously read from a configuration file to each
656 options provider)
657 """
658 parser = self.cfgfile_parser
659 for provider in self.options_providers:
660 for section, option, optdict in provider.all_options():
661 try:
662 value = parser.get(section, option)
663 provider.set_option(option, value, optdict=optdict)
664 except (NoSectionError, NoOptionError) as ex:
665 continue
666
668 """override configuration according to given parameters
669 """
670 for opt, opt_value in list(kwargs.items()):
671 opt = opt.replace('_', '-')
672 provider = self._all_options[opt]
673 provider.set_option(opt, opt_value)
674
676 """override configuration according to command line parameters
677
678 return additional arguments
679 """
680 self._monkeypatch_expand_default()
681 try:
682 if args is None:
683 args = sys.argv[1:]
684 else:
685 args = list(args)
686 (options, args) = self.cmdline_parser.parse_args(args=args)
687 for provider in list(self._nocallback_options.keys()):
688 config = provider.config
689 for attr in list(config.__dict__.keys()):
690 value = getattr(options, attr, None)
691 if value is None:
692 continue
693 setattr(config, attr, value)
694 return args
695 finally:
696 self._unmonkeypatch_expand_default()
697
698
699
700
709
711
712 try:
713 self.__expand_default_backup = optik_ext.HelpFormatter.expand_default
714 optik_ext.HelpFormatter.expand_default = expand_default
715 except AttributeError:
716
717 pass
719
720 if hasattr(optik_ext.HelpFormatter, 'expand_default'):
721
722 optik_ext.HelpFormatter.expand_default = self.__expand_default_backup
723
def help(self, level=0):
    """build and return the usage text of the command line parser

    `level` is stored as the formatter's `output_level` before formatting.
    """
    parser = self.cmdline_parser
    parser.formatter.output_level = level
    self._monkeypatch_expand_default()
    try:
        usage = parser.format_help()
    finally:
        # always undo the monkeypatch, even if formatting failed
        self._unmonkeypatch_expand_default()
    return usage
732
733
735 """used to ease late binding of default method (so you can define options
736 on the class using default methods on the configuration instance)
737 """
739 self.method = methname
740 self._inst = None
741
def bind(self, instance):
    """attach `instance` as the receiver, unless one is already attached"""
    if self._inst is not None:
        # the first binding wins; later calls are ignored
        return
    self._inst = instance
746
748 assert self._inst, 'unbound method'
749 return getattr(self._inst, self.method)(*args, **kwargs)
750
751
752
754 """Mixin to provide options to an OptionsManager"""
755
756
757 priority = -1
758 name = 'default'
759 options = ()
760 level = 0
761
763 self.config = optik_ext.Values()
764 for option in self.options:
765 try:
766 option, optdict = option
767 except ValueError:
768 raise Exception('Bad option: %r' % option)
769 if isinstance(optdict.get('default'), Method):
770 optdict['default'].bind(self)
771 elif isinstance(optdict.get('callback'), Method):
772 optdict['callback'].bind(self)
773 self.load_defaults()
774
776 """initialize the provider using default values"""
777 for opt, optdict in self.options:
778 action = optdict.get('action')
779 if action != 'callback':
780
781 default = self.option_default(opt, optdict)
782 if default is REQUIRED:
783 continue
784 self.set_option(opt, default, action, optdict)
785
787 """return the default value for an option"""
788 if optdict is None:
789 optdict = self.get_option_def(opt)
790 default = optdict.get('default')
791 if isinstance(default, collections.Callable):
792 default = default()
793 return default
794
796 """get the config attribute corresponding to opt
797 """
798 if optdict is None:
799 optdict = self.get_option_def(opt)
800 return optdict.get('dest', opt.replace('-', '_'))
801 option_name = deprecated('[0.60] OptionsProviderMixIn.option_name() was renamed to option_attrname()')(option_attrname)
802
804 """get the current value for the given option"""
805 return getattr(self.config, self.option_attrname(opt), None)
806
def set_option(self, opt, value, action=None, optdict=None):
    """store `value` for the registered option `opt` on `self.config`

    When `optdict` is not given it is looked up with `get_option_def`.
    A value that is not None first goes through `_validate`. `action`
    defaults to the 'action' entry of the option definition, or 'store'.
    Raises `UnsupportedAction` for an action that is not handled here.
    """
    if optdict is None:
        optdict = self.get_option_def(opt)
    if value is not None:
        value = _validate(value, optdict, opt)
    if action is None:
        action = optdict.get('action', 'store')
    if optdict.get('type') == 'named':
        # merge the new value into what is already stored, if anything
        existing = getattr(self.config, self.option_attrname(opt, optdict),
                           None)
        if existing:
            existing.update(value)
            value = existing
    if action == 'callback':
        optdict['callback'](None, opt, value, None)
        return
    if action not in ('store', 'store_true', 'count', 'store_false', 'append'):
        raise UnsupportedAction(action)
    attrname = self.option_attrname(opt, optdict)
    if action == 'store':
        setattr(self.config, attrname, value)
        return
    if action in ('store_true', 'count'):
        setattr(self.config, attrname, 0)
        return
    if action == 'store_false':
        setattr(self.config, attrname, 1)
        return
    # action == 'append'
    current = getattr(self.config, attrname, None)
    if current is None:
        if isinstance(value, (list, tuple)):
            current = value
        elif value is not None:
            current = [value]
        setattr(self.config, attrname, current)
    elif isinstance(current, tuple):
        # tuples cannot grow in place: store an extended copy
        setattr(self.config, attrname, current + (value,))
    else:
        current.append(value)
846
867
869 """return the dictionary defining an option given it's name"""
870 assert self.options
871 for option in self.options:
872 if option[0] == opt:
873 return option[1]
874 raise OptionError('no such option %s in section %r'
875 % (opt, self.name), opt)
876
877
879 """return an iterator on available options for this provider
880 option are actually described by a 3-uple:
881 (section, option name, option dictionary)
882 """
883 for section, options in self.options_by_section():
884 if section is None:
885 if self.name is None:
886 continue
887 section = self.name.upper()
888 for option, optiondict, value in options:
889 yield section, option, optiondict
890
892 """return an iterator on options grouped by section
893
894 (section, [list of (optname, optdict, optvalue)])
895 """
896 sections = {}
897 for optname, optdict in self.options:
898 sections.setdefault(optdict.get('group'), []).append(
899 (optname, optdict, self.option_value(optname)))
900 if None in sections:
901 yield None, sections.pop(None)
902 for section, options in list(sections.items()):
903 yield section.upper(), options
904
910
911
912
914 """basic mixin for simple configurations which don't need the
915 manager / providers model
916 """
933
942
945
947 return iter(self.config.__dict__.items())
948
950 try:
951 return getattr(self.config, self.option_attrname(key))
952 except (optik_ext.OptionValueError, AttributeError):
953 raise KeyError(key)
954
957
958 - def get(self, key, default=None):
963
964
966 """class for simple configurations which don't need the
967 manager / providers model and prefer delegation to inheritance
968
969 configuration values are accessible through a dict like interface
970 """
971
972 - def __init__(self, config_file=None, options=None, name=None,
973 usage=None, doc=None, version=None):
981
982
984 """Adapt an option manager to behave like a
985 `logilab.common.configuration.Configuration` instance
986 """
988 self.config = provider
989
991 return getattr(self.config, key)
992
994 provider = self.config._all_options[key]
995 try:
996 return getattr(provider.config, provider.option_attrname(key))
997 except AttributeError:
998 raise KeyError(key)
999
1002
def get(self, key, default=None):
    """return the current value of option `key`, or `default`

    `default` is returned when no provider is registered for `key` as well
    as when the provider's config has no attribute for that option, so this
    method never raises KeyError for an unknown option.
    """
    try:
        provider = self.config._all_options[key]
    except KeyError:
        # unknown option: behave like dict.get instead of raising
        return default
    try:
        return getattr(provider.config, provider.option_attrname(key))
    except AttributeError:
        return default
1009
1010
1011
1013 """initialize newconfig from a deprecated configuration file
1014
1015 possible changes:
1016 * ('renamed', oldname, newname)
1017 * ('moved', option, oldgroup, newgroup)
1018 * ('typechanged', option, oldtype, newvalue)
1019 """
1020
1021 changesindex = {}
1022 for action in changes:
1023 if action[0] == 'moved':
1024 option, oldgroup, newgroup = action[1:]
1025 changesindex.setdefault(option, []).append((action[0], oldgroup, newgroup))
1026 continue
1027 if action[0] == 'renamed':
1028 oldname, newname = action[1:]
1029 changesindex.setdefault(newname, []).append((action[0], oldname))
1030 continue
1031 if action[0] == 'typechanged':
1032 option, oldtype, newvalue = action[1:]
1033 changesindex.setdefault(option, []).append((action[0], oldtype, newvalue))
1034 continue
1035 if action[1] in ('added', 'removed'):
1036 continue
1037 raise Exception('unknown change %s' % action[0])
1038
1039 options = []
1040 for optname, optdef in newconfig.options:
1041 for action in changesindex.pop(optname, ()):
1042 if action[0] == 'moved':
1043 oldgroup, newgroup = action[1:]
1044 optdef = optdef.copy()
1045 optdef['group'] = oldgroup
1046 elif action[0] == 'renamed':
1047 optname = action[1]
1048 elif action[0] == 'typechanged':
1049 oldtype = action[1]
1050 optdef = optdef.copy()
1051 optdef['type'] = oldtype
1052 options.append((optname, optdef))
1053 if changesindex:
1054 raise Exception('unapplied changes: %s' % changesindex)
1055 oldconfig = Configuration(options=options, name=newconfig.name)
1056
1057 oldconfig.load_file_configuration(configfile)
1058
1059 changes.reverse()
1060 done = set()
1061 for action in changes:
1062 if action[0] == 'renamed':
1063 oldname, newname = action[1:]
1064 newconfig[newname] = oldconfig[oldname]
1065 done.add(newname)
1066 elif action[0] == 'typechanged':
1067 optname, oldtype, newvalue = action[1:]
1068 newconfig[optname] = newvalue
1069 done.add(optname)
1070 for optname, optdef in newconfig.options:
1071 if optdef.get('type') and not optname in done:
1072 newconfig.set_option(optname, oldconfig[optname], optdict=optdef)
1073
1074
1076 """preprocess a list of options and remove duplicates, returning a new list
1077 (tuple actually) of options.
1078
Options dictionaries are copied to avoid later side-effects. Also, if the
`optgroup` argument is specified, ensure all options are in the given group.
1081 """
1082 alloptions = {}
1083 options = list(options)
1084 for i in range(len(options)-1, -1, -1):
1085 optname, optdict = options[i]
1086 if optname in alloptions:
1087 options.pop(i)
1088 alloptions[optname].update(optdict)
1089 else:
1090 optdict = optdict.copy()
1091 options[i] = (optname, optdict)
1092 alloptions[optname] = optdict
1093 if optgroup is not None:
1094 alloptions[optname]['group'] = optgroup
1095 return tuple(options)
1096