-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprotocol.py
More file actions
286 lines (258 loc) · 9.73 KB
/
protocol.py
File metadata and controls
286 lines (258 loc) · 9.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
#!/usr/bin/python3
#coding=utf-8
"""
File: protocol.py
Description: the email protocol wrapper, implement SMTP / POP3
Author: 0x7F@knownsec404
Time: 2021.06.23
"""
import smtplib
import poplib
import config
import mime
from utils import logger
#**********************************************************************
# @Class: SMTP
# @Description: wrap the SMTP protocol commands (connect, login, send) and
# provide an external interface for sending email
#**********************************************************************
class SMTP:
    #**********************************************************************
    # @Function: __init__(self, address, port, ssl, user, passwd)
    # @Description: SMTP object initialize, only stores the settings; no
    #               connection is opened here
    # @Parameter: address, the SMTP server address
    # @Parameter: port, the SMTP server port
    # @Parameter: ssl, when true connect with smtplib.SMTP_SSL, otherwise
    #             with plain smtplib.SMTP
    # @Parameter: user, the mailbox username
    # @Parameter: passwd, the mailbox password
    # @Return: None
    #**********************************************************************
    def __init__(self, address, port, ssl, user, passwd):
        self.address = address
        self.port = port
        self.ssl = ssl
        self.user = user
        self.passwd = passwd
        # result of the most recent "check_status()" call
        self.status = False
    # end __init__()

    #**********************************************************************
    # @Function: _login_server(self)
    # @Description: connect and login in SMTP server
    # @Parameter: None
    # @Return: (status, smtp), (True, connected SMTP object) on success,
    #          (False, None) when the connection or the login failed; the
    #          error is logged and the socket is closed in that case
    #**********************************************************************
    def _login_server(self):
        # connect smtp server
        try:
            if self.ssl:
                smtp = smtplib.SMTP_SSL(self.address, self.port)
            else:
                smtp = smtplib.SMTP(self.address, self.port)
        except Exception as e:
            logger.error(e)
            return False, None

        # if the log level is DEBUG, let smtplib print the protocol dialog
        import logging
        if config.LOG_LEVEL <= logging.DEBUG:
            smtp.set_debuglevel(1)

        # login in smtp server
        try:
            smtp.login(self.user, self.passwd)
        except Exception as e:
            # drop the socket without QUIT, the session is unusable
            smtp.close()
            logger.error(e)
            return False, None
        return True, smtp
    # end _login_server()

    #**********************************************************************
    # @Function: _logout(self, smtp)
    # @Description: end the session with QUIT on a best-effort basis; a
    #               failing QUIT is logged and the socket is closed, it never
    #               changes the outcome of the work already done
    # @Parameter: smtp, the connected SMTP object
    # @Return: None
    #**********************************************************************
    def _logout(self, smtp):
        try:
            smtp.quit()
        except Exception as e:
            logger.warning(e)
            smtp.close()
    # end _logout()

    #**********************************************************************
    # @Function: check_status(self)
    # @Description: check SMTP server and user/pass status, the result is
    #               also stored in "self.status"
    # @Parameter: None
    # @Return: status, return True when SMTP server is ok and user/pass is authed
    #**********************************************************************
    def check_status(self):
        status, smtp = self._login_server()
        # always overwrite, so a failed re-check clears an earlier success
        self.status = status
        if status:
            self._logout(smtp)
        return status
    # end check_status()

    #**********************************************************************
    # @Function: send(self, email)
    # @Description: send email through SMTP server; a new connection is
    #               opened and closed for every call
    # @Parameter: email, the mime email object; this method reads
    #             "email.sender", "email.receiver + email.cc" as the
    #             recipients and "email.MIME.as_string()" as the message
    # @Return: status, return True when the server accepted the email,
    #          False when the login or the transfer failed
    #**********************************************************************
    def send(self, email):
        # connect/auth smtp server and send
        status, smtp = self._login_server()
        if not status:
            return False

        try:
            refused = smtp.sendmail(
                email.sender,
                email.receiver + email.cc,
                email.MIME.as_string(),
            )
        except Exception as e:
            logger.error(e)
            smtp.close()
            return False

        # sendmail() succeeds when at least one recipient was accepted and
        # returns the refused ones, make them visible in the log
        if refused:
            logger.warning("smtp refused recipients: %s" % refused)

        # the email is already accepted here, a failing QUIT must not turn
        # the result into a failure (the caller could send it twice)
        self._logout(smtp)
        return True
    # end send()
# end class
#**********************************************************************
# @Class: POP3
# @Description: wrap the POP3 protocol commands (STAT, UIDL, RETR) and provide
# an external interface for receiving email
#**********************************************************************
class POP3:
    #**********************************************************************
    # @Function: __init__(self, address, port, ssl, user, passwd)
    # @Description: POP3 object initialize, only stores the settings; no
    #               connection is opened here
    # @Parameter: address, the POP3 server address
    # @Parameter: port, the POP3 server port
    # @Parameter: ssl, when true connect with poplib.POP3_SSL, otherwise
    #             with plain poplib.POP3
    # @Parameter: user, the mailbox username
    # @Parameter: passwd, the mailbox password
    # @Return: None
    #**********************************************************************
    def __init__(self, address, port, ssl, user, passwd):
        self.address = address
        self.port = port
        self.ssl = ssl
        self.user = user
        self.passwd = passwd
        # result of the most recent "check_status()" call
        self.status = False
    # end __init__()

    #**********************************************************************
    # @Function: _login_server(self)
    # @Description: connect and login in POP3 server
    # @Parameter: None
    # @Return: (status, pop3), (True, connected POP3 object) on success,
    #          (False, None) when the connection or the login failed; the
    #          error is logged and the socket is closed in that case
    #**********************************************************************
    def _login_server(self):
        # connect pop3 server
        try:
            if self.ssl:
                pop3 = poplib.POP3_SSL(self.address, self.port)
            else:
                pop3 = poplib.POP3(self.address, self.port)
        except Exception as e:
            logger.error(e)
            return False, None

        # if the log level is DEBUG, let poplib print the protocol dialog
        import logging
        if config.LOG_LEVEL <= logging.DEBUG:
            pop3.set_debuglevel(1)

        # login in pop3 server
        try:
            pop3.user(self.user)
            pop3.pass_(self.passwd)
        except Exception as e:
            logger.error(e)
            # drop the socket without QUIT, the session is unusable
            pop3.close()
            return False, None
        return True, pop3
    # end _login_server()

    #**********************************************************************
    # @Function: _logout(self, pop3)
    # @Description: end the session with QUIT on a best-effort basis; a
    #               failing QUIT is logged and the socket is closed, it never
    #               discards data that was already received
    # @Parameter: pop3, the connected POP3 object
    # @Return: None
    #**********************************************************************
    def _logout(self, pop3):
        try:
            pop3.quit()
        except Exception as e:
            logger.warning(e)
            pop3.close()
    # end _logout()

    #**********************************************************************
    # @Function: check_status(self)
    # @Description: check POP3 server and user/pass status, the result is
    #               also stored in "self.status"
    # @Parameter: None
    # @Return: status, return True when POP3 server is ok and user/pass is authed
    #**********************************************************************
    def check_status(self):
        status, pop3 = self._login_server()
        # always overwrite, so a failed re-check clears an earlier success
        self.status = status
        if status:
            self._logout(pop3)
        return status
    # end check_status()

    #**********************************************************************
    # @Function: stat(self)
    # @Description: get mailbox status, include message count and mailbox size,
    #               we just return message count
    # @Parameter: None
    # @Return: count, the message count, while error will return -1
    #**********************************************************************
    def stat(self):
        # connect/auth pop3 server
        status, pop3 = self._login_server()
        if not status:
            return -1

        # get email status, the mailbox size is not used
        try:
            count, _ = pop3.stat()
        except Exception as e:
            logger.error(e)
            pop3.close()
            return -1

        self._logout(pop3)
        return count
    # end stat()

    #**********************************************************************
    # @Function: uidl(self, which=0)
    # @Description: get all message hash or get the hash of the mail with the
    #               specified id.
    # @Parameter: which=0, message id; when the value is greater than 0 the
    #             hash of that single mail is returned, when it is less than
    #             or equal to 0 the hashes of all mails are returned.
    # @Return: result, the hash bytes (single mail) or the list of hash
    #          lines (all mails); None on login/protocol error or when the
    #          response does not start with "+OK"
    #          the single hash example: b'1 ZC3130-wi6DhoW5iDuIEDhYOkraUbh'
    #**********************************************************************
    def uidl(self, which=0):
        # connect/auth pop3 server
        status, pop3 = self._login_server()
        if not status:
            return None

        # get email message digest (unique id)
        result = None
        try:
            if which > 0:
                # get the uuid of the specified mail
                line = pop3.uidl(which)
                # check response with "+OK" and strip that "+OK " prefix;
                # compare bytes so a non utf-8 response can not raise
                if line.startswith(b"+OK"):
                    result = line[4:]
            else:
                # get the uuid of all emails
                resp, lines, _ = pop3.uidl()
                # check response with "+OK"
                if resp.startswith(b"+OK"):
                    result = lines
            # end if-else
        except Exception as e:
            logger.error(e)
            pop3.close()
            return None

        self._logout(pop3)
        return result
    # end uidl()

    #**********************************************************************
    # @Function: recv(self, which)
    # @Description: get the email content of the specified id, the original
    #               content of the received email is parsed through MIME.
    # @Parameter: which, the email id
    # @Return: email, the "mime.Email" object built from the received
    #          content, None on login/protocol error
    #**********************************************************************
    def recv(self, which):
        # connect/auth pop3 server
        status, pop3 = self._login_server()
        if not status:
            return None

        # get email message by id
        try:
            resp, lines, _ = pop3.retr(which)
        except Exception as e:
            logger.error(e)
            pop3.close()
            return None
        self._logout(pop3)

        # check resp; compare bytes so a non utf-8 response can not raise
        if resp.startswith(b"-ERR"):
            logger.warning("pop3 response ERR: %s" % resp)
            return None

        # join each line of email message content and decode the data with
        # utf-8 charset encoding, undecodable bytes are dropped.
        content = b'\r\n'.join(lines).decode("utf-8", "ignore")
        return mime.Email(source=content)
    # end recv()
# end class