#!/usr/bin/env python3 from argparse import ArgumentParser import datetime as dt import bs4 import locale import requests import os import urllib.parse from bottle import hook, request, route, run, static_file, view from config import conf import models from models import Song, AirCast, DoesNotExist project_dir = os.path.dirname(__file__) # ---------------------------------------------------------------------- # Util functions def http_get(url): response = requests.get(url); if response.status_code == 200: pass else: print('Uh, oh, unable to fetch', url) print('Http status code:', response.status_code) raise Error('Download error') return response # ---------------------------------------------------------------------- # Get song informations last_ten_url = 'https://radiobalises.com/Play-list/last10.html' def get_last_ten(): response = http_get(last_ten_url) soup = bs4.BeautifulSoup(response.content, 'html5lib') dates = soup.select('p.rldj-cell span.post-date') for elem in dates: song_dt = dt.datetime.fromisoformat(elem.text) artist, title = [x.strip() for x in elem.previous.previous.split(' - ', 1)] song, _ = Song.get_or_create(artist=artist, title=title) # get_or_create does not play nice with the unique constraint on the date # so we use a simple try/except instead try: aircast = AirCast.get(date=song_dt, song=song) except DoesNotExist: aircast = AirCast.create(date=song_dt, song=song) line_template = '{:<10} | {:<25} {:<40}' print(line_template.format(str(song_dt), artist, title)) # ---------------------------------------------------------------------- # Search song def search_songs_between(start_dt, end_dt): query = AirCast\ .select()\ .order_by(AirCast.date)\ .where(AirCast.date.between(start_dt, end_dt)) return [x for x in query] def search_song(day, hour): # query must be a datetime # we search the songs withing the hour specified start_dt = dt.datetime.combine(day, dt.time(hour=hour)) delta = dt.timedelta(hours=1) return search_songs_between(start_dt, start_dt + delta) def print_aircast(x): line_template = '{:<10} | {:<25} {:<40}' print(line_template.format(str(x.date), x.song.artist, x.song.title)) # ---------------------------------------------------------------------- # Web application # # Very simple, just a page with an input for a date/time, a query button and a # list of results @route('/', method=['GET', 'POST']) @view('search_results') def results_page(): try: day = dt.date.fromisoformat(request.forms.date) except ValueError: day = dt.datetime.now().date() try: hour = int(request.forms.time.split(':')[0]) except ValueError: hour = dt.datetime.now().hour delta = int(request.forms.timedelta or 0) hour = hour + delta time = '{:0>2}:00'.format(hour) results = [ { 'date': x.date, 'song': x.song, 'youtube_url': 'https://www.youtube.com/results?search_query=' + urllib.parse.quote_plus(x.song.artist + ' ' + x.song.title) } for x in search_song(day, hour) ] return dict(results=results, date=day, time=time, start_time=time, end_time = '{:0>2}:00'.format(hour+1)) @route('/static/') def serve_static_file(filename): static_root = os.path.join(project_dir, 'static') return static_file(filename, root=static_root) @hook('before_request') def set_locale(): """Set the locale for all categories to the first lang in Accept-Language header. Default to fr_FR.UTF-8 """ accept_language = request.get_header('Accept-Language', 'fr-FR') first_lang = accept_language.split(';')[0].split(',')[0] lang = first_lang.translate(str.maketrans('-', '_')) + '.UTF-8' locale.setlocale(locale.LC_ALL, lang) @hook('before_request') def connect_to_db(): models.db.connect() @hook('after_request') def close_db_connection(): models.db.close() # ---------------------------------------------------------------------- # Argument parsing # use a decorator to simplify argparse usage, as suggested by # https://mike.depalatis.net/blog/simplifying-argparse.html cli = ArgumentParser(description='Balises') subparsers = cli.add_subparsers(dest="subcommand") def subcommand(args=[], parent=subparsers): def decorator(func): parser = parent.add_parser(func.__name__, description=func.__doc__) for arg in args: parser.add_argument(*arg[0], **arg[1]) parser.set_defaults(func=func) return decorator def argument(*name_or_flags, **kwargs): return ([*name_or_flags], kwargs) @subcommand([argument('query', help='Search query')]) def search(args): results = search_song(datetime.fromisoformat(args.query)) for res in results: print_aircast(res) @subcommand() def update(args): get_last_ten() @subcommand() def serve(args): run(host=conf['server'].get('host', 'localhost'), port=conf['server'].getint('port', 9980), debug=conf['server'].getboolean('debug', False), reloader=conf['server'].getboolean('reloader', False)) # ---------------------------------------------------------------------- # Main def main(): args = cli.parse_args() if args.subcommand is None: cli.print_help() else: args.func(args) if __name__ == '__main__': main()