Files
balises/balises.py

185 lines
5.3 KiB
Python
Executable File

#!/usr/bin/env python3
from argparse import ArgumentParser
import datetime as dt
import bs4
import locale
import requests
import os
import urllib.parse
from bottle import hook, request, route, run, view
from config import conf
import models
from models import Song, AirCast, DoesNotExist
# ----------------------------------------------------------------------
# Util functions
def http_get(url):
    """Fetch ``url`` with an HTTP GET and return the response.

    Args:
        url: Address to download.

    Returns:
        The ``requests`` response object; only returned when the status
        code is exactly 200.

    Raises:
        requests.HTTPError: If the server answers with any status code
            other than 200. The failure is also reported on stdout.
    """
    response = requests.get(url)
    if response.status_code != 200:
        print('Uh, oh, unable to fetch', url)
        print('Http status code:', response.status_code)
        # Raise a real exception class (carrying the response) so callers
        # can catch it as requests.RequestException.
        raise requests.HTTPError('Download error', response=response)
    return response
# ----------------------------------------------------------------------
# Get song information
last_ten_url = 'https://radiobalises.com/Play-list/last10.html'


def get_last_ten():
    """Download the page at ``last_ten_url`` and record the songs it lists.

    For every ``span.post-date`` found inside a ``p.rldj-cell`` element,
    the span text is parsed as an ISO datetime and the "artist - title"
    text is read from the node two ``.previous`` hops before the span.
    The song and its aircast are stored if they are not known yet, and
    one line per entry is printed to stdout.
    """
    page = http_get(last_ten_url)
    document = bs4.BeautifulSoup(page.content, 'html5lib')
    row_format = '{:<10} | {:<25} {:<40}'

    for date_tag in document.select('p.rldj-cell span.post-date'):
        aired_at = dt.datetime.fromisoformat(date_tag.text)
        label = date_tag.previous.previous
        # Split on the first ' - ' only, so titles may contain the separator
        artist, title = (part.strip() for part in label.split(' - ', 1))
        song, _ = Song.get_or_create(artist=artist, title=title)

        # get_or_create does not play nice with the unique constraint on
        # the date, so look the aircast up and create it only when missing
        try:
            AirCast.get(date=aired_at, song=song)
        except DoesNotExist:
            AirCast.create(date=aired_at, song=song)

        print(row_format.format(str(aired_at), artist, title))
# ----------------------------------------------------------------------
# Search song
def search_songs_between(start_dt, end_dt):
    """Return the aircasts whose date lies between two datetimes.

    Args:
        start_dt: Lower bound passed to ``AirCast.date.between``.
        end_dt: Upper bound passed to ``AirCast.date.between``.

    Returns:
        A list of ``AirCast`` rows ordered by ``AirCast.date``.
    """
    in_range = AirCast.date.between(start_dt, end_dt)
    matches = AirCast.select().order_by(AirCast.date).where(in_range)
    # Materialise the query so callers get a plain list
    return list(matches)
def search_song(day, hour):
    """Return the aircasts broadcast during one hour of ``day``.

    Args:
        day: ``datetime.date`` the hour is counted from.
        hour: Number of hours after midnight of ``day`` at which the
            one-hour search window starts. Values outside 0-23 roll over
            into the neighbouring days (24 is 00:00 of the next day,
            -1 is 23:00 of the previous day).

    Returns:
        The list produced by ``search_songs_between`` for the window
        ``[start, start + 1 hour]``.
    """
    midnight = dt.datetime.combine(day, dt.time())
    # Use timedelta arithmetic rather than dt.time(hour=hour): the latter
    # raises ValueError as soon as ``hour`` leaves the 0-23 range.
    start_dt = midnight + dt.timedelta(hours=hour)
    return search_songs_between(start_dt, start_dt + dt.timedelta(hours=1))
def print_aircast(x):
    """Print one aircast as a ``date | artist title`` line on stdout.

    Args:
        x: Object exposing ``date`` and ``song``, where ``song`` has
            ``artist`` and ``title`` attributes.
    """
    columns = (str(x.date), x.song.artist, x.song.title)
    print('{:<10} | {:<25} {:<40}'.format(*columns))
# ----------------------------------------------------------------------
# Web application
#
# Very simple, just a page with an input for a date/time, a query button and a
# list of results
@route('/', method=['GET', 'POST'])
@view('search_results')
def results_page():
    """Render the search page for the hour selected in the submitted form.

    Form fields read (all optional):
        date: ISO date; defaults to today when missing or invalid.
        time: ``HH:MM`` string, only the hour part is used; defaults to
            the current hour when missing, invalid or outside 0-23.
        timedelta: Integer number of hours to shift the selected hour by;
            treated as 0 when missing or not an integer.

    Returns:
        The template context: ``results`` (one dict per aircast with
        ``date``, ``song`` and ``youtube_url``), ``date``, ``time``,
        ``start_time`` and ``end_time``.
    """
    now = dt.datetime.now()

    try:
        day = dt.date.fromisoformat(request.forms.date)
    except ValueError:
        day = now.date()

    try:
        hour = int(request.forms.time.split(':')[0])
    except ValueError:
        hour = None
    if hour is None or not 0 <= hour <= 23:
        hour = now.hour

    try:
        delta = int(request.forms.timedelta or 0)
    except ValueError:
        delta = 0

    # Apply the shift with datetime arithmetic so that moving past
    # midnight changes the day instead of producing an invalid hour.
    start = dt.datetime.combine(day, dt.time(hour=hour))
    try:
        start = start + dt.timedelta(hours=delta)
    except OverflowError:
        # Absurdly large shift: ignore it rather than fail the request
        pass
    day, hour = start.date(), start.hour

    time = '{:0>2}:00'.format(hour)
    results = [
        {
            'date': x.date,
            'song': x.song,
            'youtube_url': 'https://www.youtube.com/results?search_query=' +
                           urllib.parse.quote_plus(
                               x.song.artist + ' ' + x.song.title),
        }
        for x in search_song(day, hour)
    ]
    return dict(results=results,
                date=day,
                time=time,
                start_time=time,
                # Shown as "24:00" when the window starts at 23:00
                end_time='{:0>2}:00'.format(hour + 1))
@hook('before_request')
def set_locale():
    """Set the locale for all categories to the first lang in Accept-Language
    header. Default to fr_FR.UTF-8 when the header is missing or names a
    locale that is not available on this host, and to the always-present
    ``C`` locale as a last resort.
    """
    accept_language = request.get_header('Accept-Language', 'fr-FR')
    first_lang = accept_language.split(';')[0].split(',')[0].strip()

    candidates = []
    if first_lang:
        # Accept-Language uses "fr-FR", locale names use "fr_FR"
        candidates.append(first_lang.replace('-', '_') + '.UTF-8')
    candidates.extend(['fr_FR.UTF-8', 'C'])

    # NOTE(review): setlocale() changes the locale of the whole process,
    # so concurrent requests share whatever was set last.
    for lang in candidates:
        try:
            locale.setlocale(locale.LC_ALL, lang)
        except locale.Error:
            # Locale not installed (or not a locale name at all, e.g. "*"):
            # try the next candidate instead of failing the request.
            continue
        return
@hook('before_request')
def connect_to_db():
    """Open the ``models.db`` connection; registered as a before_request hook."""
    database = models.db
    database.connect()
@hook('after_request')
def close_db_connection():
    """Close the ``models.db`` connection; registered as an after_request hook."""
    database = models.db
    database.close()
# ----------------------------------------------------------------------
# Argument parsing
# use a decorator to simplify argparse usage, as suggested by
# https://mike.depalatis.net/blog/simplifying-argparse.html
cli = ArgumentParser(description='Balises')
subparsers = cli.add_subparsers(dest="subcommand")


def subcommand(args=(), parent=subparsers):
    """Build a decorator that registers a function as a CLI subcommand.

    The subcommand is named after the function, and the function's
    docstring becomes the subcommand's help description.

    Args:
        args: Iterable of ``(name_or_flags, kwargs)`` pairs, as built by
            ``argument``; each one is forwarded to ``add_argument``.
        parent: Subparsers object the new parser is added to.

    Returns:
        A decorator that registers the function and returns it unchanged.
    """
    def decorator(func):
        parser = parent.add_parser(func.__name__, description=func.__doc__)
        for name_or_flags, options in args:
            parser.add_argument(*name_or_flags, **options)
        parser.set_defaults(func=func)
        # Hand the function back: without this the decorated module-level
        # name would be rebound to None.
        return func
    return decorator
def argument(*name_or_flags, **kwargs):
    """Bundle ``add_argument`` parameters for later use by ``subcommand``.

    Returns:
        A ``(list_of_names_or_flags, kwargs_dict)`` tuple.
    """
    names = list(name_or_flags)
    return names, kwargs
@subcommand([argument('query', help='Search query')])
def search(args):
    """Print the songs aired during the hour of the given ISO datetime."""
    # The docstring above is user-facing: subcommand() passes it to
    # argparse as the description shown by `search --help`.
    #
    # The module is imported as ``dt``, and search_song() expects a date
    # and an hour rather than a single datetime.
    when = dt.datetime.fromisoformat(args.query)
    for res in search_song(when.date(), when.hour):
        print_aircast(res)
@subcommand()
def update(args):
    # Subcommand handler: fetch the latest played songs and store them.
    # ``args`` is unused. Documented with a comment rather than a
    # docstring because subcommand() turns __doc__ into the help text.
    get_last_ten()
@subcommand()
def serve(args):
    # Subcommand handler: start the bottle server using the values of the
    # 'server' section of ``conf``, with the defaults given below.
    # ``args`` is unused. Documented with a comment rather than a
    # docstring because subcommand() turns __doc__ into the help text.
    server_conf = conf['server']
    run(
        host=server_conf.get('host', 'localhost'),
        port=server_conf.getint('port', 9980),
        debug=server_conf.getboolean('debug', False),
        reloader=server_conf.getboolean('reloader', False),
    )
# ----------------------------------------------------------------------
# Main
def main():
    """Parse the command line and run the selected subcommand.

    Prints the general help when no subcommand was given.
    """
    args = cli.parse_args()
    if args.subcommand is not None:
        # Each subparser stored its handler under ``func``
        args.func(args)
        return
    cli.print_help()


if __name__ == '__main__':
    main()