py_scripts/get_lottery_data.py

106 lines
3.9 KiB
Python

from requests_html import HTMLSession, HTML
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import re
import time
import random
from datetime import date
from retrying import retry
from model.pls import PLS
from model.plw import PLW
from model.sd import SD
from model.klb import KLB
# Database engine for the local MySQL "lottery" schema (pymysql driver, utf8).
# NOTE(review): the connection URL, including the credentials, is hard-coded
# here — consider loading it from configuration or the environment.
engine = create_engine("mysql+pymysql://root:123456@localhost/lottery?charset=utf8")
# Session factory bound to the engine above.
DbSession = sessionmaker(bind=engine)
# Module-level database session created once at import time.
db = DbSession()
# Module-level HTTP session reused for every page fetch.
session = HTMLSession()
# Draw-date caption written as "<year>年<month>月<day>日"; the three groups
# capture year, month and day.  Raw strings keep "\d" from being treated as an
# (invalid) string escape.
pat1 = re.compile(r'开奖日期:(\d+)年(\d+)月(\d+)日.*')
# Draw-date caption written as "<year>-<month>-<day>" followed by whitespace;
# the three groups capture year, month and day.
pat2 = re.compile(r'开奖日期:(\d+)-(\d+)-(\d+)\s.*')
def _parse_draw_date(open_date):
    """Return the draw date found in *open_date*, or ``None`` if unparseable.

    The "年/月/日" pattern (``pat1``) is tried first, then the dashed pattern
    (``pat2``).
    """
    matched = pat1.match(open_date) or pat2.match(open_date)
    if matched is None:
        return None
    year, month, day = (int(part) for part in matched.groups())
    return date(year, month, day)


def _fill_three_digit_stats(record, draw_code):
    """Set the per-digit statistics of a three-digit draw on *record*.

    ``draw_code[0]``, ``draw_code[1]`` and ``draw_code[2]`` are stored as the
    hundreds, tens and ones digits respectively.
    """
    digits = [int(ch) for ch in draw_code]
    hundred, ten, one = digits[0], digits[1], digits[2]

    record.hundred = draw_code[0]
    record.ten = draw_code[1]
    record.one = draw_code[2]
    # Counts of digits below 5 / at least 5, and of odd / even digits.
    record.code_small = sum(1 for d in digits if d < 5)
    record.code_big = sum(1 for d in digits if d >= 5)
    record.code_single = sum(1 for d in digits if d % 2 == 1)
    record.code_double = sum(1 for d in digits if d % 2 == 0)
    record.sum_num = sum(digits)
    # Pairwise sums; each one uses exactly the two positions named in the
    # attribute (hundred = index 0, ten = index 1, one = index 2).
    record.sum_hundred_one = hundred + one
    record.sum_hundred_ten = hundred + ten
    record.sum_ten_one = ten + one
    # Two distinct digits -> 3, three distinct digits -> 6, otherwise 1.
    record.group_type = {2: 3, 3: 6}.get(len(set(draw_code)), 1)


@retry(stop_max_attempt_number=3)
def _get_data(url, lottery_type):
    """Scrape one draw page and insert its result if the issue is not stored.

    The whole function is attempted up to three times by the ``retry``
    decorator before the last exception is propagated.

    Args:
        url: Address of the draw-detail page to fetch.
        lottery_type: Case-insensitive lottery code. ``'pls'``, ``'sd'`` and
            ``'plw'`` select ``PLS``, ``SD`` and ``PLW``; any other value
            falls back to ``KLB``.

    Raises:
        ValueError: If the result table is missing from the page.
        Exception: If the draw date cannot be parsed, or whatever the HTTP or
            database layer raises.
    """
    models = {'pls': PLS, 'sd': SD, 'plw': PLW}
    Model = models.get(lottery_type.lower(), KLB)

    # Scrape the page.
    r = session.get(url)
    table_list = r.html.find("table.kj_tablelist02", first=True)
    if table_list is None:
        raise ValueError(f"url:{url} result table not found")
    issue = table_list.find("td.td_title01 span.span_left strong")[0].text
    open_date = table_list.find("td.td_title01 span.span_right")[0].text
    draw_code = table_list.find("div.ball_box01")[0].text.replace('\n', '')

    # Look for an existing database record of this issue.
    if db.query(Model).filter_by(draw_issue=issue).first() is not None:
        print(f'issue:{issue}已经存在')
        return

    draw_date = _parse_draw_date(open_date)
    if draw_date is None:
        raise Exception(f"issue:{issue}数据写入失败。。。")

    record = Model()
    record.draw_issue = issue
    record.draw_code = draw_code
    record.draw_date = draw_date
    # PLS (排列3) and SD (3D) additionally store per-digit statistics.
    if isinstance(record, (PLS, SD)):
        _fill_three_digit_stats(record, draw_code)

    try:
        db.add(record)
        db.commit()
    except Exception:
        # Leave the shared session usable for the retry and for later pages.
        db.rollback()
        raise
    print(f"issue:{issue}数据写入完成。。。")
def get_data(url, lottery_type):
    """Run ``_get_data`` and report a failure before propagating it.

    Args:
        url: Address of the draw-detail page to fetch.
        lottery_type: Lottery code forwarded unchanged to ``_get_data``.

    Raises:
        Exception: Whatever ``_get_data`` raised once its retries were used
            up; the exception is printed first and then re-raised.
    """
    try:
        _get_data(url, lottery_type)
    except Exception as e:
        print(e)
        print('异常出错重试后,依然报错')
        # Bare ``raise`` re-raises the active exception with its traceback.
        raise
def main(basic_url, lottery_type):
    """Crawl every draw page linked from *basic_url*.

    The issue selector on *basic_url* is read, and each linked page is passed
    to ``get_data``. A failure on one page is printed and the crawl moves on
    to the next page.

    Args:
        basic_url: Page whose issue selector lists the draw pages to visit.
        lottery_type: Lottery code forwarded unchanged to ``get_data``.
    """
    r = session.get(basic_url)
    anchors = r.html.find('div.kjxq_box02_title_right span div a')
    # Visit the links in the reverse of their order on the page.
    for anchor in reversed(anchors):
        # Each match is already an <a> element, so read its href directly.
        url = anchor.attrs.get('href')
        if not url:
            # An anchor without a target cannot be crawled; skip it rather
            # than abort the whole run.
            continue
        try:
            get_data(url, lottery_type)
        except Exception as e:
            # Best effort: report the failure and carry on with the next page.
            print(e)
if __name__ == '__main__':
    # Entry pages of the other lotteries, kept for reference:
    #   https://kaijiang.500.com/shtml/pls/04001.shtml
    #   https://kaijiang.500.com/shtml/sd/04001.shtml
    basic_url = "https://kaijiang.500.com/shtml/plw/04001.shtml"
    main(basic_url=basic_url, lottery_type='plw')