Files
balises/balises.py

207 lines
5.8 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from argparse import ArgumentParser
from configparser import ConfigParser
import datetime as dt
import bs4
import requests
import os
import urllib.parse
from peewee import *
from bottle import hook, request, route, run, view
# Set the locale for all categories to the users default setting (eg. LANG
# environment variable)
import locale
locale.setlocale(locale.LC_ALL, '')
# ------------------------------------------------------------
conf = ConfigParser()
conf.read(os.path.dirname(__file__) + '/balises.ini')
db = MySQLDatabase(conf['mysql'].get('database'),
user=conf['mysql'].get('user'),
password=conf['mysql'].get('password'),
host=conf['mysql'].get('host', 'localhost'),
port=conf['mysql'].getint('port', 3306))
class BaseModel(Model):
class Meta:
database = db
class Song(BaseModel):
id = AutoField()
artist = CharField(default='')
title = CharField(default='')
class Meta:
indexes = (
(('artist', 'title'), True), # Unique on artist + title
)
class AirCast(BaseModel):
id = AutoField()
date = DateTimeField()
song = ForeignKeyField(Song, backref='dates')
# ----------------------------------------------------------------------
# Util functions
def http_get(url):
response = requests.get(url);
if response.status_code == 200:
pass
else:
print('Uh, oh, unable to fetch', url)
print('Http status code:', response.status_code)
raise Error('Download error')
return response
# ----------------------------------------------------------------------
# Get song informations
last_ten_url = 'https://radiobalises.com/Play-list/last10.html'
def get_last_ten():
response = http_get(last_ten_url)
soup = bs4.BeautifulSoup(response.content, 'html5lib')
dates = soup.select('p.rldj-cell span.post-date')
for elem in dates:
song_dt = dt.datetime.fromisoformat(elem.text)
artist, title = [x.strip() for x in elem.previous.previous.split(' - ', 1)]
song, _ = Song.get_or_create(artist=artist, title=title)
# get_or_create does not play nice with the unique constraint on the date
# so we use a simple try/except instead
try:
aircast = AirCast.get(date=song_dt, song=song)
except DoesNotExist:
aircast = AirCast.create(date=song_dt, song=song)
line_template = '{:<10} | {:<25} {:<40}'
print(line_template.format(str(song_dt), artist, title))
# ----------------------------------------------------------------------
# Search song
def search_songs_between(start_dt, end_dt):
query = AirCast\
.select()\
.order_by(AirCast.date)\
.where(AirCast.date.between(start_dt, end_dt))
return [x for x in query]
def search_song(day, hour):
# query must be a datetime
# we search the songs withing the hour specified
start_dt = dt.datetime.combine(day, dt.time(hour=hour))
delta = dt.timedelta(hours=1)
return search_songs_between(start_dt, start_dt + delta)
def print_aircast(x):
line_template = '{:<10} | {:<25} {:<40}'
print(line_template.format(str(x.date), x.song.artist, x.song.title))
# ----------------------------------------------------------------------
# Web application
#
# Very simple, just a page with an input for a date/time, a query button and a
# list of results
@route('/', method=['GET', 'POST'])
@view('search_results')
def results_page():
try:
day = dt.date.fromisoformat(request.forms.date)
except ValueError:
day = dt.datetime.now().date()
try:
hour = int(request.forms.time.split(':')[0])
except ValueError:
hour = dt.datetime.now().hour
delta = int(request.forms.timedelta or 0)
hour = hour + delta
time = '{:0>2}:00'.format(hour)
results = [
{ 'date': x.date,
'song': x.song,
'youtube_url': 'https://www.youtube.com/results?search_query=' +
urllib.parse.quote_plus(x.song.artist + ' ' + x.song.title)
}
for x in search_song(day, hour)
]
return dict(results=results,
date=day,
time=time,
start_time=time,
end_time = '{:0>2}:00'.format(hour+1))
@hook('before_request')
def connect_to_db():
db.connect()
@hook('after_request')
def close_db_connection():
db.close()
# ----------------------------------------------------------------------
# Argument parsing
# use a decorator to simplify argparse usage, as suggested by
# https://mike.depalatis.net/blog/simplifying-argparse.html
cli = ArgumentParser(description='Balises')
subparsers = cli.add_subparsers(dest="subcommand")
def subcommand(args=[], parent=subparsers):
def decorator(func):
parser = parent.add_parser(func.__name__, description=func.__doc__)
for arg in args:
parser.add_argument(*arg[0], **arg[1])
parser.set_defaults(func=func)
return decorator
def argument(*name_or_flags, **kwargs):
return ([*name_or_flags], kwargs)
@subcommand([argument('query', help='Search query')])
def search(args):
results = search_song(datetime.fromisoformat(args.query))
for res in results:
print_aircast(res)
@subcommand()
def update(args):
get_last_ten()
@subcommand()
def serve(args):
run(host=conf['server'].get('host', 'localhost'),
port=conf['server'].getint('port', 9980),
debug=conf['server'].getboolean('debug', False),
reloader=conf['server'].getboolean('reloader', False))
# ----------------------------------------------------------------------
# Main
def main():
args = cli.parse_args()
if args.subcommand is None:
cli.print_help()
else:
args.func(args)
if __name__ == '__main__':
main()