Files
balises/balises.py
2021-05-29 00:52:25 +02:00

182 lines
5.0 KiB
Python
Executable File

#!/usr/bin/env python3
from argparse import ArgumentParser
from configparser import ConfigParser
from datetime import datetime, timedelta
import bs4
import requests
from peewee import *
from bottle import hook, request, route, run, view
# ------------------------------------------------------------
# Settings come from 'balises.ini', looked up relative to the current
# working directory. ConfigParser.read() silently skips a missing file, so
# a missing file only shows up below as a KeyError on the 'mysql' section.
conf = ConfigParser()
conf.read('balises.ini')

# Database handle shared by every model; host and port are optional in the
# configuration file and default to a local server on port 3306.
_mysql = conf['mysql']
db = MySQLDatabase(
    _mysql.get('database'),
    user=_mysql.get('user'),
    password=_mysql.get('password'),
    host=_mysql.get('host', 'localhost'),
    port=_mysql.getint('port', 3306),
)
class BaseModel(Model):
    """Parent class for the models of this file; attaches them to ``db``."""

    class Meta:
        database = db
class Song(BaseModel):
    """A song, described by its artist and its title."""

    id = AutoField()
    artist = CharField(default='')
    title = CharField(default='')

    class Meta:
        # A single (fields, unique) entry: unique index on artist + title.
        indexes = ((('artist', 'title'), True),)
class AirCast(BaseModel):
    """One broadcast of a song at a given date and time."""

    id = AutoField()
    date = DateTimeField()
    # The back-reference exposes the broadcasts of a song as ``song.dates``.
    song = ForeignKeyField(Song, backref='dates')
# ----------------------------------------------------------------------
# Util functions
def http_get(url, timeout=30):
    """Fetch *url* with an HTTP GET request and return the response.

    Args:
        url: Address to download.
        timeout: Seconds passed to ``requests.get`` as its timeout, so that
            an unresponsive server cannot block the caller forever.

    Returns:
        The ``requests`` response object; only returned for status 200.

    Raises:
        RuntimeError: If the server answers with any status other than 200.
            Errors raised by ``requests`` itself (connection failure,
            timeout) are not caught here and propagate to the caller.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        print('Uh, oh, unable to fetch', url)
        print('Http status code:', response.status_code)
        # RuntimeError is a builtin: no name from another module is needed
        # to report the failure.
        raise RuntimeError('Download error')
    return response
# ----------------------------------------------------------------------
# Get song information
# Page listing the most recently played songs
last_ten_url = 'https://radiobalises.com/Play-list/last10.html'


def get_last_ten():
    """Download the playlist page and store every entry it lists.

    For each date element found in the page, the matching song is fetched
    or created, an AirCast row is created unless one already exists for
    that date and song, and a summary line is printed.

    Raises:
        ValueError: If a date is not in ISO format, or if the text holding
            the song does not contain the ' - ' separator.
    """
    response = http_get(last_ten_url)
    soup = bs4.BeautifulSoup(response.content, 'html5lib')
    dates = soup.select('p.rldj-cell span.post-date')
    line_template = '{:<10} | {:<25} {:<40}'
    for elem in dates:
        dt = datetime.fromisoformat(elem.text)
        # NOTE(review): the node two steps before the date element is
        # presumably the "artist - title" text; verify against the markup.
        artist, title = [x.strip() for x in elem.previous.previous.split(' - ', 1)]
        song, _ = Song.get_or_create(artist=artist, title=title)
        # get_or_create does not play nice with the unique constraint on the date
        # so we use a simple try/except instead
        try:
            AirCast.get(date=dt, song=song)
        except DoesNotExist:
            AirCast.create(date=dt, song=song)
        print(line_template.format(str(dt), artist, title))
# ----------------------------------------------------------------------
# Search song
def search_song(query, delta=timedelta(minutes=30)):
    """Return the broadcasts aired around a given moment.

    Args:
        query: ``datetime`` at the centre of the search window.
        delta: Half-width of the window; broadcasts strictly between
            ``query - delta`` and ``query + delta`` are returned. Defaults
            to 30 minutes.

    Returns:
        A list of AirCast rows ordered by date.
    """
    aircasts = (AirCast
                .select()
                .order_by(AirCast.date)
                .where((AirCast.date > query - delta) &
                       (AirCast.date < query + delta)))
    # Materialise the rows so that callers receive a plain list.
    return list(aircasts)
def print_aircast(x):
    """Print one broadcast as a line: date, then artist and title columns.

    Args:
        x: Object exposing ``date`` and ``song`` (with ``artist`` and
            ``title``), such as an AirCast row.
    """
    song = x.song
    print(f'{str(x.date):<10} | {song.artist:<25} {song.title:<40}')
# ----------------------------------------------------------------------
# Web application
#
# Very simple, just a page with an input for a date/time, a query button and a
# list of results
@route('/', method='GET')
@view('search_form')
def main_page():
    """Serve the search form, pre-filled with the current date and time.

    Returns:
        Template variables for 'search_form': an empty ``title``, ``date``
        as YYYY-MM-DD and ``time`` as zero-padded HH:MM.
    """
    now = datetime.now()
    # The time is zero-padded so that, once posted back unchanged, the
    # combined "date time" string can be parsed by datetime.fromisoformat.
    return dict(title='',
                date=now.date().isoformat(),
                time=now.strftime('%H:%M'))
@route('/', method='POST')
@view('search_results')
def results_page():
    """Run the search for the submitted date and time and show the results.

    Reads the ``date`` and ``time`` form fields, combines them into one
    datetime and looks up the broadcasts around it.

    Returns:
        Template variables for 'search_results': the matching ``results``
        plus the submitted ``date`` and ``time`` strings.

    Raises:
        bottle.HTTPError: Status 400 when the submitted fields do not form
            a valid ISO date and time.
    """
    # Imported here because the module-level bottle import does not
    # include abort.
    from bottle import abort

    date = request.forms.date
    time = request.forms.time
    try:
        dt = datetime.fromisoformat('{} {}'.format(date, time))
    except ValueError:
        # Bad or missing user input is a client error, not a server crash.
        abort(400, 'Invalid date or time')
    results = search_song(dt)
    return dict(results=results, date=date, time=time)
@hook('before_request')
def connect_to_db():
    """Open the database connection before a request is handled."""
    db.connect()
@hook('after_request')
def close_db_connection():
    """Close the database connection once a request has been handled."""
    db.close()
# ----------------------------------------------------------------------
# Argument parsing
# use a decorator to simplify argparse usage, as suggested by
# https://mike.depalatis.net/blog/simplifying-argparse.html
cli = ArgumentParser(description='Balises')
subparsers = cli.add_subparsers(dest="subcommand")


def subcommand(args=(), parent=subparsers):
    """Build a decorator that registers a function as a CLI subcommand.

    The subcommand is named after the function and described by its
    docstring. The function is stored as the ``func`` default of the
    sub-parser, so it can be called through the parsed namespace.

    Args:
        args: Iterable of ``(name_or_flags, kwargs)`` pairs, as produced by
            ``argument()``; each pair is forwarded to ``add_argument``.
        parent: Sub-parsers object the new parser is added to.

    Returns:
        A decorator that registers the function and returns it unchanged.
    """
    def decorator(func):
        parser = parent.add_parser(func.__name__, description=func.__doc__)
        for arg in args:
            parser.add_argument(*arg[0], **arg[1])
        parser.set_defaults(func=func)
        # Hand the function back so the decorated name stays bound to it
        # instead of being rebound to None.
        return func
    return decorator
def argument(*name_or_flags, **kwargs):
    """Bundle ``add_argument`` parameters for later use by ``subcommand``.

    Returns:
        A ``(names, options)`` pair: the positional values as a list and
        the keyword values as a dict.
    """
    names = list(name_or_flags)
    return names, kwargs
# The query is converted by argparse itself, so a malformed date is reported
# as a usage error instead of an uncaught ValueError.
@subcommand([argument('query', type=datetime.fromisoformat, help='Search query')])
def search(args):
    """Print the songs aired around an ISO date and time, e.g. "2021-05-29 00:30"."""
    for res in search_song(args.query):
        print_aircast(res)
@subcommand()
def update(args):
    """Fetch the latest playlist entries and store them in the database."""
    get_last_ten()
@subcommand()
def serve(args):
    """Run the web interface with the host, port and debug settings of the config file."""
    # The fallback form also covers a configuration file without a 'server'
    # section, so the defaults apply instead of a KeyError.
    run(host=conf.get('server', 'host', fallback='localhost'),
        port=conf.getint('server', 'port', fallback=9980),
        debug=conf.getboolean('server', 'debug', fallback=False))
# ----------------------------------------------------------------------
# Main
def main():
    """Parse the command line and run the selected subcommand.

    Without a subcommand, the general help is printed instead.
    """
    args = cli.parse_args()
    if args.subcommand is not None:
        args.func(args)
        return
    cli.print_help()


if __name__ == '__main__':
    main()