191 lines
5.5 KiB
Python
Executable File
191 lines
5.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from argparse import ArgumentParser
|
|
import datetime as dt
|
|
|
|
import bs4
|
|
import locale
|
|
import requests
|
|
import os
|
|
import urllib.parse
|
|
|
|
from bottle import hook, request, route, run, static_file, view
|
|
|
|
from config import conf
|
|
import models
|
|
from models import Song, AirCast, DoesNotExist
|
|
|
|
# Directory containing this script; serve_static_file() joins it with
# 'static' to locate the files it serves.
project_dir = os.path.dirname(__file__)
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Util functions
|
|
|
|
def http_get(url):
    """Fetch ``url`` with an HTTP GET request and return the response.

    Parameters:
        url: address of the resource to download.

    Returns:
        The ``requests`` response object; only returned when the status
        code is exactly 200.

    Raises:
        RuntimeError: if the server answers with any status other than 200.
        requests.exceptions.RequestException: on network failures, including
            the timeout configured below.
    """
    # Without an explicit timeout requests may wait forever on a stalled
    # server, which would hang the ``update`` command.
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        print('Uh, oh, unable to fetch', url)
        print('Http status code:', response.status_code)
        # A built-in exception is used because no ``Error`` class is defined
        # or imported in this module.
        raise RuntimeError('Download error')
    return response
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Get song information
|
|
|
|
# Page downloaded and parsed by get_last_ten(); presumably lists the ten most
# recently aired songs, as its name suggests.
last_ten_url = 'https://radiobalises.com/Play-list/last10.html'
|
|
|
|
def get_last_ten():
    """Download the playlist page and store every listed song airing.

    Each ``span.post-date`` element inside a ``p.rldj-cell`` paragraph gives
    the air date (ISO format); the ``artist - title`` text is read from the
    node two steps before it in document order. The song and its airing are
    created in the database when missing, and one line per entry is printed
    to stdout.
    """
    page = http_get(last_ten_url)
    document = bs4.BeautifulSoup(page.content, 'html5lib')
    row_format = '{:<10} | {:<25} {:<40}'

    for date_span in document.select('p.rldj-cell span.post-date'):
        aired_at = dt.datetime.fromisoformat(date_span.text)
        # Split on the first ' - ' only, so titles may contain the separator.
        label = date_span.previous.previous
        artist, title = (part.strip() for part in label.split(' - ', 1))
        song, _ = Song.get_or_create(artist=artist, title=title)

        # get_or_create does not play nice with the unique constraint on the
        # date, so a plain lookup with a fallback to create is used instead.
        try:
            AirCast.get(date=aired_at, song=song)
        except DoesNotExist:
            AirCast.create(date=aired_at, song=song)

        print(row_format.format(str(aired_at), artist, title))
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Search song
|
|
|
|
def search_songs_between(start_dt, end_dt):
    """Return the airings dated between ``start_dt`` and ``end_dt``.

    Rows are filtered with ``AirCast.date.between(start_dt, end_dt)`` and
    returned as a list ordered by air date.
    """
    in_range = AirCast.date.between(start_dt, end_dt)
    airings = AirCast.select().order_by(AirCast.date).where(in_range)
    return list(airings)
|
|
|
|
def search_song(day, hour):
    """Return the airings played during one hour of a given day.

    Parameters:
        day: the date to search.
        hour: hour of the day at which the one-hour window starts. Values
            outside 0-23 roll over into the neighbouring days (24 is 00:00
            of the next day, -1 is 23:00 of the previous day) instead of
            raising ``ValueError``.

    Returns:
        The list produced by ``search_songs_between`` for the window
        ``[start, start + 1 hour]``.
    """
    # Start from midnight and add the hours as a timedelta: unlike
    # ``dt.time(hour=hour)`` this accepts any hour offset.
    midnight = dt.datetime.combine(day, dt.time())
    start_dt = midnight + dt.timedelta(hours=hour)
    return search_songs_between(start_dt, start_dt + dt.timedelta(hours=1))
|
|
|
|
def print_aircast(x):
    """Print one airing as a ``date | artist title`` line of padded columns.

    ``x`` must expose ``date`` and a ``song`` with ``artist`` and ``title``.
    """
    song = x.song
    print(f'{str(x.date):<10} | {song.artist:<25} {song.title:<40}')
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Web application
|
|
#
|
|
# Very simple, just a page with an input for a date/time, a query button and a
|
|
# list of results
|
|
|
|
@route('/', method=['GET', 'POST'])
@view('search_results')
def results_page():
    """Render the search page with the songs aired during one hour.

    Reads three form fields:
      * ``date``: ISO date to search; defaults to today when missing or invalid.
      * ``time``: ``HH:MM`` string of which only the hour is used; defaults
        to the current hour when missing, invalid or outside 0-23.
      * ``timedelta``: optional integer number of hours added to the
        requested hour; ignored when not a valid integer.

    Returns:
        The dict passed to the ``search_results`` template, with keys
        ``results``, ``date``, ``time``, ``start_time`` and ``end_time``.
    """
    now = dt.datetime.now()

    try:
        day = dt.date.fromisoformat(request.forms.date)
    except ValueError:
        day = now.date()

    try:
        hour = int(request.forms.time.split(':')[0])
    except ValueError:
        hour = now.hour
    if not 0 <= hour <= 23:
        # Form input is untrusted: an out-of-range hour would make
        # ``dt.time`` raise and turn the request into a server error.
        hour = now.hour

    try:
        delta = int(request.forms.timedelta or 0)
    except ValueError:
        delta = 0

    # Apply the hour offset with datetime arithmetic so that stepping past
    # midnight moves to the neighbouring day instead of producing an hour
    # outside 0-23.
    start = dt.datetime.combine(day, dt.time(hour=hour))
    try:
        start = start + dt.timedelta(hours=delta)
    except OverflowError:
        # Absurdly large offsets cannot be represented; ignore them.
        pass
    day, hour = start.date(), start.hour

    time = '{:0>2}:00'.format(hour)
    results = [
        {
            'date': x.date,
            'song': x.song,
            'youtube_url': 'https://www.youtube.com/results?search_query=' +
                           urllib.parse.quote_plus(x.song.artist + ' ' + x.song.title),
        }
        for x in search_song(day, hour)
    ]
    return dict(results=results,
                date=day,
                time=time,
                start_time=time,
                end_time='{:0>2}:00'.format(hour + 1))
|
|
|
|
@route('/static/<filename>')
def serve_static_file(filename):
    """Serve ``filename`` from the ``static`` directory next to this script."""
    return static_file(filename, root=os.path.join(project_dir, 'static'))
|
|
|
|
@hook('before_request')
def set_locale():
    """Set the locale for all categories from the Accept-Language header.

    The first language of the header (``fr-FR`` when the header is absent)
    is converted to a locale name such as ``fr_FR.UTF-8``. If the host does
    not provide that locale, ``fr_FR.UTF-8`` is tried instead; if that one is
    unavailable too, the current locale is left unchanged so the request can
    still be served.
    """
    accept_language = request.get_header('Accept-Language', 'fr-FR')
    first_lang = accept_language.split(';')[0].split(',')[0].strip()

    candidates = []
    if first_lang:
        candidates.append(first_lang.replace('-', '_') + '.UTF-8')
    candidates.append('fr_FR.UTF-8')

    for lang in candidates:
        try:
            locale.setlocale(locale.LC_ALL, lang)
        except locale.Error:
            # Locale not installed on this host (or a malformed header
            # value): try the next candidate rather than failing the request.
            continue
        return
|
|
|
|
@hook('before_request')
def connect_to_db():
    """Open the ``models.db`` connection before each request is handled."""
    database = models.db
    database.connect()
|
|
|
|
@hook('after_request')
def close_db_connection():
    """Close the ``models.db`` connection once the request has been handled."""
    database = models.db
    database.close()
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Argument parsing
|
|
# use a decorator to simplify argparse usage, as suggested by
|
|
# https://mike.depalatis.net/blog/simplifying-argparse.html
|
|
|
|
# Top-level command-line parser of the program.
cli = ArgumentParser(description='Balises')
# Registry of sub-commands: functions decorated with @subcommand add their
# own parser here, and the name chosen on the command line is stored in
# ``args.subcommand``.
subparsers = cli.add_subparsers(dest="subcommand")
|
|
|
|
def subcommand(args=(), parent=subparsers):
    """Return a decorator that registers a function as a CLI sub-command.

    Parameters:
        args: iterable of ``(name_or_flags, kwargs)`` pairs, as built by
            ``argument()``, describing the arguments of the sub-command.
        parent: sub-parsers object on which the command is registered.

    The sub-command is named after the decorated function, and the function
    docstring becomes its ``--help`` description. The function itself is
    stored as the ``func`` default of the parsed arguments.
    """
    def decorator(func):
        parser = parent.add_parser(func.__name__, description=func.__doc__)
        for arg in args:
            parser.add_argument(*arg[0], **arg[1])
        parser.set_defaults(func=func)
        # Hand the function back so the decorated module-level name still
        # refers to it instead of being rebound to None.
        return func
    return decorator
|
|
|
|
def argument(*name_or_flags, **kwargs):
    """Bundle ``add_argument`` parameters for later use by ``subcommand``.

    Returns a ``(flags, options)`` tuple: the positional names/flags as a
    list and the keyword options as a dict.
    """
    flags = list(name_or_flags)
    return (flags, kwargs)
|
|
|
|
|
|
@subcommand([argument('query', help='Search query')])
def search(args):
    """Print the songs aired during the hour of the given ISO date/time."""
    # The module imports datetime as ``dt``, and search_song() expects the
    # day and the hour as two separate arguments.
    when = dt.datetime.fromisoformat(args.query)
    for res in search_song(when.date(), when.hour):
        print_aircast(res)
|
|
|
|
|
|
@subcommand()
def update(args):
    """Download the latest played songs and store them in the database."""
    # ``args`` is unused but required by the sub-command calling convention
    # (main() calls ``args.func(args)``).
    get_last_ten()
|
|
|
|
|
|
@subcommand()
def serve(args):
    """Run the web application using the 'server' configuration section."""
    # Defaults apply when an option is absent from the section:
    # localhost:9980, debug and reloader disabled.
    server_conf = conf['server']
    run(host=server_conf.get('host', 'localhost'),
        port=server_conf.getint('port', 9980),
        debug=server_conf.getboolean('debug', False),
        reloader=server_conf.getboolean('reloader', False))
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Main
|
|
|
|
def main():
    """Parse the command line and dispatch to the chosen sub-command.

    The help text is printed when no sub-command is given.
    """
    args = cli.parse_args()
    if args.subcommand is not None:
        args.func(args)
        return
    cli.print_help()
|
|
|
|
# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()
|