106 lines
3.9 KiB
Python
106 lines
3.9 KiB
Python
from requests_html import HTMLSession, HTML
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
import re
|
|
import time
|
|
import random
|
|
from datetime import date
|
|
from retrying import retry
|
|
|
|
from model.pls import PLS
|
|
from model.plw import PLW
|
|
from model.sd import SD
|
|
from model.klb import KLB
|
|
|
|
# Module-level SQLAlchemy engine and session, shared by the functions in this file.
# NOTE(review): the credentials are hard-coded in the connection URL — consider
# loading them from configuration or the environment instead.
engine = create_engine("mysql+pymysql://root:123456@localhost/lottery?charset=utf8")
DbSession = sessionmaker(bind=engine)
db = DbSession()

# Single HTTP session reused for every page fetch.
session = HTMLSession()

# Patterns for the two draw-date notations this module recognises:
#   pat1: <year>年<month>月<day>日
#   pat2: <year>-<month>-<day> followed by whitespace
# Raw strings keep "\d" and "\s" as regex escapes instead of invalid string escapes.
pat1 = re.compile(r'开奖日期:(\d+)年(\d+)月(\d+)日.*')
pat2 = re.compile(r'开奖日期:(\d+)-(\d+)-(\d+)\s.*')
|
|
|
|
|
|
def _parse_draw_date(open_date):
    """Return the draw date found in ``open_date``, or ``None`` if neither pattern matches."""
    match = pat1.match(open_date) or pat2.match(open_date)
    if match is None:
        return None
    year, month, day = (int(part) for part in match.groups())
    return date(year, month, day)


def _fill_three_digit_stats(record, draw_code):
    """Fill the digit-derived columns of a three-digit (PLS / SD) record from ``draw_code``."""
    digits = [int(ch) for ch in draw_code]

    # Positional digits are stored as the original characters.
    record.hundred = draw_code[0]
    record.ten = draw_code[1]
    record.one = draw_code[2]

    # Small means < 5, big means >= 5; single/double are odd/even counts.
    record.code_small = sum(1 for d in digits if d < 5)
    record.code_big = sum(1 for d in digits if d >= 5)
    record.code_single = sum(1 for d in digits if d % 2 == 1)
    record.code_double = sum(1 for d in digits if d % 2 == 0)

    record.sum_num = sum(digits)
    hundred, ten, one = digits[0], digits[1], digits[2]
    record.sum_hundred_one = hundred + one
    record.sum_hundred_ten = hundred + ten
    record.sum_ten_one = ten + one

    # Two distinct digits -> 3, three distinct digits -> 6, anything else -> 1.
    record.group_type = {2: 3, 3: 6}.get(len(set(draw_code)), 1)


@retry(stop_max_attempt_number=3)
def _get_data(url, lottery_type):
    """Scrape one draw-result page and insert it into the database if its issue is new.

    The function is wrapped by ``retry`` with ``stop_max_attempt_number=3``.

    Args:
        url: Address of the draw-result page to fetch.
        lottery_type: Case-insensitive lottery code. ``'pls'``, ``'sd'`` and
            ``'plw'`` select the PLS, SD and PLW models; any other value
            selects KLB.

    Raises:
        ValueError: If the draw date on the page matches neither known format.
        Exception: Anything raised while fetching, parsing or committing;
            a failed commit is rolled back before the error propagates.
    """
    models = {'pls': PLS, 'sd': SD, 'plw': PLW}
    Model = models.get(lottery_type.lower(), KLB)

    # Scrape the page.
    r = session.get(url)
    table_list = r.html.find("table.kj_tablelist02", first=True)
    issue = table_list.find("td.td_title01 span.span_left strong")[0].text
    open_date = table_list.find("td.td_title01 span.span_right")[0].text
    draw_code = table_list.find("div.ball_box01")[0].text.replace('\n', '')

    # Skip issues that are already stored.
    if db.query(Model).filter_by(draw_issue=issue).first() is not None:
        print(f'issue:{issue}已经存在')
        return

    draw_date = _parse_draw_date(open_date)
    if draw_date is None:
        raise ValueError(f"issue:{issue}数据写入失败。。。")

    record = Model()
    record.draw_issue = issue
    record.draw_code = draw_code
    record.draw_date = draw_date

    # Only the three-digit games (PLS and SD) carry the per-digit statistics.
    if isinstance(record, (PLS, SD)):
        _fill_three_digit_stats(record, draw_code)

    try:
        db.add(record)
        db.commit()
    except Exception:
        # The session is shared module-wide; without a rollback a failed
        # commit would poison every later attempt that uses it.
        db.rollback()
        raise
    print(f"issue:{issue}数据写入完成。。。")
|
|
|
|
def get_data(url, lottery_type):
    """Run ``_get_data`` and report any error it still raises, then re-raise it.

    Args:
        url: Address of the draw-result page to fetch.
        lottery_type: Lottery code forwarded unchanged to ``_get_data``.

    Raises:
        Exception: Whatever ``_get_data`` raised, after it has been printed.
    """
    try:
        _get_data(url, lottery_type)
    except Exception as e:
        print(e)
        print('异常出错重试后,依然报错')
        # Bare ``raise`` re-raises the active exception without adding an
        # extra traceback frame for this line.
        raise
|
|
|
|
def main(basic_url, lottery_type):
    """Scrape every draw page linked from ``basic_url`` and store new results.

    Args:
        basic_url: Page whose issue selector lists the links to crawl.
        lottery_type: Lottery code forwarded to ``get_data`` for each link.
    """
    r = session.get(basic_url)
    links = r.html.find('div.kjxq_box02_title_right span div a')

    # Links are visited in reverse page order.
    # NOTE(review): presumably the page lists newest first, so this crawls
    # oldest to newest — confirm against the live page.
    for link in reversed(links):
        # Each match is already the <a> element, so its attributes can be
        # read directly without re-parsing its HTML.
        url = link.attrs.get('href')
        if not url:
            # An anchor without a target cannot be fetched; skip it instead
            # of aborting the whole crawl.
            continue
        try:
            get_data(url, lottery_type)
        except Exception as e:
            # Best-effort crawl: report the failure and move on to the next link.
            print(e)
|
|
|
|
if __name__ == '__main__':
    # Script entry point: crawl the PLW listing.
    # Equivalent listing pages for the other games:
    #   https://kaijiang.500.com/shtml/pls/04001.shtml
    #   https://kaijiang.500.com/shtml/sd/04001.shtml
    basic_url = "https://kaijiang.500.com/shtml/plw/04001.shtml"
    main(basic_url, lottery_type='plw')