#!/usr/bin/env python3
"""Record and search the songs listed on the radiobalises.com playlist page.

The script offers three sub-commands:

* ``update`` downloads the "last ten songs" page and stores every entry in a
  MySQL database (through peewee);
* ``search`` prints the songs aired within 30 minutes of a given date/time;
* ``serve`` starts a small bottle web application exposing the same search.

Connection and server settings are read from ``balises.ini``, located next to
this script.
"""

from argparse import ArgumentParser
from configparser import ConfigParser
from datetime import datetime, timedelta
import locale
import os

import bs4
import requests
from bottle import abort, hook, request, route, run, view
from peewee import *

# Set the locale for all categories to the user's default setting (eg. LANG
# environment variable)
locale.setlocale(locale.LC_ALL, '')

# ------------------------------------------------------------
# Configuration and database

conf = ConfigParser()
# Resolve the script directory to an absolute path first: when __file__ has
# no directory component, dirname() is '' and a plain string concatenation
# would look for '/balises.ini' at the filesystem root.
conf.read(os.path.join(os.path.dirname(os.path.abspath(__file__)),
                       'balises.ini'))

db = MySQLDatabase(conf['mysql'].get('database'),
                   user=conf['mysql'].get('user'),
                   password=conf['mysql'].get('password'),
                   host=conf['mysql'].get('host', 'localhost'),
                   port=conf['mysql'].getint('port', 3306))


class BaseModel(Model):
    """Base class binding every model to the configured database."""

    class Meta:
        database = db


class Song(BaseModel):
    """A song, identified by its artist and title."""

    id = AutoField()
    artist = CharField(default='')
    title = CharField(default='')

    class Meta:
        indexes = (
            (('artist', 'title'), True),  # Unique on artist + title
        )


class AirCast(BaseModel):
    """One broadcast of a song at a given date and time."""

    id = AutoField()
    date = DateTimeField()
    song = ForeignKeyField(Song, backref='dates')


# ----------------------------------------------------------------------
# Util functions

# Seconds to wait for the remote server before giving up; without a timeout
# requests.get() may block forever.
HTTP_TIMEOUT = 30

# Shared layout for the lines printed on the console: date | artist title
LINE_TEMPLATE = '{:<10} | {:<25} {:<40}'


class DownloadError(RuntimeError):
    """Raised by http_get() when the server does not answer with HTTP 200."""


def http_get(url):
    """Fetch *url* and return the ``requests`` response object.

    Raises:
        DownloadError: if the HTTP status code is not 200.
        requests.RequestException: on network errors, including a timeout
            after HTTP_TIMEOUT seconds.
    """
    response = requests.get(url, timeout=HTTP_TIMEOUT)
    if response.status_code != 200:
        print('Uh, oh, unable to fetch', url)
        print('Http status code:', response.status_code)
        raise DownloadError('Download error')
    return response


# ----------------------------------------------------------------------
# Get song informations

last_ten_url = 'https://radiobalises.com/Play-list/last10.html'


def get_last_ten():
    """Download the "last ten" page, store each entry and print it.

    Each ``p.rldj-cell span.post-date`` element holds an ISO date; the text
    found two parse-tree elements before it holds ``artist - title``. Songs
    and broadcasts already present in the database are not duplicated.
    """
    response = http_get(last_ten_url)
    soup = bs4.BeautifulSoup(response.content, 'html5lib')

    for elem in soup.select('p.rldj-cell span.post-date'):
        dt = datetime.fromisoformat(elem.text)
        # Split on the first ' - ' only, so titles may contain the separator.
        artist, title = [x.strip()
                         for x in elem.previous.previous.split(' - ', 1)]

        song, _ = Song.get_or_create(artist=artist, title=title)

        # Original author's note: get_or_create does not play nice with the
        # unique constraint on the date, so a simple try/except is used
        # instead. (No such constraint is declared on the model here; it is
        # presumably defined at the database level.)
        try:
            AirCast.get(date=dt, song=song)
        except DoesNotExist:
            AirCast.create(date=dt, song=song)

        print(LINE_TEMPLATE.format(str(dt), artist, title))


# ----------------------------------------------------------------------
# Search song

def search_song(query):
    """Return the broadcasts aired within 30 minutes of *query*.

    Args:
        query: a ``datetime``; both bounds of the window are exclusive.

    Returns:
        A list of AirCast rows ordered by date.
    """
    delta = timedelta(minutes=30)
    selection = AirCast.select().order_by(AirCast.date).where(
        (AirCast.date > query - delta) & (AirCast.date < query + delta))
    return list(selection)


def print_aircast(x):
    """Print one AirCast row as ``date | artist title``."""
    print(LINE_TEMPLATE.format(str(x.date), x.song.artist, x.song.title))


# ----------------------------------------------------------------------
# Web application
#
# Very simple, just a page with an input for a date/time, a query button and a
# list of results

@route('/', method='GET')
@view('search_form')
def main_page():
    """Render the search form, pre-filled with the current date and time."""
    now = datetime.now()
    date = now.date().isoformat()
    # Zero-padded HH:MM: an unpadded value such as '9:5' cannot be parsed
    # back by datetime.fromisoformat() in results_page().
    time = now.strftime('%H:%M')
    return dict(title='', date=date, time=time)


@route('/', method='POST')
@view('search_results')
def results_page():
    """Render the songs aired around the date and time sent by the form.

    Answers with HTTP 400 when the submitted values do not form a valid ISO
    date/time.
    """
    date = request.forms.date
    time = request.forms.time
    try:
        dt = datetime.fromisoformat('{} {}'.format(date, time))
    except ValueError:
        # Form data is untrusted: report bad input as a client error rather
        # than letting the exception escape.
        abort(400, 'Invalid date or time')
    results = search_song(dt)
    return dict(results=results, date=date, time=time)


@hook('before_request')
def connect_to_db():
    """Open the database connection before each request."""
    # reuse_if_open avoids an error when a connection is already open.
    db.connect(reuse_if_open=True)


@hook('after_request')
def close_db_connection():
    """Close the database connection after each request, if it is open."""
    if not db.is_closed():
        db.close()


# ----------------------------------------------------------------------
# Argument parsing

# use a decorator to simplify argparse usage, as suggested by
# https://mike.depalatis.net/blog/simplifying-argparse.html

cli = ArgumentParser(description='Balises')
subparsers = cli.add_subparsers(dest="subcommand")


def subcommand(args=None, parent=subparsers):
    """Return a decorator registering a function as an argparse sub-command.

    The sub-command is named after the function and uses its docstring as
    description.

    Args:
        args: optional iterable of ``(name_or_flags, kwargs)`` pairs as built
            by argument(); each one is passed to ``add_argument``.
        parent: the sub-parsers object the command is added to.
    """
    def decorator(func):
        parser = parent.add_parser(func.__name__, description=func.__doc__)
        for arg in args or ():
            parser.add_argument(*arg[0], **arg[1])
        parser.set_defaults(func=func)
        # Return the function so the decorated name is not rebound to None.
        return func
    return decorator


def argument(*name_or_flags, **kwargs):
    """Pack ``add_argument`` parameters into a pair usable by subcommand()."""
    return ([*name_or_flags], kwargs)


# Letting argparse parse the date turns a malformed query into a regular
# usage error instead of a traceback.
@subcommand([argument('query', type=datetime.fromisoformat,
                      help='Search query')])
def search(args):
    """Print the songs aired within 30 minutes of an ISO date/time."""
    for res in search_song(args.query):
        print_aircast(res)


@subcommand()
def update(args):
    """Fetch the last ten aired songs and store them in the database."""
    get_last_ten()


@subcommand()
def serve(args):
    """Start the web application."""
    run(host=conf['server'].get('host', 'localhost'),
        port=conf['server'].getint('port', 9980),
        debug=conf['server'].getboolean('debug', False))


# ----------------------------------------------------------------------
# Main

def main():
    """Parse the command line and run the selected sub-command.

    Prints the help text when no sub-command is given.
    """
    args = cli.parse_args()
    if args.subcommand is None:
        cli.print_help()
    else:
        args.func(args)


if __name__ == '__main__':
    main()