#!/usr/bin/env python3 from os.path import join, expanduser from itertools import zip_longest import urwid import gmusicapi import logging def sec_to_min_sec(sec_tot): if sec_tot is None: return 0, 0 else: min_ = int(sec_tot // 60) sec = int(sec_tot % 60) return min_, sec class MusicObject: @staticmethod def to_ui(*txts, weights=()): first, *rest = [(weight, str(txt)) for weight, txt in zip_longest(weights, txts, fillvalue=1)] items = [('weight', first[0], urwid.SelectableIcon(first[1], 0))] for weight, line in rest: items.append(('weight', weight, urwid.Text(line))) line = urwid.Columns(items) line = urwid.AttrMap(line, 'search normal', 'search select') return line @staticmethod def header_ui(*txts, weights=()): header = urwid.Columns([('weight', weight, urwid.Text(('header', txt))) for weight, txt in zip_longest(weights, txts, fillvalue=1)]) return urwid.AttrMap(header, 'header_bg') class Song(MusicObject): ui_weights = (1, 1, 1, 0.2) def __init__(self, title, album, albumId, artist, artistId, id_, type_, length): self.title = title self.album = album self.albumId = albumId self.artist = artist self.artistId = artistId self.id = id_ self.type = type_ self.length = length def __repr__(self): return f'' def __str__(self): return f'{self.title} by {self.artist}' def fmt_str(self): return [('np_song', f'{self.title} '), 'by ', ('np_artist', f'{self.artist}')] def ui(self): return self.to_ui(self.title, self.album, self.artist, '{:02d}:{:02d}'.format(*self.length), weights=self.ui_weights) @classmethod def header(cls): return MusicObject.header_ui('Title', 'Album', 'Artist', 'Length', weights=cls.ui_weights) @staticmethod def from_dict(d): try: title = d['title'] album = d['album'] albumId = d['albumId'] artist = d['artist'] artistId = d['artistId'][0] try: id_ = d['id'] type_ = 'library' except KeyError: id_ = d['storeId'] type_ = 'store' length = sec_to_min_sec(int(d['durationMillis']) / 1000) return Song(title, album, albumId, artist, artistId, id_, type_, length) except KeyError as e: logging.exception(f"Missing Key {e} in dict \n{d}") return None class Album(MusicObject): def __init__(self, title, artist, artistId, year, id_): self.title = title self.artist = artist self.artistId = artistId self.year = year self.id = id_ def __repr__(self): return f'' def ui(self): return self.to_ui(self.title, self.artist, self.year) @staticmethod def header(): return MusicObject.header_ui('Album', 'Artist', 'Year') @staticmethod def from_dict(d): try: title = d['name'] artist = d['albumArtist'] artistId = d['artistId'][0] try: year = d['year'] except KeyError: year = '' id_ = d['albumId'] return Album(title, artist, artistId, year, id_) except KeyError as e: logging.exception(f"Missing Key {e} in dict \n{d}") return None class Artist(MusicObject): def __init__(self, name, id_): self.name = name self.id = id_ def __repr__(self): return f'' def ui(self): return self.to_ui(self.name) @staticmethod def header(): return MusicObject.header_ui('Artist') @staticmethod def from_dict(d): try: name = d['name'] id_ = d['artistId'] return Artist(name, id_) except KeyError as e: logging.exception(f"Missing Key {e} in dict \n{d}") return None class SearchInput(urwid.Edit): def __init__(self, app): self.app = app super().__init__('search > ', multiline=False, allow_tab=False) def keypress(self, size, key): if key == 'enter': txt = self.edit_text if txt: self.set_edit_text('') self.app.search(txt) return None else: size = (size[0],) return super().keypress(size, key) class SearchPanel(urwid.ListBox): def __init__(self, app): self.app = app self.walker = urwid.SimpleFocusListWalker([]) self.history = [] self.search_results = ([], [], []) super().__init__(self.walker) def keypress(self, size, key): if key == 'q': selected = self.selected_search_obj() if selected: if type(selected) == Song: self.app.queue_panel.add_song_to_queue(selected) elif type(selected) == Album: self.app.queue_panel.add_album_to_queue(selected) elif key == 'e': if self.selected_search_obj() is not None: self.app.expand(self.selected_search_obj()) elif key == 'backspace': self.back() elif key == 'r': if self.selected_search_obj() is not None: self.app.create_radio_station(self.selected_search_obj()) elif key == 'j': super().keypress(size, 'down') elif key == 'k': super().keypress(size, 'up') else: super().keypress(size, key) def back(self): if self.history: self.set_search_results(*self.history.pop()) def update_search_results(self, songs, albums, artists): self.history.append(self.search_results) self.set_search_results(songs, albums, artists) def set_search_results(self, songs, albums, artists): songs = [obj for obj in songs if obj is not None] albums = [obj for obj in albums if obj is not None] artists = [obj for obj in artists if obj is not None] self.search_results = (songs, albums, artists) self.walker.clear() if artists: self.walker.append(Artist.header()) for artist in artists: self.walker.append(artist.ui()) if albums: self.walker.append(Album.header()) for album in albums: self.walker.append(album.ui()) if songs: self.walker.append(Song.header()) for song in songs: self.walker.append(song.ui()) if self.walker: self.walker.set_focus(1) def selected_search_obj(self): focus_id = self.walker.get_focus()[1] songs, albums, artists = self.search_results try: if artists: focus_id -= 1 if focus_id < len(artists): return artists[focus_id] focus_id -= len(artists) if albums: focus_id -= 1 if focus_id < len(albums): return albums[focus_id] focus_id -= len(albums) focus_id -= 1 return songs[focus_id] except (IndexError, TypeError): return None class PlayBar(urwid.ProgressBar): # Class stolen from: https://github.com/and3rson/clay/blob/master/clay/playbar.py vol_inds = [' ', '▁', '▂', '▃', '▄', '▅', '▆', '▇', '█'] def __init__(self, app, *args, **kwargs): super(PlayBar, self).__init__(*args, **kwargs) self.app = app def get_prog_tot(self): curr_time_s = self.app.player.time_pos rem_time_s = self.app.player.time_remaining if curr_time_s is None or rem_time_s is None: return 0, 0 else: progress = int(curr_time_s) total = int(curr_time_s + rem_time_s) return progress, total def get_text(self): if self.app.current_song is None: return 'Idle' progress, total = self.get_prog_tot() # return [u' \u25B6 ' if self.app.play_state == 'play' else u' \u25A0 ', # ('np_artist', self.app.current_song.artist), # ' - ', # ('np_song', self.app.current_song.title), # ' [{:02d}:{:02d} / {:02d}:{:02d}]'.format(progress // 60, # progress % 60, # total // 60, # total % 60) # ] return ' {} {} - {} [{:02d}:{:02d} / {:02d}:{:02d}] {}'.format( '▶' if self.app.play_state == 'play' else '■', self.app.current_song.artist, self.app.current_song.title, progress // 60, progress % 60, total // 60, total % 60, self.vol_inds[self.app.volume] ) def update(self): self._invalidate() progress, total = self.get_prog_tot() if progress >= 0 and total > 0: percent = progress / total * 100 self.set_completion(percent) else: self.set_completion(0) def render(self, size, focus=False): """ Render the progress bar - fixed implementation. For details see https://github.com/urwid/urwid/pull/261 """ (maxcol,) = size txt = urwid.Text(self.get_text(), self.text_align, urwid.widget.CLIP) c = txt.render((maxcol,)) cf = float(self.current) * maxcol / self.done ccol_dirty = int(cf) ccol = len(c._text[0][:ccol_dirty].decode( 'utf-8', 'ignore' ).encode( 'utf-8' )) cs = 0 if self.satt is not None: cs = int((cf - ccol) * 8) if ccol < 0 or (ccol == 0 and cs == 0): c._attr = [[(self.normal, maxcol)]] elif ccol >= maxcol: c._attr = [[(self.complete, maxcol)]] elif cs and c._text[0][ccol] == " ": t = c._text[0] cenc = self.eighths[cs].encode("utf-8") c._text[0] = t[:ccol] + cenc + t[ccol + 1:] a = [] if ccol > 0: a.append((self.complete, ccol)) a.append((self.satt, len(cenc))) if maxcol - ccol - 1 > 0: a.append((self.normal, maxcol - ccol - 1)) c._attr = [a] c._cs = [[(None, len(c._text[0]))]] else: c._attr = [[(self.complete, ccol), (self.normal, maxcol - ccol)]] return c class QueuePanel(urwid.ListBox): def __init__(self, app): self.app = app self.walker = urwid.SimpleFocusListWalker([]) self.queue = [] super().__init__(self.walker) def add_song_to_queue(self, song): if song: self.queue.append(song) self.walker.append(song.ui()) def add_album_to_queue(self, album): album_info = self.app.g_api.get_album_info(album.id) for track in album_info['tracks']: song = Song.from_dict(track) if song: self.queue.append(song) self.walker.append(song.ui()) def drop(self, idx): if 0 <= idx < len(self.queue): self.queue.pop(idx) self.walker.pop(idx) def clear(self, idx): self.queue.clear() self.walker.clear() def swap(self, idx1, idx2): if (0 <= idx1 < len(self.queue)) and (0 <= idx2 < len(self.queue)): obj1, obj2 = self.queue[idx1], self.queue[idx2] self.queue[idx1], self.queue[idx2] = obj2, obj1 ui1, ui2 = self.walker[idx1], self.walker[idx2] self.walker[idx1], self.walker[idx2] = ui2, ui1 def shuffle(self): from random import shuffle as rshuffle rshuffle(self.queue) self.walker.clear() for s in self.queue: self.walker.append(s.ui()) def play_next(self): if self.walker: self.walker.pop(0) self.app.play(self.queue.pop(0)) else: self.app.stop() def selected_queue_obj(self): try: focus_id = self.walker.get_focus()[1] return self.queue[focus_id] except (IndexError, TypeError): return None def keypress(self, size, key): focus_id = self.walker.get_focus()[1] if focus_id is None: return super().keypress(size, key) if key == 'u': self.swap(focus_id, focus_id-1) self.keypress(size, 'up') elif key == 'd': self.swap(focus_id, focus_id+1) self.keypress(size, 'down') elif key == 'delete': self.drop(focus_id) elif key == 'j': super().keypress(size, 'down') elif key == 'k': super().keypress(size, 'up') elif key == 'e': self.app.expand(self.selected_queue_obj()) elif key == ' ': if self.app.play_state == 'stop': self.play_next() else: self.app.toggle_play() else: return super().keypress(size, key) class App(urwid.Pile): palette = [ ('header', 'white,underline', 'black'), ('header_bg', 'white', 'black'), ('line', 'white', ''), ('search normal', 'white', ''), ('search select', '', '', '', 'white', '#D32'), ('region_bg normal', '', ''), ('region_bg select', '', 'black'), ('progress', '', '', '', '#FFF', '#F54'), ('progress_remaining', '', '', '', '#FFF', '#444'), ] def __init__(self): self.read_config() self.g_api = gmusicapi.Mobileclient(debug_logging=False) self.g_api.login(self.email, self.password, self.device_id) import mpv self.player = mpv.MPV() self.player.volume = 100 self.volume = 8 @self.player.event_callback('end_file') def callback(event): if event['event']['reason'] == 0: self.queue_panel.play_next() self.search_panel = SearchPanel(self) search_panel_wrapped = urwid.LineBox(self.search_panel, title='Search Results') search_panel_wrapped = urwid.AttrMap(search_panel_wrapped, 'region_bg normal', 'region_bg select') self.search_panel_wrapped = search_panel_wrapped self.playbar = PlayBar(self, 'progress_remaining', 'progress', current=0, done=100) # self.now_playing = urwid.Text('') # self.progress = urwid.Text('0:00/0:00', align='right') # status_line = urwid.Columns([('weight', 3, self.now_playing), # ('weight', 1, self.progress)]) self.queue_panel = QueuePanel(self) queue_panel_wrapped = urwid.LineBox(self.queue_panel, title='Queue') queue_panel_wrapped = urwid.AttrMap(queue_panel_wrapped, 'region_bg normal', 'region_bg select') self.queue_panel_wrapped = queue_panel_wrapped self.search_input = urwid.Edit('> ', multiline=False) self.search_input = SearchInput(self) urwid.Pile.__init__(self, [('weight', 12, search_panel_wrapped), ('pack', self.playbar), ('weight', 7, queue_panel_wrapped), ('pack', self.search_input) ]) self.set_focus(self.search_input) self.play_state = 'stop' self.current_song = None self.history = [] def read_config(self): import yaml config_file = join(expanduser('~'), '.config', 'tuijam', 'config.yaml') with open(config_file) as f: config = yaml.load(f.read()) self.email = config['email'] self.password = config['password'] self.device_id = config['device_id'] def refresh(self, *args, **kwargs): if self.play_state == 'play' and self.player.eof_reached: self.queue_panel.play_next() self.playbar.update() self.loop.draw_screen() if self.play_state == 'play': self.schedule_refresh() def schedule_refresh(self, dt=0.5): self.loop.set_alarm_in(dt, self.refresh) def play(self, song): self.current_song = song url = self.g_api.get_stream_url(song.id) self.player.play(url) self.player.pause = False self.play_state = 'play' self.playbar.update() self.history.append(song) self.schedule_refresh() def stop(self): self.current_song = None self.player.pause = True self.play_state = 'stop' self.playbar.update() def seek(self, dt): try: self.player.seek(dt) except SystemError: pass def toggle_play(self): if self.play_state == 'play': self.player.pause = True self.play_state = 'pause' self.playbar.update() elif self.play_state == 'pause': self.player.pause = False self.play_state = 'play' self.playbar.update() self.schedule_refresh() elif self.play_state == 'stop': self.queue_panel.play_next() def volume_down(self): self.volume = max([0, self.volume-1]) self.player.volume = int(self.volume * 100 / 8) self.playbar.update() def volume_up(self): self.volume = min([8, self.volume+1]) self.player.volume = int(self.volume * 100 / 8) self.playbar.update() def keypress(self, size, key): if key == 'tab': current_focus = self.focus if current_focus == self.search_panel_wrapped: self.set_focus(self.queue_panel_wrapped) elif current_focus == self.queue_panel_wrapped: self.set_focus(self.search_input) else: self.set_focus(self.search_panel_wrapped) elif key == 'shift tab': current_focus = self.focus if current_focus == self.search_panel_wrapped: self.set_focus(self.search_input) elif current_focus == self.queue_panel_wrapped: self.set_focus(self.search_panel_wrapped) else: self.set_focus(self.queue_panel_wrapped) elif key == 'ctrl p': self.toggle_play() elif key == 'ctrl q': self.stop() elif key == 'ctrl n': self.queue_panel.play_next() elif key == 'ctrl r': self.search_panel.update_search_results(self.history, [], []) elif key == 'ctrl s': self.queue_panel.shuffle() elif self.focus != self.search_input: if key == '>': self.seek(10) elif key == '<': self.seek(-10) elif key in '-_': self.volume_down() elif key in '+=': self.volume_up() else: return self.focus.keypress(size, key) else: return self.focus.keypress(size, key) def expand(self, obj): if obj is None: return if type(obj) == Song: album_info = self.g_api.get_album_info(obj.albumId) songs = [Song.from_dict(track) for track in album_info['tracks']] albums = [Album.from_dict(album_info)] artists = [Artist(obj.artist, obj.artistId)] elif type(obj) == Album: album_info = self.g_api.get_album_info(obj.id) songs = [Song.from_dict(track) for track in album_info['tracks']] albums = [obj] artists = [Artist(obj.artist, obj.artistId)] else: # Artist artist_info = self.g_api.get_artist_info(obj.id) songs = [Song.from_dict(track) for track in artist_info['topTracks']] albums = [Album.from_dict(album) for album in artist_info.get('albums', [])] artists = [Artist.from_dict(artist) for artist in artist_info['related_artists']] artists.insert(0, obj) self.search_panel.update_search_results(songs, albums, artists) def search(self, query): results = self.g_api.search(query) songs = [Song.from_dict(hit['track']) for hit in results['song_hits']] albums = [Album.from_dict(hit['album']) for hit in results['album_hits']] artists = [Artist.from_dict(hit['artist']) for hit in results['artist_hits']] self.search_panel.update_search_results(songs, albums, artists) self.set_focus(self.search_panel_wrapped) def create_radio_station(self, obj): if type(obj) == Song: station_name = obj.title + " radio" station_id = self.g_api.create_station(station_name, track_id=obj.id) elif type(obj) == Album: station_name = obj.title + " radio" station_id = self.g_api.create_station(station_name, album_id=obj.id) else: # Artist station_name = obj.name + " radio" station_id = self.g_api.create_station(station_name, artist_id=obj.id) song_dicts = self.g_api.get_station_tracks(station_id, num_tracks=50) for song_dict in song_dicts: self.queue_panel.add_song_to_queue(Song.from_dict(song_dict)) def cleanup(self): self.player.quit() del self.player self.g_api.logout() if __name__ == '__main__': log_file = join(expanduser('~'), '.config', 'tuijam', 'log.txt') logging.basicConfig(filename=log_file, filemode='w', level=logging.ERROR) app = App() import signal def handle_sigint(signum, frame): raise urwid.ExitMainLoop() signal.signal(signal.SIGINT, handle_sigint) loop = urwid.MainLoop(app, app.palette) app.loop = loop loop.screen.set_terminal_properties(256) try: loop.run() except Exception as e: logging.exception(e) print("Something bad happened! :( see log file ($HOME/.config/tuijam/log.txt) for more information.") app.cleanup()