Package logilab :: Package common :: Module umessage
[frames | no frames]

Source Code for Module logilab.common.umessage

  1  # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """Unicode email support (extends email from stdlib)""" 
 19   
 20  __docformat__ = "restructuredtext en" 
 21   
 22  import email 
 23  from encodings import search_function 
 24  import sys 
 25  if sys.version_info >= (2, 5): 
 26      from email.utils import parseaddr, parsedate 
 27      from email.header import decode_header 
 28  else: 
 29      from email.Utils import parseaddr, parsedate 
 30      from email.Header import decode_header 
 31   
 32  from datetime import datetime 
 33   
 34  try: 
 35      from mx.DateTime import DateTime 
 36  except ImportError: 
 37      DateTime = datetime 
 38   
 39  import logilab.common as lgc 
 40   
 41   
def decode_QP(string):
    """Decode an RFC 2047 encoded header value into one unicode string.

    The value is split with ``email.header.decode_header`` and every chunk
    is converted to unicode before the chunks are joined.

    :param string: raw header value, possibly containing encoded words
    :return: the decoded header as a unicode string

    Chunks without a declared charset are decoded as iso-8859-15; the same
    fallback is applied when the declared charset is not a known codec.
    """
    text_type = type(u'')
    fallback_charset = 'iso-8859-15'
    parts = []
    for decoded, charset in decode_header(string):
        # On Python 3 decode_header returns already-decoded text for headers
        # that contain no encoded word; only byte chunks need decoding.
        if not isinstance(decoded, text_type):
            try:
                decoded = decoded.decode(charset or fallback_charset,
                                         'replace')
            except LookupError:
                # the declared charset is not a codec known to Python
                decoded = decoded.decode(fallback_charset, 'replace')
        parts.append(decoded)

    if sys.version_info < (3, 3):
        # decoding was non-RFC compliant wrt to whitespace handling
        # see http://bugs.python.org/issue1079
        return u' '.join(parts)
    return u''.join(parts)
54
def message_from_file(fd):
    """Parse the message read from the file object `fd` into a UMessage.

    :param fd: file-like object handed to ``email.message_from_file``
    :return: a UMessage wrapping the parsed message, or ``''`` (an empty
      string) when the parser raises ``MessageParseError``
    """
    # ``email.Errors`` does not exist on Python 3, so the exception class is
    # resolved from the lowercase module first, with the legacy name as a
    # fallback for very old interpreters.
    try:
        from email.errors import MessageParseError
    except ImportError:
        from email.Errors import MessageParseError
    try:
        return UMessage(email.message_from_file(fd))
    except MessageParseError:
        return ''
60
def message_from_string(string):
    """Parse the message contained in `string` into a UMessage.

    :param string: message source handed to ``email.message_from_string``
    :return: a UMessage wrapping the parsed message, or ``''`` (an empty
      string) when the parser raises ``MessageParseError``
    """
    # ``email.Errors`` does not exist on Python 3, so the exception class is
    # resolved from the lowercase module first, with the legacy name as a
    # fallback for very old interpreters.
    try:
        from email.errors import MessageParseError
    except ImportError:
        from email.Errors import MessageParseError
    try:
        return UMessage(email.message_from_string(string))
    except MessageParseError:
        return ''
66
class UMessage:
    """Encapsulates an email.Message instance and returns only unicode objects.

    The wrapped message is kept in the `message` attribute; the methods
    below delegate to it and convert header values and text payloads to
    unicode.
    """

    def __init__(self, message):
        """Wrap `message`, the object every other method delegates to."""
        self.message = message

    # email.Message interface #################################################

    def get(self, header, default=None):
        """Return the decoded value of `header`.

        Falsy values (missing header resolved to a falsy `default`, empty
        header) are returned as is, without decoding.
        """
        value = self.message.get(header, default)
        if value:
            return decode_QP(value)
        return value

    def __getitem__(self, header):
        """Same as ``get(header)``."""
        return self.get(header)

    def get_all(self, header, default=()):
        """Return the list of decoded values found for `header`.

        ``None`` entries are skipped.
        """
        return [decode_QP(val) for val in self.message.get_all(header, default)
                if val is not None]

    def is_multipart(self):
        """Return the wrapped message's ``is_multipart()`` result."""
        return self.message.is_multipart()

    def get_boundary(self):
        """Return the wrapped message's ``get_boundary()`` result."""
        return self.message.get_boundary()

    def walk(self):
        """Yield a UMessage for each part produced by the wrapped ``walk()``."""
        for part in self.message.walk():
            yield UMessage(part)

    if sys.version_info < (3, 0):

        def get_payload(self, index=None, decode=False):
            """Return the payload, converted to unicode for text parts.

            A list payload is returned as a list of UMessage; with an
            explicit `index` the selected sub-part is returned as a
            UMessage. Payloads of non-text parts are returned untouched.
            """
            message = self.message
            if index is None:
                payload = message.get_payload(index, decode)
                if isinstance(payload, list):
                    return [UMessage(msg) for msg in payload]
                if message.get_content_maintype() != 'text':
                    return payload

                charset = message.get_content_charset() or 'iso-8859-1'
                # unknown codec: fall back to latin-1 rather than failing
                if search_function(charset) is None:
                    charset = 'iso-8859-1'
                return unicode(payload or '', charset, "replace")
            else:
                payload = UMessage(message.get_payload(index, decode))
            return payload

        def get_content_maintype(self):
            """Return the main content type as unicode."""
            return unicode(self.message.get_content_maintype())

        def get_content_type(self):
            """Return the content type as unicode."""
            return unicode(self.message.get_content_type())

        def get_filename(self, failobj=None):
            """Return the attachment file name as unicode.

            `failobj` is returned untouched when the wrapped message hands
            it back; a name that cannot be converted yields the placeholder
            ``u'error decoding filename'``.
            """
            value = self.message.get_filename(failobj)
            if value is failobj:
                return value
            try:
                return unicode(value)
            except UnicodeDecodeError:
                return u'error decoding filename'

    else:

        def get_payload(self, index=None, decode=False):
            """Return the payload, converted to text for text parts.

            A list payload is returned as a list of UMessage; with an
            explicit `index` the selected sub-part is returned as a
            UMessage. Payloads of non-text parts are returned untouched.
            """
            message = self.message
            if index is not None:
                return UMessage(message.get_payload(index, decode))
            payload = message.get_payload(index, decode)
            if isinstance(payload, list):
                return [UMessage(msg) for msg in payload]
            if message.get_content_maintype() != 'text':
                return payload
            if isinstance(payload, bytes):
                # With decode=True the wrapped message hands back raw bytes;
                # convert them so that text parts are returned as text, as
                # the Python 2 implementation above does.
                charset = message.get_content_charset() or 'iso-8859-1'
                # unknown codec: fall back to latin-1 rather than failing
                if search_function(charset) is None:
                    charset = 'iso-8859-1'
                return str(payload, charset, 'replace')
            return payload

        def get_content_maintype(self):
            """Return the wrapped message's main content type."""
            return self.message.get_content_maintype()

        def get_content_type(self):
            """Return the wrapped message's content type."""
            return self.message.get_content_type()

        def get_filename(self, failobj=None):
            """Return the wrapped message's ``get_filename(failobj)`` result."""
            return self.message.get_filename(failobj)

    # other convenience methods ###############################################

    def headers(self):
        """return an unicode string containing all the message's headers"""
        values = []
        for header in self.message.keys():
            values.append(u'%s: %s' % (header, self.get(header)))
        return u'\n'.join(values)

    def multi_addrs(self, header):
        """return a list of 2-uple (name, address) for the given address (which
        is expected to be an header containing address such as from, to, cc...)
        """
        # NOTE(review): parseaddr is applied once per header occurrence; a
        # header line holding several comma-separated addresses may need
        # email.utils.getaddresses instead -- confirm the intended behaviour.
        persons = []
        for person in self.get_all(header, ()):
            name, mail = parseaddr(person)
            persons.append((name, mail))
        return persons

    def date(self, alternative_source=False, return_str=False):
        """return a datetime object for the email's date or None if no date is
        set or if it can't be parsed

        :param alternative_source: when true and there is no 'date' header,
          use the third space-separated field of the unix "From " line
        :param return_str: when true, return the raw date string instead of
          None if it can't be parsed
        """
        value = self.get('date')
        if value is None and alternative_source:
            unix_from = self.message.get_unixfrom()
            if unix_from is not None:
                try:
                    value = unix_from.split(" ", 2)[2]
                except IndexError:
                    # fewer than three fields in the envelope line
                    pass
        if value is not None:
            datetuple = parsedate(value)
            if datetuple:
                if lgc.USE_MX_DATETIME:
                    return DateTime(*datetuple[:6])
                return datetime(*datetuple[:6])
            elif not return_str:
                return None
        return value
195