当前位置:首页> AI教程> 使用Python和PyTorch实现自动验证码识别工具

使用Python和PyTorch实现自动验证码识别工具

释放双眼,带上耳机,听听看~!
学习如何使用Python和PyTorch实现自动验证码识别工具,包括模拟登录获取验证码,人工标记和保存数据用于模型训练,以及整合函数到工具类中。

我有一个网站,免费的流量是每天签到会给100~500MB流量。
我的想法是写一个自动脚本放到服务器上,每天自动签到——但是它签到的时候会有一个验证码识别如下

使用Python和PyTorch实现自动验证码识别工具

1. 自己配置蟒蛇和PyTroch环境(最好是cuda版本)

我的是Python 3.8.6 torch 2.0.1
官网pytorch.org/,教程自己百度。

2. 先使用requests模拟登录来获取验证码


data = {"email": "*",
        "passwd": "*",
        "remember_me": "week"}  # Post请求发送的数据,字典格式
session = requests.session()
log_res = session.post(url=r"/_login.php", data=data)  
#获取验证码
get_code = requests.get(url=r"/captcha.php", cookies=session.cookies)

3. 分割数据并人工标记,保存本地用于之后的模型训练 。

size = int(time.time())
for i in range(100):
    get_code = requests.get(url=r"/captcha.php", cookies=session.cookies)
    img = plt.imread(io.BytesIO(get_code.content))
    #按照图片的格式去自定义分割线的位置
    for a in np.split(np.concatenate([img[10:54, 0:150, :], img[10:54, 165:315, :]], axis=1), 4, axis=1):
        f = plt.gcf()
        plt.axis('off')  # 去坐标轴
        plt.xticks([])  # 去 x 轴刻度
        plt.yticks([])  # 去 y 轴刻度
        plt.imshow(a)
        f.show()
        size += 1
        captcha = raw_input('验证码:')[0]
        f.savefig(r"Img/val/" + captcha + "/"+captcha+"_" + str(size))
        f.clear()

使用Python和PyTorch实现自动验证码识别工具

使用Python和PyTorch实现自动验证码识别工具

4. 将用到的函数整合到一个工具类中

训练工具类

from __future__ import print_function, division

import copy
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.backends.cudnn as cudnn
from PIL import Image
from torch import device
from torch.autograd import Variable
from torchvision import transforms

cudnn.benchmark = True
plt.ion()  # interactive mode

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#这里不是10+26个,因为那个网站已经将数字字母混淆的字母去掉了
number = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u',
            'v', 'w', 'x', 'y', 'z']

def labels():
    labels = []
    for dir in os.listdir(r"Img"):
        if os.path.getsize(os.path.join(r"Img", dir)):
            labels.append(dir)
    return labels


loader = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


class My_dataset(torch.utils.data.Dataset):
    # 每个图片的加载器
    def __init__(self, imgList, lableList):
        self.lableList = lableList
        self.img_path = imgList

    def __len__(self):
        return len(self.img_path)

    def __getitem__(self, idx):
        image = image_loader(self.img_path[idx])
        label = torch.tensor(int(self.lableList.index(self.img_path[idx].split("")[-1].split("-")[0])))
        return image, label


def image_loader(image_name):
    image = Image.open(image_name).convert('RGB')
    image = loader(image)
    return Variable(image.to(device, torch.float), requires_grad=False)


def imshow(img):
    # print(img)
    # img = img / 2 + 0.5  # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


def update_lr(optimizer, lr):
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr


"""
通过四个坐标点在任意位置切割图片,主要用于将大图片分割成多个小图片
img_path:需要切割图片的路径
"""


def cut_image(path):
    img = Image.open(path)
    w, h = img.size
    # 坐标点可以根据自己的需要进行调整
    cut = [(0, 0, 120, h), (120, 0, 240, h), (240, 0, 360, h), (360, 0, w, h)]
    for i, n in enumerate(cut, 1):
        temp = img.crop(n)
        # 分别保存多个小图片,路径可以根据自己的需要设计
        temp.save(path.replace(".jpg", str(i - 1) + '.jpg'))
    return True


"""
通过坐标xy的最大最小值对图片进行整体切割
path1:需要切割图片的路径
path2:切割后保存图片的位置
x_min:切割矩形左边x值对应原图的x坐标
x_max:切割矩形右边x值对应原图的x坐标
y_min:切割矩形上边y值对应原图的y坐标
y_max:切割矩形下边y值对应原图的y坐标
"""


def cut_img_by_xy(path1, x_min, x_max, y_min, y_max, path2):
    img = Image.open(path1)

    crop = img.crop((x_min, y_min, x_max, y_max))

    crop.save(path2)


def imshow(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    # inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated


class My_dataset(torch.utils.data.Dataset):
    # 每个图片的加载器
    def __init__(self, imgList, lableList):
        self.lableList = lableList
        self.img_path = imgList

    def __len__(self):
        return len(self.img_path)

    def __getitem__(self, idx):
        image = image_loader(self.img_path[idx])
        label = torch.tensor(self.lableList[idx])
        return image, label


def train_model(model,modelPath, dataLoaders, dataset_sizes, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()
    model.train()  # Set model to training mode
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)
        for phase in ['train', 'val']:
            # if phase == 'train':
                # model = torch.load(modelPath)
                # model.train()  # Set model to training mode
            # else:
                # torch.save(model, modelPath)
                # model.eval()  # Set model to evaluate mode
            running_loss = 0.0
            running_corrects = 0
            # Iterate over data.
            for inputs, _labels in dataLoaders[phase]:
                inputs = inputs.to(device)
                _labels = _labels.to(device)
                optimizer.zero_grad()

                with torch.set_grad_enabled(mode=True):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, _labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == _labels.data)

                # if phase == 'train':
                #     scheduler.step()
                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f' Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

    print()

    time_elapsed = time.time() - since
    print(f'培训完成于 {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    return model


def visualize_model(model, dataLoaders, class_names, num_images=6):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()

    with torch.no_grad():
        for i, (inputs, _labels) in enumerate(dataLoaders['val']):
            inputs = inputs.to(device)
            _labels = _labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images // 2, 2, images_so_far)
                ax.axis('off')
                ax.set_title(f'label: {class_names[preds[j]]}')
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)

获取邮件的工具类,因为它有安全认证——验证码发到邮箱,登录的时候需要携带此验证码

# !/usr/bin/env python
# -*-coding:utf-8 -*-

"""
# File       : getemail.py
# Time       :2022/8/11 15:43
# version    :python 3.6
# Description:
"""
import base64
import calendar
import poplib
# 输入邮件地址, 口令和POP3服务器地址:
import re
import time
from email.header import decode_header
from email.parser import Parser
from email.utils import parseaddr

poplib._MAXLINE = 20480


class Email:
    def __init__(self, email, password, pop3_server='pop.163.com'):
        # 连接到POP3服务器:
        self.server = poplib.POP3(pop3_server)
        # 可以打开或关闭调试信息:
        # self.server.set_debuglevel(1)
        # 可选:打印POP3服务器的欢迎文字:
        # print(server.getwelcome().decode('utf-8'))

        # 身份认证:
        self.server.user(email)
        self.server.pass_(password)

        # stat()返回邮件数量和占用空间:
        # print('Messages: %s. Size: %s' % server.stat())
        # list()返回所有邮件的编号:
        resp, mails, octets = self.server.list()
        # 可以查看返回的列表类似[b'1 82923', b'2 2184', ...]
        # print(mails)
        # 获取最新一封邮件, 注意索引号从1开始:
        index = len(mails)
        if index < 1:
            self.server.quit()
            self.__init__(email=email, password=password)
        else:
            resp, lines, octets = self.server.retr(index)
            # lines存储了邮件的原始文本的每一行,
            # 可以获得整个邮件的原始文本:
            self.msg_content = b'rn'.join(lines).decode('utf-8')
            # print(get_date)
            # 稍后解析出邮件:
            self.msg = Parser().parsestr(self.msg_content)
            # 关闭连接:
            for i in range(index):
                self.server.dele(1)
            self.body = ''
            self.header_dict = {}
            self.server.quit()



    def get_email_time(self):
        get_date = re.search(r'Date:s([A-Za-z]{1,3}),s([0-9]{1,2})s([A-Za-z]{1,3})s([0-9]{1,4})s([0-9]{1,2}):',
                             self.msg_content)
        return '{}-{}-{}'.format(get_date.group(4),
                                 str(list(calendar.month_abbr).index(get_date.group(3))).zfill(2),
                                 str(get_date.group(2)).zfill(2))

    def get_header_info(self, msg, indent=0):
        self.msg = msg
        if indent == 0:
            for header in ['From', 'To', 'Subject']:
                value = self.msg.get(header, '')
                if value:
                    if header == 'Subject':
                        value = self.decode_str(value)
                    else:
                        hdr, addr = parseaddr(value)
                        name = self.decode_str(hdr)
                        value = u'%s <%s>' % (name, addr)
                self.header_dict[header] = value
                # print('%s%s: %s' % ('  ' * indent, header, value))
            return self.header_dict

    def get_body_info(self, msg, indent=0):
        self.msg = msg
        if not self.msg.is_multipart():
            content_type = self.msg.get_content_type()
            if content_type == 'text/plain':
                content = self.msg.get_payload(decode=True)
                charset = self.guess_charset(self.msg)
                if charset:
                    content = content.decode(charset)
                self.body = content
        else:
            parts = self.msg.get_payload()
            for n, part in enumerate(parts):
                self.get_body_info(part, indent=indent + 1)
        return self.body

    @staticmethod
    def decode_str(s):
        value, charset = decode_header(s)[0]
        if charset:
            value = value.decode(charset)
        return value

    @staticmethod
    def guess_charset(msg):
        charset = msg.get_charset()
        if charset is None:
            content_type = msg.get('Content-Type', '').lower()
            pos = content_type.find('charset=')
            if pos >= 0:
                charset = content_type[pos + 8:].strip()
        return charset


#根据返回的格式,获取到验证码
def email_captcha(email, code, pop3_server):
    ema = Email(email, code, pop3_server)
    d = re.search('color: #c7254e;">.*</span></p>',
                  base64.b64decode(re.sub("[^A-z0-9+/=]", "", ema.msg_content[ema.msg_content.find(
                      "base64rnrn") + 10:ema.msg_content.rfind("-")])).decode(
                      'UTF-8')).group()[17:25]
    print("验证码:"+d)
    return d

5. 训练模型 等到有一个良好的正常率(训练数据/测试数据)

我认为resnet18就够了(图片小,显卡垃圾),你可以上50甚至更高的。

optimizer_ft、exp_lr_scheduler的最后一个个参数可以适当调整,在刚训练的时候大一点,到准确率80%以上可以调小一点。

我最后是以训练数据99%,预测测试数据95%结束训练,

from __future__ import print_function, division
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import datasets, transforms

from MyCaptcha import Utils
#标准化
data_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
data_dir = 'Img'
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms)
                  for x in ['train', 'val']}
dataLoaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=8,
                                              shuffle=True, num_workers=0)
               for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
modelPath="model_18.pth"
# model_ft = models.resnet18(num_classes=len(class_names)) #第一次用这个
model_ft=torch.load(modelPath)
# for name, para in model_ft.named_parameters():
#     # 除最后的全连接层外,其他权重全部冻结
#     if "fc" not in name:
#         para.requires_grad_(False)
criterion = nn.CrossEntropyLoss()

optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=20, gamma=0.01)
model_ft = model_ft.to(Utils.device)
model_ft = Utils.train_model(model_ft,modelPath ,dataLoaders, dataset_sizes, criterion, optimizer_ft, exp_lr_scheduler,
                             num_epochs=100)
model_ft.eval()
torch.save(model_ft,modelPath)
Utils.visualize_model(model_ft,dataLoaders,class_names)

6. 在模拟登录的基础上,对验证码进行识别并提交请求

import datetime
import io
import os
import re
import shutil
import time

import matplotlib.pyplot as plt
import numpy as np
import requests
import torch
from torchvision import transforms, datasets

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

import MyEmail

basePath = ""
tly_Http = "https://???"

logs = open("logs.txt", "w+", encoding='utf-8')
#本地的代理
proxies = {
    'https': 'http://127.0.0.1:7890'
}
data_transforms = {
    'train': transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224)),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'cache': transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224)),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}
#这里是因为服务器上没有cuda
device = torch.device("cpu")
image_datasets = {x: datasets.ImageFolder(os.path.join(basePath, x),
                                          data_transforms[x])
                  for x in ['train']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=2,
                                              shuffle=False, num_workers=0)
               for x in ['train']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train']}
class_names = image_datasets['train'].classes
#以上仅仅是读取标签的个数和名称
model_ft = torch.load(os.path.join(basePath, "test"), map_location=torch.device('cpu'))


def visualize_model(model, num_images=4):
    was_training = model.training
    model.eval()
    images_so_far = 0
    _label = ""
    _datasets = {x: datasets.ImageFolder(os.path.join(basePath, x),
                                         data_transforms[x])
                 for x in ['cache']}
    _dataloaders = {x: torch.utils.data.DataLoader(_datasets[x], batch_size=2,
                                                   shuffle=False, num_workers=0)
                    for x in ['cache']}
    with torch.no_grad():
        for i, (inputs, labels) in enumerate(_dataloaders['cache']):
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            for j in range(inputs.size()[0]):
                _label = _label + class_names[preds[j]]

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)
    return _label


class MyData:
    email = ""
    passwd = ""
    code = ""
    session = None
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/110.0',
               "Content-Type": "application/json"}

    def __init__(self, email, passwd, code):
        self.email = email
        self.passwd = passwd
        self.code = code
        self.session = requests.session()
        logs.write("n开始登录")
        _log_res = self.session.post(url=tly_Http + r"/_login.php", data=self._login_data(), proxies=proxies
                                     )
        if _log_res.text.__contains__("/login2.php"):
            time.sleep(3)
            logs.write("开始邮箱登录")
            log2_res = self.session.post(url=tly_Http + r"/login2.php", proxies=proxies, verify=False)
            time.sleep(10)
            log_res = self.session.post(url=tly_Http + r"/_login.php?two=1", data=self.login_data(), proxies=proxies,
                                        verify=False)

    def _login_data(self):
        return {"email": self.email,
                "passwd": self.passwd,
                "remember_me": "week"}

    def login_data(self):
        return {"email": self.email,
                "passwd": self.passwd,
                "remember_me": "week",
                "emailcode3": MyEmail.email_captcha(email=self.email, code=self.code,
                                                    pop3_server='pop.163.com')}

    def run(self):
        success = False
        for i in range(10):
            try:
                if bool(1 - success):
                    logs.write("开始获取验证码")
                    get_code = requests.get(url=tly_Http + r"/other/captcha.php", cookies=self.session.cookies,
                                            proxies=proxies, verify=False)
                    img = plt.imread(io.BytesIO(get_code.content))
                    for _imgPath in os.listdir(os.path.join("cache", "test")):
                        os.remove(os.path.join(os.path.join("cache", "test"), _imgPath))
                    img_index = int(time.time())
                    for a in np.split(np.concatenate([img[10:54, 0:150, :], img[10:54, 165:315, :]], axis=1), 4,
                                      axis=1):
                        f = plt.gcf()
                        plt.axis('off')  # 去坐标轴
                        plt.xticks([])  # 去 x 轴刻度
                        plt.yticks([])  # 去 y 轴刻度
                        plt.imshow(a)
                        f.savefig(os.path.join(os.path.join(os.path.join(basePath, "cache"), "test"), str(img_index)))
                        img_index += 1
                        f.clear()
                    label = visualize_model(model_ft)
                    get_code = requests.get(url=tly_Http + r"/modules/_checkin.php?captcha=" + label,
                                            cookies=self.session.cookies, proxies=proxies, verify=False)
                    result = re.search("'.*'", get_code.text).group()
                    logs.write(
                        "n" + self.email + "nt" + get_code.text + "-------------------------------"
                        + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
                    i = 0
                    if re.match(r"'获得了.*MB流量!'", result):
                        for _imgPath in os.listdir(os.path.join("cache", "test")):
                            shutil.move(os.path.join(os.path.join("cache", "test"), _imgPath),
                                        os.path.join(os.path.join(r"train", label[i]), _imgPath))
                        logs.write("数据已保存,get_code.text")
                        success = True
                    else:
                        logs.write("即将重试")

                    logs.write("脚本已完成")
            except:
                logs.write("脚本出错")



if __name__ == '__main__':
    dataMap = [
        MyData("username1", "possword", "???"),
        MyData("username2", "possword", "???")
    ]

    while True:
        for data in dataMap:
            try:
                data.run()
            except ConnectionError as e:
                pass
        time.sleep(24 * 60 * 60 + 10)
本网站的内容主要来自互联网上的各种资源,仅供参考和信息分享之用,不代表本网站拥有相关版权或知识产权。如您认为内容侵犯您的权益,请联系我们,我们将尽快采取行动,包括删除或更正。
AI教程

如何更自然地与AI交流:优化沟通效果的方法和技巧

2023-12-19 15:37:00

AI教程

PyTorch安装心得:解决显卡和CUDA兼容性问题

2023-12-19 15:39:00

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索