schedule_tasks/lottery/__init__.py

122 lines
5.4 KiB
Python

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import re
from datetime import date, datetime
from lottery.model.pls import PLS
from lottery.model.plw import PLW
from lottery.model.sd import SD
from lottery.model.klb import KLB
from lottery.model.football_sfc import FootballSfc
class Lottery(object):
"""
Lottery Base Object
"""
def __init__(self, lottery_type='pls'):
self._pat1 = re.compile('开奖日期:(\d+)年(\d+)月(\d+)日.*')
self._pat2 = re.compile('开奖日期:(\d+)-(\d+)-(\d+)\s.*')
self._lottery_type = lottery_type
self.db = self._get_db_session()
if lottery_type.lower() == 'pls':
self._Model = PLS
elif lottery_type.lower() == 'sd':
self._Model = SD
elif lottery_type.lower() == 'plw':
self._Model = PLW
elif lottery_type.lower() == 'sfc':
self._Model = FootballSfc
elif lottery_type.lower() == 'klb':
self._Model = KLB
else:
raise Exception("未知的lottery_type")
def _get_db_session(self):
_engine = create_engine("mysql+pymysql://root:Chenweijia113!@172.17.0.1/lottery?charset=utf8", pool_pre_ping=True, pool_recycle=3600)
_DbSession = sessionmaker(bind=_engine)
return _DbSession()
def __enter__(self):
return self
def __exit__(self, type, value, trace):
print("close the db session...")
self.db.close()
def get_last_id_by_sum_num(self, sum_num):
"""
根据总和获取id
"""
last_id = self.db.query(self._Model.id).filter_by(sum_num=sum_num).order_by(self._Model.id.desc()).first()
if last_id is not None:
return last_id[0]
return None
def insert(self, draw_issue, draw_date, draw_code, **kwargs):
"""
插入3d、排列三、排列五
"""
result = self.db.query(self._Model).filter_by(draw_issue=draw_issue).first()
if result is None:
record = self._Model()
record.draw_issue = draw_issue
record.draw_code = draw_code
record.created_at = datetime.now()
m1 = re.match(self._pat1, draw_date)
m2 = re.match(self._pat2, draw_date)
if m1 or m2 :
if m1:
record.draw_date = date(int(m1.group(1)), int(m1.group(2)), int(m1.group(3)))
else:
record.draw_date = date(int(m2.group(1)), int(m2.group(2)), int(m2.group(3)))
else:
raise Exception(f"issue:{draw_issue}数据写入失败。。。")
# 如果是排列五、排列五和3D
if isinstance(record, (PLS, SD, PLW)):
record.sum_num = sum(map(int, draw_code))
record.code_small = len(list(filter(lambda x: True if int(x) < 5 else False, draw_code)))
record.code_big = len(list(filter(lambda x: True if int(x) >= 5 else False, draw_code)))
record.code_single = len(list(filter(lambda x: True if int(x) % 2 == 1 else False, draw_code)))
record.code_double = len(list(filter(lambda x: True if int(x) % 2 == 0 else False, draw_code)))
if isinstance(record, (PLS, SD)):
record.hundred = draw_code[0]
record.ten = draw_code[1]
record.one = draw_code[2]
record.sum_hundred_one = int(draw_code[2]) + int(draw_code[0])
record.sum_hundred_ten = int(draw_code[2]) + int(record.draw_code[1])
record.sum_ten_one = int(record.draw_code[1]) + int(record.draw_code[0])
if len(set(draw_code)) == 2:
record.group_type = 3
elif len(set(draw_code)) == 3:
record.group_type = 6
else:
record.group_type = 1
elif isinstance(record, PLW):
record.ten_thousand = draw_code[0]
record.thousand = draw_code[1]
record.hundred = draw_code[2]
record.ten = draw_code[3]
record.one = draw_code[4]
record.sum_hundred_one = int(draw_code[2]) + int(draw_code[4])
record.sum_hundred_ten = int(draw_code[2]) + int(draw_code[3])
record.sum_ten_one = int(draw_code[3]) + int(draw_code[4])
record.sum_first_3 = int(draw_code[0]) + int(draw_code[1]) + int(draw_code[2])
record.sum_next_3 = int(draw_code[2]) + int(draw_code[3]) + int(draw_code[4])
elif isinstance(record, FootballSfc):
record.open_result = draw_code
record.sale_money = kwargs["sfc_sale"]
record.r9_sale_money = kwargs["rj_sale"]
record.award_1_money = kwargs["award_1_money"]
record.award_1_num = kwargs["award_1_num"]
record.award_2_money = kwargs["award_2_money"]
record.award_2_num = kwargs["award_2_num"]
record.r9_award_money = kwargs["r9_award_money"]
record.r9_award_num = kwargs["r9_award_num"]
self.db.add(record)
self.db.commit()
return record.id
else:
print(f"{draw_issue}开奖结果已经写入数据库中...")
return None