我有一个网站,免费的流量是每天签到会给100~500MB流量。
我的想法是写一个自动脚本放到服务器上,每天自动签到——但是签到时需要输入验证码,识别流程如下
1. 自己配置 Python 和 PyTorch 环境(最好是 CUDA 版本)
我的是Python 3.8.6 torch 2.0.1
官网pytorch.org/,教程自己百度。
2. 先使用requests模拟登录来获取验证码
# Log in with a POST so the session picks up the authentication cookies.
data = {"email": "*",
        "passwd": "*",
        "remember_me": "week"}  # form payload of the POST request, as a dict
session = requests.session()
log_res = session.post(url=r"/_login.php", data=data)
# Fetch the captcha image, reusing the logged-in session's cookies.
get_code = requests.get(url=r"/captcha.php", cookies=session.cookies)
3. 分割数据并人工标记,保存本地用于之后的模型训练 。
# Download captcha images, split each one into four single-character tiles,
# show each tile and ask a human for its label, then save the labeled tile
# under Img/val/<char>/ for later model training.
size = int(time.time())  # monotonically increasing suffix for file names
for i in range(100):
    get_code = requests.get(url=r"/captcha.php", cookies=session.cookies)
    img = plt.imread(io.BytesIO(get_code.content))
    # Crop the two text regions, stitch them side by side, then split the
    # strip into 4 equal tiles (tune the pixel coordinates to your captcha).
    for a in np.split(np.concatenate([img[10:54, 0:150, :], img[10:54, 165:315, :]], axis=1), 4, axis=1):
        f = plt.gcf()
        plt.axis('off')  # hide the axes
        plt.xticks([])   # hide x-axis ticks
        plt.yticks([])   # hide y-axis ticks
        plt.imshow(a)
        f.show()
        size += 1
        # BUG FIX: raw_input() is Python 2 only and the post states Python
        # 3.8.6 is used — in Python 3 it must be input().  Only the first
        # character typed is kept as the label.
        captcha = input('验证码:')[0]
        # NOTE(review): the directory Img/val/<char>/ must already exist,
        # otherwise savefig raises FileNotFoundError.
        f.savefig(r"Img/val/" + captcha + "/" + captcha + "_" + str(size))
        f.clear()
4. 将用到的函数整合到一个工具类中
训练工具类
from __future__ import print_function, division
import copy
import os
import time
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.backends.cudnn as cudnn
from PIL import Image
from torch import device
from torch.autograd import Variable
from torchvision import transforms
cudnn.benchmark = True  # let cuDNN auto-tune conv algorithms for fixed input sizes
plt.ion()  # interactive mode so figures refresh without blocking
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Not 10 + 26 classes: the target site removed the letters that are easily
# confused with digits, so the effective classes come from labels() below.
number = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u',
            'v', 'w', 'x', 'y', 'z']
def labels():
    """Return the class labels: names of non-empty entries under ``Img``.

    Each sub-directory of ``Img`` represents one captcha character class.
    NOTE(review): ``os.path.getsize`` on a *directory* returns the size of
    the directory entry itself (platform dependent), not of its contents —
    on most Linux filesystems it is non-zero even for an empty directory.
    Confirm this filter really excludes what you expect.
    """
    found = []
    # Renamed locals: the original used `dir` (shadows the builtin) and
    # `labels` (shadows the function's own name).
    for entry in os.listdir(r"Img"):
        if os.path.getsize(os.path.join(r"Img", entry)):
            found.append(entry)
    return found
# Shared preprocessing pipeline: PIL image -> float tensor in [0, 1],
# normalized with the ImageNet mean/std (the stats pretrained ResNets expect).
loader = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
class My_dataset(torch.utils.data.Dataset):
    """Dataset over image paths whose label is encoded in the file name as
    ``<label>-<anything>``; the label index is looked up in ``lableList``.

    NOTE: this class is redefined later in the file — the second definition
    (which takes pre-computed integer labels) is the one in effect at import
    time.
    """

    def __init__(self, imgList, lableList):
        self.lableList = lableList  # ordered list of class names
        self.img_path = imgList     # list of image file paths

    def __len__(self):
        return len(self.img_path)

    def __getitem__(self, idx):
        image = image_loader(self.img_path[idx])
        # BUG FIX: the original called str.split("") which raises
        # ValueError("empty separator") — the path separator was lost in
        # transcription.  The intent is: file name, then the text before
        # the first "-" is the class name.
        name = os.path.basename(self.img_path[idx])
        label = torch.tensor(int(self.lableList.index(name.split("-")[0])))
        return image, label
def image_loader(image_name):
    """Load one image file, normalize it and return it on the module ``device``.

    The file is converted to RGB, run through the module-level ``loader``
    transform, and moved to the module-level ``device`` as float.
    """
    image = Image.open(image_name).convert('RGB')
    image = loader(image)
    # FIX (modernization): torch.autograd.Variable has been a no-op wrapper
    # since torch 0.4 and is deprecated in torch 2.x; a plain tensor already
    # defaults to requires_grad=False, so the return type is unchanged.
    return image.to(device, torch.float)
def imshow(img):
    """Display a CHW image tensor with matplotlib (transposed to HWC first)."""
    as_array = img.numpy()
    plt.imshow(np.transpose(as_array, (1, 2, 0)))
    plt.show()
def update_lr(optimizer, lr):
    """Set the learning rate of every parameter group of *optimizer* to *lr*."""
    for group in optimizer.param_groups:
        group['lr'] = lr
def cut_image(path):
    """Split the image at *path* into four vertical strips and save them.

    The crop boxes are hard-coded for the captcha layout (adjust the x
    coordinates as needed).  Each strip is saved next to the original as
    ``<path minus .jpg><index>.jpg``.  Always returns True.
    """
    source = Image.open(path)
    w, h = source.size
    # Crop rectangles as (left, upper, right, lower); tune to your captcha.
    boxes = [(0, 0, 120, h), (120, 0, 240, h), (240, 0, 360, h), (360, 0, w, h)]
    for index, box in enumerate(boxes):
        piece = source.crop(box)
        # Save each strip; the naming scheme can be changed freely.
        piece.save(path.replace(".jpg", str(index) + '.jpg'))
    return True
def cut_img_by_xy(path1, x_min, x_max, y_min, y_max, path2):
    """Crop one rectangle out of the image at *path1* and save it to *path2*.

    x_min/x_max are the left/right edges and y_min/y_max the top/bottom
    edges of the rectangle, in the source image's pixel coordinates.
    """
    cropped = Image.open(path1).crop((x_min, y_min, x_max, y_max))
    cropped.save(path2)
def imshow(inp, title=None):
    """Imshow for a CHW tensor.  NOTE: this redefines the earlier imshow().

    The tensor is transposed to HWC, clipped to [0, 1] and drawn; *title*,
    when given, becomes the plot title.
    """
    picture = inp.numpy().transpose((1, 2, 0))
    picture = np.clip(picture, 0, 1)
    plt.imshow(picture)
    if title is not None:
        plt.title(title)
    # Brief pause so interactive-mode figures actually refresh.
    plt.pause(0.001)
class My_dataset(torch.utils.data.Dataset):
    """Dataset pairing image paths with pre-computed numeric labels.

    This redefinition replaces the earlier My_dataset: here lableList[idx]
    is already the label value for img_path[idx] (no name parsing).
    """

    def __init__(self, imgList, lableList):
        self.lableList = lableList
        self.img_path = imgList

    def __len__(self):
        return len(self.img_path)

    def __getitem__(self, idx):
        return image_loader(self.img_path[idx]), torch.tensor(self.lableList[idx])
def train_model(model, modelPath, dataLoaders, dataset_sizes, criterion, optimizer, scheduler, num_epochs=25):
    """Train *model*, reporting loss/accuracy on the 'train' and 'val' splits.

    Args:
        model: the network to optimize (moved to ``device`` by the caller).
        modelPath: checkpoint path (kept for interface compatibility; saving
            is done by the caller).
        dataLoaders: dict with 'train' and 'val' DataLoaders.
        dataset_sizes: dict with the sample counts of each split.
        criterion: loss function.
        optimizer: optimizer stepping the model parameters.
        scheduler: LR scheduler, stepped once per epoch after the train phase.
        num_epochs: number of epochs to run.

    Returns:
        The trained model.
    """
    since = time.time()
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)
        for phase in ['train', 'val']:
            # BUG FIX: the original left the model in train mode and gradients
            # enabled during the 'val' phase, so BatchNorm running statistics
            # were updated on validation data and Dropout stayed active,
            # skewing the reported validation metrics.
            if phase == 'train':
                model.train()
            else:
                model.eval()
            running_loss = 0.0
            running_corrects = 0
            # Iterate over data.
            for inputs, _labels in dataLoaders[phase]:
                inputs = inputs.to(device)
                _labels = _labels.to(device)
                optimizer.zero_grad()
                # Track gradients only while training.
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, _labels)
                    # backward + optimize only in the training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                # statistics (loss.item() is the per-sample mean)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == _labels.data)
            # BUG FIX: the scheduler parameter was accepted but never used;
            # step it once per epoch after the training phase.
            if phase == 'train':
                scheduler.step()
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]
            print(f' Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
        print()
    time_elapsed = time.time() - since
    print(f'培训完成于 {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    return model
def visualize_model(model, dataLoaders, class_names, num_images=6):
    """Plot predictions for up to *num_images* validation images.

    The model is put into eval mode while plotting and restored to its
    previous training mode before returning.
    """
    previous_mode = model.training
    model.eval()
    shown = 0
    fig = plt.figure()  # the figure the subplots are drawn into
    with torch.no_grad():
        for inputs, _labels in dataLoaders['val']:
            inputs = inputs.to(device)
            _labels = _labels.to(device)
            _, preds = torch.max(model(inputs), 1)
            for j in range(inputs.size()[0]):
                shown += 1
                axis = plt.subplot(num_images // 2, 2, shown)
                axis.axis('off')
                axis.set_title(f'label: {class_names[preds[j]]}')
                imshow(inputs.cpu().data[j])
                if shown == num_images:
                    model.train(mode=previous_mode)
                    return
    model.train(mode=previous_mode)
获取邮件的工具类,因为它有安全认证——验证码发到邮箱,登录的时候需要携带此验证码
# !/usr/bin/env python
# -*-coding:utf-8 -*-
"""
# File : getemail.py
# Time :2022/8/11 15:43
# version :python 3.6
# Description:
"""
import base64
import calendar
import poplib
# 输入邮件地址, 口令和POP3服务器地址:
import re
import time
from email.header import decode_header
from email.parser import Parser
from email.utils import parseaddr
# Raise poplib's maximum accepted response-line length; some servers send
# lines longer than the default limit and would otherwise raise error_proto.
poplib._MAXLINE = 20480
class Email:
    """Fetch the newest message from a POP3 mailbox, then empty the mailbox.

    Connecting, authenticating, downloading and parsing all happen in
    ``__init__``; afterwards ``self.msg`` is the parsed newest email and
    ``self.msg_content`` its raw decoded text.
    """

    def __init__(self, email, password, pop3_server='pop.163.com'):
        # Connect to the POP3 server and authenticate.
        self.server = poplib.POP3(pop3_server)
        # self.server.set_debuglevel(1)  # uncomment for protocol tracing
        # print(server.getwelcome().decode('utf-8'))  # server banner
        self.server.user(email)
        self.server.pass_(password)
        # list() returns the message numbers, e.g. [b'1 82923', b'2 2184'].
        resp, mails, octets = self.server.list()
        # The newest message has the highest index; indices start at 1.
        index = len(mails)
        if index < 1:
            # Mailbox empty: disconnect and retry from scratch.
            # NOTE(review): this recursive retry has no delay or depth limit —
            # it will spin (and eventually hit the recursion limit) if no
            # mail ever arrives.
            self.server.quit()
            self.__init__(email=email, password=password)
        else:
            resp, lines, octets = self.server.retr(index)
            # BUG FIX: the separator lost its backslashes in transcription —
            # POP3 response lines must be rejoined with CRLF (b'\r\n'), not
            # the literal b'rn', otherwise headers/body run together.
            self.msg_content = b'\r\n'.join(lines).decode('utf-8')
            # Parse the raw text into an email.message.Message.
            self.msg = Parser().parsestr(self.msg_content)
            # Delete every message so the next login only sees fresh mail
            # (after each dele the remaining messages shift to index 1).
            for i in range(index):
                self.server.dele(1)
            self.body = ''
            self.header_dict = {}
            self.server.quit()

    def get_email_time(self):
        """Return the message's Date header formatted as 'YYYY-MM-DD'."""
        # BUG FIX: restored the \s escapes lost in transcription.
        get_date = re.search(r'Date:\s([A-Za-z]{1,3}),\s([0-9]{1,2})\s([A-Za-z]{1,3})\s([0-9]{1,4})\s([0-9]{1,2}):',
                             self.msg_content)
        return '{}-{}-{}'.format(get_date.group(4),
                                 str(list(calendar.month_abbr).index(get_date.group(3))).zfill(2),
                                 str(get_date.group(2)).zfill(2))

    def get_header_info(self, msg, indent=0):
        """Collect the decoded From/To/Subject headers into self.header_dict."""
        self.msg = msg
        if indent == 0:
            for header in ['From', 'To', 'Subject']:
                value = self.msg.get(header, '')
                if value:
                    if header == 'Subject':
                        value = self.decode_str(value)
                    else:
                        hdr, addr = parseaddr(value)
                        name = self.decode_str(hdr)
                        value = u'%s <%s>' % (name, addr)
                    self.header_dict[header] = value
        return self.header_dict

    def get_body_info(self, msg, indent=0):
        """Recursively extract the first text/plain payload into self.body."""
        self.msg = msg
        if not self.msg.is_multipart():
            content_type = self.msg.get_content_type()
            if content_type == 'text/plain':
                content = self.msg.get_payload(decode=True)
                charset = self.guess_charset(self.msg)
                if charset:
                    content = content.decode(charset)
                self.body = content
        else:
            parts = self.msg.get_payload()
            for n, part in enumerate(parts):
                self.get_body_info(part, indent=indent + 1)
        return self.body

    @staticmethod
    def decode_str(s):
        """Decode a MIME-encoded header fragment to str."""
        value, charset = decode_header(s)[0]
        if charset:
            value = value.decode(charset)
        return value

    @staticmethod
    def guess_charset(msg):
        """Best-effort charset: the message's own, or the Content-Type one."""
        charset = msg.get_charset()
        if charset is None:
            content_type = msg.get('Content-Type', '').lower()
            pos = content_type.find('charset=')
            if pos >= 0:
                charset = content_type[pos + 8:].strip()
        return charset
def email_captcha(email, code, pop3_server):
    """Read the newest mail and extract the site's emailed verification code.

    The site emails an 8-character code rendered in a styled <span>; *email*
    and *code* are the mailbox address and its password/authorization code,
    *pop3_server* is the POP3 host.  Returns the code as a string.
    """
    ema = Email(email, code, pop3_server)
    # The HTML body is a base64 part delimited by "base64\r\n\r\n" ... "-".
    # BUG FIX: the \r\n escapes were lost in transcription ("base64rnrn");
    # the restored delimiter is 10 characters long, matching the +10 offset.
    raw = ema.msg_content[ema.msg_content.find("base64\r\n\r\n") + 10:ema.msg_content.rfind("-")]
    # BUG FIX: the cleanup class [^A-z0-9+/=] also kept [ \ ] ^ _ ` (they sit
    # between 'Z' and 'a' in ASCII), which would corrupt the base64 decode.
    html = base64.b64decode(re.sub("[^A-Za-z0-9+/=]", "", raw)).decode('UTF-8')
    d = re.search('color: #c7254e;">.*</span></p>', html).group()[17:25]
    print("验证码:" + d)
    return d
5. 训练模型,直到在训练数据和测试数据上都达到良好的准确率
我认为resnet18就够了(图片小,显卡垃圾),你可以上50甚至更高的。
optimizer_ft、exp_lr_scheduler的最后一个参数可以适当调整,在刚训练的时候大一点,到准确率80%以上可以调小一点。
我最后是以训练数据99%,预测测试数据95%结束训练,
from __future__ import print_function, division
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import datasets, transforms
from MyCaptcha import Utils
# Preprocessing: resize to the ResNet input size, force 3 channels, and
# normalize with the ImageNet statistics (same stats as in Utils.loader).
data_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
# Expects the folder layout Img/train/<label>/... and Img/val/<label>/...
data_dir = 'Img'
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms)
                  for x in ['train', 'val']}
dataLoaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=8,
                                              shuffle=True, num_workers=0)
               for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
modelPath = "model_18.pth"
# model_ft = models.resnet18(num_classes=len(class_names))  # use this on the first run
# NOTE(review): torch.load of a whole pickled model executes code embedded in
# the file — only load checkpoints you produced yourself.
model_ft = torch.load(modelPath)
# for name, para in model_ft.named_parameters():
#     # freeze everything except the final fully connected layer
#     if "fc" not in name:
#         para.requires_grad_(False)
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=20, gamma=0.01)
model_ft = model_ft.to(Utils.device)
model_ft = Utils.train_model(model_ft, modelPath, dataLoaders, dataset_sizes, criterion, optimizer_ft, exp_lr_scheduler,
                             num_epochs=100)
model_ft.eval()
torch.save(model_ft, modelPath)
Utils.visualize_model(model_ft, dataLoaders, class_names)
6. 在模拟登录的基础上,对验证码进行识别并提交请求
import datetime
import io
import os
import re
import shutil
import time
import matplotlib.pyplot as plt
import numpy as np
import requests
import torch
from torchvision import transforms, datasets
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
import MyEmail
# Working directory prefix for the cache/ and train/ folders.
basePath = ""
# Base URL of the target site (redacted in the post).
tly_Http = "https://???"
# NOTE(review): "w+" truncates the previous run's log; "a" would preserve it.
logs = open("logs.txt", "w+", encoding='utf-8')
# Local HTTP proxy the requests are routed through.
proxies = {
    'https': 'http://127.0.0.1:7890'
}
# Same normalization for both folders: 'cache' holds the tiles of the captcha
# currently being solved, 'train' is only read to recover the label names.
data_transforms = {
    'train': transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224)),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'cache': transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224)),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}
# The deployment server has no CUDA device, so force CPU inference.
device = torch.device("cpu")
image_datasets = {x: datasets.ImageFolder(os.path.join(basePath, x),
                                          data_transforms[x])
                  for x in ['train']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=2,
                                              shuffle=False, num_workers=0)
               for x in ['train']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train']}
class_names = image_datasets['train'].classes
# Everything above is only needed for the number and names of the labels.
model_ft = torch.load(os.path.join(basePath, "test"), map_location=torch.device('cpu'))
def visualize_model(model, num_images=4):
    """Classify the captcha tiles in cache/ and return the predicted string.

    Loads the tiles from the 'cache' ImageFolder, runs *model* on them and
    concatenates the predicted class names (one character per tile), stopping
    after *num_images* tiles.  The model's previous training mode is restored
    before returning.
    """
    was_training = model.training
    model.eval()
    images_so_far = 0
    _label = ""
    _datasets = {x: datasets.ImageFolder(os.path.join(basePath, x),
                                         data_transforms[x])
                 for x in ['cache']}
    _dataloaders = {x: torch.utils.data.DataLoader(_datasets[x], batch_size=2,
                                                   shuffle=False, num_workers=0)
                    for x in ['cache']}
    with torch.no_grad():
        for i, (inputs, labels) in enumerate(_dataloaders['cache']):
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            for j in range(inputs.size()[0]):
                _label = _label + class_names[preds[j]]
                # BUG FIX: the counter was never incremented, so the
                # num_images cap could never trigger — and had it triggered,
                # the early return dropped the accumulated label (returned
                # None).  Count the tiles and return the label in both paths.
                images_so_far += 1
                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return _label
    model.train(mode=was_training)
    return _label
class MyData:
    """One site account: logs in on construction, run() does the daily check-in.

    Constructing an instance immediately POSTs the login; when the site asks
    for the email second factor, the code is pulled from the mailbox via
    MyEmail.email_captcha.
    """

    email = ""
    passwd = ""
    code = ""      # mailbox password / POP3 authorization code
    session = None
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/110.0',
               "Content-Type": "application/json"}

    def __init__(self, email, passwd, code):
        self.email = email
        self.passwd = passwd
        self.code = code
        self.session = requests.session()
        # BUG FIX: the "\n"/"\t" escapes in the log strings were lost in
        # transcription (written as literal "n"/"nt"); restored throughout.
        logs.write("\n开始登录")
        _log_res = self.session.post(url=tly_Http + r"/_login.php", data=self._login_data(), proxies=proxies)
        if "/login2.php" in _log_res.text:
            time.sleep(3)
            logs.write("开始邮箱登录")
            log2_res = self.session.post(url=tly_Http + r"/login2.php", proxies=proxies, verify=False)
            # Give the site time to deliver the email with the second factor.
            time.sleep(10)
            log_res = self.session.post(url=tly_Http + r"/_login.php?two=1", data=self.login_data(), proxies=proxies,
                                        verify=False)

    def _login_data(self):
        """Form data for the plain password login."""
        return {"email": self.email,
                "passwd": self.passwd,
                "remember_me": "week"}

    def login_data(self):
        """Form data for the second-factor login: password + emailed code."""
        return {"email": self.email,
                "passwd": self.passwd,
                "remember_me": "week",
                "emailcode3": MyEmail.email_captcha(email=self.email, code=self.code,
                                                    pop3_server='pop.163.com')}

    def run(self):
        """Attempt the captcha check-in, retrying up to 10 times on failure.

        On success the recognized tiles are moved into train/<char>/ so the
        model's training set grows with confirmed-correct samples.
        """
        success = False
        for attempt in range(10):
            try:
                if not success:  # FIX: was the convoluted bool(1 - success)
                    logs.write("开始获取验证码")
                    get_code = requests.get(url=tly_Http + r"/other/captcha.php", cookies=self.session.cookies,
                                            proxies=proxies, verify=False)
                    img = plt.imread(io.BytesIO(get_code.content))
                    # Drop the tiles left over from the previous attempt.
                    for _imgPath in os.listdir(os.path.join("cache", "test")):
                        os.remove(os.path.join(os.path.join("cache", "test"), _imgPath))
                    img_index = int(time.time())
                    # Crop the two text regions, stitch and split into 4 tiles.
                    for a in np.split(np.concatenate([img[10:54, 0:150, :], img[10:54, 165:315, :]], axis=1), 4,
                                      axis=1):
                        f = plt.gcf()
                        plt.axis('off')  # hide the axes
                        plt.xticks([])   # hide x-axis ticks
                        plt.yticks([])   # hide y-axis ticks
                        plt.imshow(a)
                        f.savefig(os.path.join(os.path.join(os.path.join(basePath, "cache"), "test"), str(img_index)))
                        img_index += 1
                        f.clear()
                    label = visualize_model(model_ft)
                    get_code = requests.get(url=tly_Http + r"/modules/_checkin.php?captcha=" + label,
                                            cookies=self.session.cookies, proxies=proxies, verify=False)
                    result = re.search("'.*'", get_code.text).group()
                    logs.write(
                        "\n" + self.email + "\n\t" + get_code.text + "-------------------------------"
                        + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
                    if re.match(r"'获得了.*MB流量!'", result):
                        # BUG FIX: the original froze the index at 0, filing
                        # every tile under the first character's folder; pair
                        # each tile (sorted = saved order, sequential names)
                        # with its own character of the recognized label.
                        tiles = sorted(os.listdir(os.path.join("cache", "test")))
                        for idx, _imgPath in enumerate(tiles):
                            shutil.move(os.path.join(os.path.join("cache", "test"), _imgPath),
                                        os.path.join(os.path.join(r"train", label[idx]), _imgPath))
                        # BUG FIX: the response text was quoted inside the string.
                        logs.write("数据已保存," + get_code.text)
                        success = True
                    else:
                        logs.write("即将重试")
                logs.write("脚本已完成")
            except Exception:
                # BUG FIX: a bare except also swallowed SystemExit and
                # KeyboardInterrupt; catch Exception and keep retrying.
                logs.write("脚本出错")
if __name__ == '__main__':
    # One MyData per account; note that constructing it performs the login
    # immediately (see MyData.__init__).
    dataMap = [
        MyData("username1", "possword", "???"),
        MyData("username2", "possword", "???")
    ]
    # Check in once per day, forever.
    while True:
        for data in dataMap:
            try:
                data.run()
            # BUG FIX: requests' ConnectionError does not inherit from the
            # builtin ConnectionError, so the original clause never matched
            # network failures from requests; catch the requests hierarchy.
            except requests.exceptions.RequestException:
                pass
        # NOTE(review): the sessions created at startup are reused across
        # days — their cookies may expire; consider rebuilding dataMap each
        # iteration.
        time.sleep(24 * 60 * 60 + 10)