183 lines
5.0 KiB
Python
Executable File
183 lines
5.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from argparse import ArgumentParser
|
|
from configparser import ConfigParser
|
|
from datetime import datetime, timedelta
|
|
|
|
import bs4
|
|
import requests
|
|
import os
|
|
|
|
from peewee import *
|
|
from bottle import hook, request, route, run, view
|
|
|
|
# ------------------------------------------------------------
|
|
# Load the settings from the 'balises.ini' file stored next to this script.
# The script path is made absolute first: when __file__ carries no directory
# part, dirname() returns '' and plain string concatenation would point at
# '/balises.ini' in the filesystem root instead of the script's directory.
conf = ConfigParser()
conf.read(os.path.join(os.path.dirname(os.path.abspath(__file__)),
                       'balises.ini'))

# Connection parameters are read from the [mysql] section; only host and
# port have defaults.
db = MySQLDatabase(conf['mysql'].get('database'),
                   user=conf['mysql'].get('user'),
                   password=conf['mysql'].get('password'),
                   host=conf['mysql'].get('host', 'localhost'),
                   port=conf['mysql'].getint('port', 3306))
|
|
|
|
class BaseModel(Model):
    """Base class that binds every derived model to the module-level ``db``."""

    class Meta:
        database = db
|
|
|
|
class Song(BaseModel):
    """A song, described by its artist and its title."""

    id = AutoField()
    artist = CharField(default='')
    title = CharField(default='')

    class Meta:
        # One unique index over (artist, title): the same pair cannot be
        # stored twice.
        indexes = ((('artist', 'title'), True),)
|
|
|
|
class AirCast(BaseModel):
    """Links a ``Song`` to one date/time value.

    The reverse relation is available on ``Song`` as ``dates``.
    """

    id = AutoField()
    date = DateTimeField()
    song = ForeignKeyField(Song, backref='dates')
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Util functions
|
|
|
|
def http_get(url, timeout=30):
    """Download ``url`` with an HTTP GET request.

    Args:
        url: Address to fetch.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the caller forever.

    Returns:
        The ``requests`` response; its status code is always 200.

    Raises:
        requests.HTTPError: The server answered with a status other than 200.
        requests.RequestException: The request itself failed (connection
            error, timeout, ...).
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        print('Uh, oh, unable to fetch', url)
        print('Http status code:', response.status_code)
        # The previous code raised an undefined name here, which surfaced as
        # a NameError instead of a download error.
        raise requests.HTTPError('Download error', response=response)
    return response
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Get song information
|
|
|
|
last_ten_url = 'https://radiobalises.com/Play-list/last10.html'


def get_last_ten():
    """Fetch the playlist page at ``last_ten_url`` and store its entries.

    For every ``span.post-date`` found inside a ``p.rldj-cell`` element, the
    span text is parsed as an ISO date/time and the text located two nodes
    before the span is split into artist and title. The matching ``Song`` and
    ``AirCast`` rows are created when they do not exist yet, and one line per
    entry is printed.

    Entries whose text has no ' - ' separator are reported and skipped rather
    than aborting the whole update.

    Raises:
        ValueError: A date on the page is not in ISO format.
    """
    line_template = '{:<10} | {:<25} {:<40}'

    response = http_get(last_ten_url)
    soup = bs4.BeautifulSoup(response.content, 'html5lib')

    for elem in soup.select('p.rldj-cell span.post-date'):
        dt = datetime.fromisoformat(elem.text)

        label = elem.previous.previous
        artist, separator, title = label.partition(' - ')
        if not separator:
            print('Skipping entry without artist/title separator:', label)
            continue
        artist, title = artist.strip(), title.strip()

        song, _ = Song.get_or_create(artist=artist, title=title)

        # get_or_create does not play nice with the unique constraint on the
        # date so we use a simple try/except instead
        try:
            AirCast.get(date=dt, song=song)
        except DoesNotExist:
            AirCast.create(date=dt, song=song)

        print(line_template.format(str(dt), artist, title))
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Search song
|
|
|
|
def search_song(query):
    """Return the aircasts within 30 minutes of a given moment.

    Args:
        query: ``datetime`` to search around.

    Returns:
        A list of ``AirCast`` rows whose date lies strictly between
        ``query - 30 minutes`` and ``query + 30 minutes``, ordered by date.
    """
    delta = timedelta(minutes=30)
    earliest = query - delta
    latest = query + delta

    # Fetch the songs in the same statement: callers read the song of every
    # returned row, which would otherwise cost one extra query per row.
    aircasts = (AirCast
                .select(AirCast, Song)
                .join(Song)
                .where((AirCast.date > earliest) & (AirCast.date < latest))
                .order_by(AirCast.date))

    return list(aircasts)
|
|
|
|
def print_aircast(x):
    """Print one aircast as a left-aligned "date | artist title" line.

    Args:
        x: Object exposing ``date`` as well as ``song.artist`` and
            ``song.title``.
    """
    when = str(x.date)
    song = x.song
    columns = (when, song.artist, song.title)
    print('{:<10} | {:<25} {:<40}'.format(*columns))
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Web application
|
|
#
|
|
# Very simple, just a page with an input for a date/time, a query button and a
|
|
# list of results
|
|
|
|
@route('/', method='GET')
@view('search_form')
def main_page():
    """Serve the search form, pre-filled with the current date and time.

    Returns:
        Template variables for the ``search_form`` view: an empty ``title``,
        ``date`` as ``YYYY-MM-DD`` and ``time`` as ``HH:MM``.
    """
    now = datetime.now()
    # The time must be zero-padded: results_page() feeds the submitted value
    # to datetime.fromisoformat(), which rejects something like '9:5'.
    return dict(title='',
                date=now.date().isoformat(),
                time=now.strftime('%H:%M'))
|
|
|
|
@route('/', method='POST')
@view('search_results')
def results_page():
    """Run the search for the submitted date and time and show the results.

    Reads the ``date`` and ``time`` form fields, combines them into one
    ``datetime`` and looks up the matching aircasts with ``search_song``.

    Returns:
        Template variables for the ``search_results`` view: ``results`` plus
        the submitted ``date`` and ``time`` strings.

    Raises:
        bottle.HTTPError: Status 400 when the submitted values do not form a
            valid ISO date/time.
    """
    # Function-scope import: abort is only needed by this handler.
    from bottle import abort

    date = request.forms.date
    time = request.forms.time
    try:
        dt = datetime.fromisoformat('{} {}'.format(date, time))
    except ValueError:
        # Bad user input is a client error, not a server failure.
        abort(400, 'Invalid date or time.')
    return dict(results=search_song(dt), date=date, time=time)
|
|
|
|
@hook('before_request')
def connect_to_db() -> None:
    """Open the database connection before a request is handled."""
    db.connect()
|
|
|
|
@hook('after_request')
def close_db_connection() -> None:
    """Close the database connection once a request has been handled."""
    db.close()
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Argument parsing
|
|
# use a decorator to simplify argparse usage, as suggested by
|
|
# https://mike.depalatis.net/blog/simplifying-argparse.html
|
|
|
|
cli = ArgumentParser(description='Balises')
subparsers = cli.add_subparsers(dest="subcommand")


def subcommand(args=(), parent=subparsers):
    """Build a decorator that registers a function as a CLI subcommand.

    The subcommand is named after the function and uses the function's
    docstring as its description. The function itself is stored as the
    ``func`` default of the sub-parser, so the parsed namespace exposes it
    as ``args.func``.

    Args:
        args: Iterable of ``(name_or_flags, kwargs)`` pairs, each forwarded
            to ``add_argument`` of the new sub-parser.
        parent: Sub-parsers object the command is attached to.

    Returns:
        A decorator that registers the function and returns it unchanged.
    """
    def decorator(func):
        parser = parent.add_parser(func.__name__, description=func.__doc__)
        for spec in args:
            parser.add_argument(*spec[0], **spec[1])
        parser.set_defaults(func=func)
        # Hand the function back so the decorated name is not rebound to None.
        return func
    return decorator
|
|
|
|
def argument(*name_or_flags, **kwargs):
    """Bundle ``add_argument`` parameters into a ``(names, options)`` pair.

    Args:
        *name_or_flags: Positional name or option flags of the argument.
        **kwargs: Keyword options of the argument.

    Returns:
        A tuple made of the list of names/flags and the dict of options.
    """
    names = list(name_or_flags)
    return names, kwargs
|
|
|
|
|
|
@subcommand([argument('query', help='Search query')])
def search(args):
    """Print the songs aired around the given ISO date and time."""
    try:
        when = datetime.fromisoformat(args.query)
    except ValueError:
        # Report a bad query as a usage problem instead of a traceback.
        raise SystemExit(
            'Invalid date/time {!r}, expected an ISO value such as '
            '"2020-01-31 18:30"'.format(args.query))
    for res in search_song(when):
        print_aircast(res)
|
|
|
|
|
|
@subcommand()
def update(args):
    """Fetch the latest playlist entries and store them in the database."""
    get_last_ten()
|
|
|
|
|
|
@subcommand()
def serve(args):
    """Run the web application.

    Host, port and debug mode are read from the [server] section of the
    configuration and default to localhost, 9980 and off.
    """
    # The section-level getters with ``fallback`` also cover a missing
    # [server] section, which ``conf['server']`` turned into a KeyError.
    run(host=conf.get('server', 'host', fallback='localhost'),
        port=conf.getint('server', 'port', fallback=9980),
        debug=conf.getboolean('server', 'debug', fallback=False))
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Main
|
|
|
|
def main():
    """Parse the command line and run the selected subcommand.

    The help text is printed when no subcommand was given.
    """
    args = cli.parse_args()
    if args.subcommand is not None:
        args.func(args)
        return
    cli.print_help()


if __name__ == '__main__':
    main()
|