py_scripts/tasks/cron_tasks.py

55 lines
1.6 KiB
Python

import sys
sys.path.append("/root/documents/py_scripts")
import time
from schedule import every, repeat, run_pending
from requests_html import HTML, HTMLSession
from lottery import Lottery
session = HTMLSession()
def get_data_job(lottery_type='pls'):
"""
:param lottery_type
"""
url = f"https://kaijiang.500.com/{lottery_type}.shtml"
r = session.get(url)
if lottery_type == 'pls':
issue_elem = r.html.search('排列3 第 {}')
elif lottery_type == 'plw':
issue_elem = r.html.search('排列5 第 {}')
else:
issue_elem = r.html.search('福彩3D 第 {}')
draw_date = r.html.xpath('//td[@class="td_title01"]/span[@class="span_right"]/text()', first=True)
result = r.html.xpath('//li[@class="ball_orange"]/text()')
if issue_elem is not None and draw_date is not None and result is not None and len(result) != 0 :
draw_issue = HTML(html=issue_elem[0]).text
draw_code = "".join(result)
print(f"result:{draw_code}")
with Lottery(lottery_type=lottery_type) as lottery:
last_id = lottery.insert(draw_issue, draw_date, draw_code)
if last_id:
print(f"issue:{draw_issue}数据写入完成。。。")
else:
print(f'issue:{draw_issue}已经存在')
@repeat(every().day.at("18:01"))
def pls_job():
get_data_job(lottery_type='pls')
@repeat(every().day.at("18:02"))
def plw_job():
get_data_job(lottery_type='plw')
@repeat(every().day.at("18:03"))
def sd_job():
get_data_job(lottery_type='sd')
while True:
run_pending()
time.sleep(1)