1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """Unicode email support (extends email from stdlib)"""
19
20 __docformat__ = "restructuredtext en"
21
22 import email
23 from encodings import search_function
24 import sys
25 from email.utils import parseaddr, parsedate
26 from email.header import decode_header
27
28 from datetime import datetime
29
30 from six import text_type, binary_type
31
32 try:
33 from mx.DateTime import DateTime
34 except ImportError:
35 DateTime = datetime
36
37 import logilab.common as lgc
38
39
41 parts = []
42 for decoded, charset in decode_header(string):
43 if not charset :
44 charset = 'iso-8859-15'
45
46
47
48
49 if isinstance(decoded, binary_type):
50 decoded = decoded.decode(charset, 'replace')
51 assert isinstance(decoded, text_type)
52 parts.append(decoded)
53
54 if sys.version_info < (3, 3):
55
56
57 return u' '.join(parts)
58 return u''.join(parts)
59
65
71
73 """Encapsulates an email.Message instance and returns only unicode objects.
74 """
75
77 self.message = message
78
79
80
def get(self, header, default=None):
    """Return the decoded value of `header` from the wrapped message.

    :param header: name of the header to look up
    :param default: value handed to the wrapped message's ``get`` for a
      missing header

    :return: the result of ``decode_QP`` on the raw value when that value
      is truthy; otherwise the raw (falsy) value itself, e.g. `default`
      or an empty header
    """
    raw = self.message.get(header, default)
    # falsy values (None, empty string, ...) are passed through undecoded
    return decode_QP(raw) if raw else raw
86
88 return self.get(header)
89
def get_all(self, header, default=()):
    """Return the decoded values of every occurrence of `header`.

    :param header: name of the header to look up
    :param default: value handed to the wrapped message's ``get_all`` for
      a missing header; None is accepted and treated as "no values"

    :return: a list with ``decode_QP`` applied to each value that is not
      None (an empty list when there is nothing to decode)
    """
    # The wrapped ``get_all`` hands back `default` untouched when the header
    # is missing; guard against a None default so the loop never iterates
    # over None.
    raw_values = self.message.get_all(header, default) or ()
    return [decode_QP(val) for val in raw_values if val is not None]
93
96
99
101 for part in self.message.walk():
102 yield UMessage(part)
103
105 message = self.message
106 if index is None:
107 payload = message.get_payload(index, decode)
108 if isinstance(payload, list):
109 return [UMessage(msg) for msg in payload]
110 if message.get_content_maintype() != 'text':
111 return payload
112 if isinstance(payload, text_type):
113 return payload
114
115 charset = message.get_content_charset() or 'iso-8859-1'
116 if search_function(charset) is None:
117 charset = 'iso-8859-1'
118 return text_type(payload or b'', charset, "replace")
119 else:
120 payload = UMessage(message.get_payload(index, decode))
121 return payload
122
124 return text_type(self.message.get_content_maintype())
125
127 return text_type(self.message.get_content_type())
128
130 value = self.message.get_filename(failobj)
131 if value is failobj:
132 return value
133 try:
134 return text_type(value)
135 except UnicodeDecodeError:
136 return u'error decoding filename'
137
138
139
141 """return a unicode string containing all the message's headers"""
142 values = []
143 for header in self.message.keys():
144 values.append(u'%s: %s' % (header, self.get(header)))
145 return '\n'.join(values)
146
148 """return a list of 2-tuples (name, address) for the given address (which
149 is expected to be a header containing addresses such as from, to, cc...)
150 """
151 persons = []
152 for person in self.get_all(header, ()):
153 name, mail = parseaddr(person)
154 persons.append((name, mail))
155 return persons
156
def date(self, alternative_source=False, return_str=False):
    """Return the message's date as a datetime object.

    The ``date`` header is used when present.  When it is missing and
    `alternative_source` is true, the part of the unixfrom line that
    follows its second space is tried instead.

    :param alternative_source: fall back to the message's unixfrom line
      when there is no ``date`` header
    :param return_str: when a date string was found but cannot be turned
      into a date, return that raw string instead of None

    :return: a ``datetime`` (or a ``DateTime`` when
      ``lgc.USE_MX_DATETIME`` is set) on success; None when no date string
      is available; for an unusable date string, None or the raw string
      depending on `return_str`
    """
    value = self.get('date')
    if value is None and alternative_source:
        unix_from = self.message.get_unixfrom()
        if unix_from is not None:
            # keep only what follows the first two space-separated fields
            fields = unix_from.split(" ", 2)
            if len(fields) > 2:
                value = fields[2]
    if value is None:
        return None
    datetuple = parsedate(value)
    if datetuple:
        factory = DateTime if lgc.USE_MX_DATETIME else datetime
        try:
            return factory(*datetuple[:6])
        except ValueError:
            # parsedate() can hand back fields the constructor rejects
            # (e.g. a day of 32); treat that like any other unparseable
            # date instead of letting the error escape.
            # NOTE(review): assumes the mx DateTime range error is a
            # ValueError subclass -- confirm.
            pass
    return value if return_str else None
178