以下功能實現 讀取 email_list.txt 文件列表並發送郵件
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import threading
import time
# 配置SMTP服务器和登录信息
SMTP_SERVER = ''
SMTP_PORT = 587 # 默认使用STARTTLS协议
USERNAME = ''
PASSWORD = ''
SENDER_EMAIL = ''
# 配置邮件内容
SUBJECT = 'Your Subject'
MESSAGE = 'Your Message'
# 配置失败重试次数和等待时间
MAX_RETRIES = 3
WAIT_TIME = 1
# 读取邮箱列表
def read_email_list(file_path):
with open(file_path, 'r') as f:
return f.read().splitlines()
# 发送邮件函数
def send_email(recipient_email, use_ssl=False):
try_count = 0
success = False
while try_count < MAX_RETRIES and not success:
try:
# 创建邮件对象
msg = MIMEMultipart()
msg['From'] = SENDER_EMAIL
msg['To'] = recipient_email
msg['Subject'] = SUBJECT
# 添加邮件内容
msg.attach(MIMEText(MESSAGE, 'plain'))
# 连接SMTP服务器并发送邮件
if use_ssl:
with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
server.login(USERNAME, PASSWORD)
server.sendmail(SENDER_EMAIL, recipient_email, msg.as_string())
else:
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls() # 启用TLS加密
server.login(USERNAME, PASSWORD)
server.sendmail(SENDER_EMAIL, recipient_email, msg.as_string())
print(f"Email sent to {recipient_email}")
success = True
except Exception as e:
print(f"Failed to send email to {recipient_email}. Retrying...")
try_count += 1
time.sleep(WAIT_TIME)
# 主函数
def main():
# 读取邮箱列表
email_list = read_email_list('email_list.txt')
# 创建线程列表
threads = []
# 创建并启动线程
for email in email_list:
# 根据需要选择是否使用SSL加密
use_ssl = True if email.endswith('@secure-domain.com') else False
thread = threading.Thread(target=send_email, args=(email, use_ssl))
threads.append(thread)
thread.start()
# 等待所有线程执行完毕
for thread in threads:
thread.join()
print("All emails sent successfully!")
if __name__ == "__main__":
main()