#!/usr/bin/env python3
- from os.path import join, expanduser
- from itertools import zip_longest
- import urwid
- import gmusicapi
- import logging
def sec_to_min_sec(sec_tot):
    """Split a duration in seconds into whole minutes and whole seconds.

    Args:
        sec_tot: Duration in seconds (int or float), or ``None``.

    Returns:
        A ``(minutes, seconds)`` tuple of ints. ``None`` maps to ``(0, 0)``.
    """
    if sec_tot is None:
        return 0, 0
    minutes, seconds = divmod(sec_tot, 60)
    return int(minutes), int(seconds)
class MusicObject:
    """Base class providing the shared row/header widget builders."""

    @staticmethod
    def to_ui(*txts, weights=()):
        """Build a selectable, column-aligned row widget.

        Each text is paired with a column weight; whichever of ``txts`` or
        ``weights`` is shorter is padded with ``1``. The first column is a
        ``SelectableIcon`` so the row can take focus; the remaining columns
        are plain ``Text`` widgets.
        """
        pairs = [(weight, str(txt))
                 for weight, txt in zip_longest(weights, txts, fillvalue=1)]
        (lead_weight, lead_txt), *others = pairs

        columns = [('weight', lead_weight, urwid.SelectableIcon(lead_txt, 0))]
        for weight, txt in others:
            columns.append(('weight', weight, urwid.Text(txt)))

        row = urwid.Columns(columns)
        return urwid.AttrMap(row, 'search normal', 'search select')

    @staticmethod
    def header_ui(*txts, weights=()):
        """Build a non-selectable header row using the 'header' attributes."""
        cells = []
        for weight, txt in zip_longest(weights, txts, fillvalue=1):
            cells.append(('weight', weight, urwid.Text(('header', txt))))
        return urwid.AttrMap(urwid.Columns(cells), 'header_bg')
class Song(MusicObject):
    """A single track together with the ids needed to expand or play it."""

    # Relative column widths for title, album, artist and length.
    ui_weights = (1, 1, 1, 0.2)

    def __init__(self, title, album, albumId, artist, artistId, id_, type_, length):
        self.title, self.album, self.albumId = title, album, albumId
        self.artist, self.artistId = artist, artistId
        self.id, self.type = id_, type_
        # (minutes, seconds) tuple, as produced by sec_to_min_sec.
        self.length = length

    def __repr__(self):
        return f'<Song title:{self.title}, album:{self.album}, artist:{self.artist}>'

    def __str__(self):
        return f'{self.title} by {self.artist}'

    def fmt_str(self):
        """Return urwid text markup for a "title by artist" line."""
        return [('np_song', f'{self.title} '), 'by ', ('np_artist', f'{self.artist}')]

    def ui(self):
        """Return the list row widget for this song."""
        columns = (self.title, self.album, self.artist,
                   '{:02d}:{:02d}'.format(*self.length))
        return self.to_ui(*columns, weights=self.ui_weights)

    @classmethod
    def header(cls):
        """Return the header row matching the columns produced by ``ui``."""
        return MusicObject.header_ui('Title', 'Album', 'Artist', 'Length',
                                     weights=cls.ui_weights)

    @staticmethod
    def from_dict(d):
        """Build a Song from a track dict, or return None if a key is missing.

        A dict carrying ``'id'`` is tagged as type ``'library'``; otherwise
        ``'storeId'`` is used and the type is ``'store'``. Missing keys are
        logged rather than raised.
        """
        try:
            # Keys are read in a fixed order so the logged missing key is
            # always the first one absent.
            name = d['title']
            album_name = d['album']
            album_id = d['albumId']
            artist_name = d['artist']
            artist_id = d['artistId'][0]
            try:
                song_id = d['id']
                source = 'library'
            except KeyError:
                song_id = d['storeId']
                source = 'store'
            duration_s = int(d['durationMillis']) / 1000
            return Song(name, album_name, album_id, artist_name, artist_id,
                        song_id, source, sec_to_min_sec(duration_s))
        except KeyError as e:
            logging.exception(f"Missing Key {e} in dict \n{d}")
            return None
class Album(MusicObject):
    """An album with its artist, release year and album id."""

    def __init__(self, title, artist, artistId, year, id_):
        self.title, self.artist, self.artistId = title, artist, artistId
        self.year, self.id = year, id_

    def __repr__(self):
        return f'<Album title:{self.title}, artist:{self.artist}, year:{self.year}>'

    def ui(self):
        """Return the list row widget for this album."""
        return self.to_ui(self.title, self.artist, self.year)

    @staticmethod
    def header():
        """Return the header row matching the columns produced by ``ui``."""
        return MusicObject.header_ui('Album', 'Artist', 'Year')

    @staticmethod
    def from_dict(d):
        """Build an Album from an album dict, or return None on a missing key.

        ``'year'`` is optional and defaults to an empty string; every other
        key is required, and a missing one is logged rather than raised.
        """
        try:
            name = d['name']
            album_artist = d['albumArtist']
            artist_id = d['artistId'][0]
            release_year = d.get('year', '')
            album_id = d['albumId']
        except KeyError as e:
            logging.exception(f"Missing Key {e} in dict \n{d}")
            return None
        return Album(name, album_artist, artist_id, release_year, album_id)
class Artist(MusicObject):
    """An artist identified by display name and artist id."""

    def __init__(self, name, id_):
        self.name, self.id = name, id_

    def __repr__(self):
        return f'<Artist name:{self.name}>'

    def ui(self):
        """Return the list row widget for this artist."""
        return self.to_ui(self.name)

    @staticmethod
    def header():
        """Return the single-column header row for artist listings."""
        return MusicObject.header_ui('Artist')

    @staticmethod
    def from_dict(d):
        """Build an Artist from an artist dict, or return None on a missing key."""
        try:
            artist_name, artist_id = d['name'], d['artistId']
        except KeyError as e:
            logging.exception(f"Missing Key {e} in dict \n{d}")
            return None
        return Artist(artist_name, artist_id)
class SearchInput(urwid.Edit):
    """Single-line edit box that submits its text to the app on Enter."""

    def __init__(self, app):
        self.app = app
        super().__init__('search > ', multiline=False, allow_tab=False)

    def keypress(self, size, key):
        """Run a search on Enter; pass every other key to the edit widget."""
        if key != 'enter':
            # Only the column count is forwarded to Edit.keypress.
            return super().keypress((size[0],), key)

        query = self.edit_text
        if query:
            self.set_edit_text('')
            self.app.search(query)
        return None
class SearchPanel(urwid.ListBox):
    """List of search results grouped as artists, albums, then songs.

    Each non-empty group is preceded by one header row. ``search_results``
    always mirrors what is shown in the walker, and ``history`` keeps the
    previously shown result sets so ``back`` can restore them.
    """

    def __init__(self, app):
        self.app = app
        self.walker = urwid.SimpleFocusListWalker([])
        self.history = []
        self.search_results = ([], [], [])
        super().__init__(self.walker)

    def keypress(self, size, key):
        """Handle panel shortcuts.

        q: queue the selected song/album, e: expand the selection,
        r: start a radio station from it, backspace: previous results,
        j/k: move down/up. Any other key is delegated to the ListBox and its
        result is returned, so unhandled keys are reported to the caller.
        """
        if key == 'q':
            selected = self.selected_search_obj()
            if isinstance(selected, Song):
                self.app.queue_panel.add_song_to_queue(selected)
            elif isinstance(selected, Album):
                self.app.queue_panel.add_album_to_queue(selected)
        elif key == 'e':
            selected = self.selected_search_obj()
            if selected is not None:
                self.app.expand(selected)
        elif key == 'backspace':
            self.back()
        elif key == 'r':
            selected = self.selected_search_obj()
            if selected is not None:
                self.app.create_radio_station(selected)
        elif key == 'j':
            super().keypress(size, 'down')
        elif key == 'k':
            super().keypress(size, 'up')
        else:
            return super().keypress(size, key)
        return None

    def back(self):
        """Restore the most recent entry of the result history, if any."""
        if self.history:
            self.set_search_results(*self.history.pop())

    def update_search_results(self, songs, albums, artists):
        """Show new results, remembering the current ones for ``back``."""
        self.history.append(self.search_results)
        self.set_search_results(songs, albums, artists)

    def set_search_results(self, songs, albums, artists):
        """Replace the displayed results; ``None`` entries are dropped."""
        songs = [obj for obj in songs if obj is not None]
        albums = [obj for obj in albums if obj is not None]
        artists = [obj for obj in artists if obj is not None]
        self.search_results = (songs, albums, artists)

        self.walker.clear()
        sections = ((Artist, artists), (Album, albums), (Song, songs))
        for kind, items in sections:
            if not items:
                continue
            self.walker.append(kind.header())
            for item in items:
                self.walker.append(item.ui())

        if self.walker:
            # Row 0 is always a header; focus the first real entry.
            self.walker.set_focus(1)

    def selected_search_obj(self):
        """Return the object under focus, or None for headers / empty lists.

        The focus position is translated back into an index within the
        artists, albums or songs list by walking the sections in display
        order and skipping one header row per non-empty section.
        """
        focus_id = self.walker.get_focus()[1]
        if focus_id is None:
            return None

        songs, albums, artists = self.search_results
        for section in (artists, albums, songs):
            if not section:
                continue
            focus_id -= 1  # skip this section's header row
            if focus_id < 0:
                # The header itself is focused; a negative index must not
                # wrap around to the end of the section.
                return None
            if focus_id < len(section):
                return section[focus_id]
            focus_id -= len(section)
        return None
class PlayBar(urwid.ProgressBar):
    """Progress bar showing play state, current song, position and volume.

    Adapted from: https://github.com/and3rson/clay/blob/master/clay/playbar.py
    """

    # One glyph per volume step, indexed by ``app.volume``.
    vol_inds = [' ', '▁', '▂', '▃', '▄', '▅', '▆', '▇', '█']

    def __init__(self, app, *args, **kwargs):
        super(PlayBar, self).__init__(*args, **kwargs)
        self.app = app

    def get_prog_tot(self):
        """Return ``(elapsed, total)`` in whole seconds, or ``(0, 0)``.

        ``(0, 0)`` is returned when the player reports ``None`` for either
        the current position or the remaining time.
        """
        elapsed = self.app.player.time_pos
        remaining = self.app.player.time_remaining
        if elapsed is None or remaining is None:
            return 0, 0
        return int(elapsed), int(elapsed + remaining)

    def get_text(self):
        """Return the status line, or ``'Idle'`` when no song is set."""
        if self.app.current_song is None:
            return 'Idle'

        progress, total = self.get_prog_tot()
        state_icon = '▶' if self.app.play_state == 'play' else '■'
        artist = self.app.current_song.artist
        title = self.app.current_song.title
        prog_min, prog_sec = divmod(progress, 60)
        tot_min, tot_sec = divmod(total, 60)
        return ' {} {} - {} [{:02d}:{:02d} / {:02d}:{:02d}] {}'.format(
            state_icon,
            artist,
            title,
            prog_min,
            prog_sec,
            tot_min,
            tot_sec,
            self.vol_inds[self.app.volume]
        )

    def update(self):
        """Invalidate the widget and recompute the completion percentage."""
        self._invalidate()
        progress, total = self.get_prog_tot()
        percent = progress / total * 100 if (progress >= 0 and total > 0) else 0
        self.set_completion(percent)

    def render(self, size, focus=False):
        """
        Render the progress bar - fixed implementation.
        For details see https://github.com/urwid/urwid/pull/261
        """
        (maxcol,) = size
        label = urwid.Text(self.get_text(), self.text_align, urwid.widget.CLIP)
        canvas = label.render((maxcol,))

        # Fractional number of columns that should be drawn as completed.
        filled = float(self.current) * maxcol / self.done
        raw_col = int(filled)
        # Round-trip through UTF-8 with 'ignore' so a cut that lands in the
        # middle of a multi-byte character drops the partial character.
        col = len(canvas._text[0][:raw_col].decode(
            'utf-8', 'ignore'
        ).encode(
            'utf-8'
        ))

        eighth = 0
        if self.satt is not None:
            eighth = int((filled - col) * 8)

        if col < 0 or (col == 0 and eighth == 0):
            canvas._attr = [[(self.normal, maxcol)]]
        elif col >= maxcol:
            canvas._attr = [[(self.complete, maxcol)]]
        # NOTE(review): the row supports .decode() above, so it looks like
        # bytes; indexing bytes yields an int, so this comparison with " "
        # presumably never holds on Python 3 - confirm before relying on it.
        elif eighth and canvas._text[0][col] == " ":
            row = canvas._text[0]
            glyph = self.eighths[eighth].encode("utf-8")
            canvas._text[0] = row[:col] + glyph + row[col + 1:]
            attrs = []
            if col > 0:
                attrs.append((self.complete, col))
            attrs.append((self.satt, len(glyph)))
            if maxcol - col - 1 > 0:
                attrs.append((self.normal, maxcol - col - 1))
            canvas._attr = [attrs]
            canvas._cs = [[(None, len(canvas._text[0]))]]
        else:
            canvas._attr = [[(self.complete, col),
                             (self.normal, maxcol - col)]]
        return canvas
class QueuePanel(urwid.ListBox):
    """Play queue: a list of Song objects plus their row widgets.

    ``queue`` (the songs) and ``walker`` (the widgets) are kept index-aligned
    by every mutating method.
    """

    def __init__(self, app):
        self.app = app
        self.walker = urwid.SimpleFocusListWalker([])
        self.queue = []
        super().__init__(self.walker)

    def add_song_to_queue(self, song):
        """Append ``song`` to the queue; falsy values (e.g. None) are ignored."""
        if song:
            self.queue.append(song)
            self.walker.append(song.ui())

    def add_album_to_queue(self, album):
        """Fetch the album's tracks and append every one that parses."""
        album_info = self.app.g_api.get_album_info(album.id)
        for track in album_info['tracks']:
            # Song.from_dict returns None for unparsable tracks, which
            # add_song_to_queue skips.
            self.add_song_to_queue(Song.from_dict(track))

    def drop(self, idx):
        """Remove the entry at ``idx``; out-of-range indices are ignored."""
        if 0 <= idx < len(self.queue):
            self.queue.pop(idx)
            self.walker.pop(idx)

    def clear(self, idx=None):
        """Empty the queue.

        ``idx`` is unused and kept only so existing positional callers
        continue to work.
        """
        self.queue.clear()
        self.walker.clear()

    def swap(self, idx1, idx2):
        """Exchange two entries; does nothing if either index is out of range."""
        size = len(self.queue)
        if not (0 <= idx1 < size and 0 <= idx2 < size):
            return
        self.queue[idx1], self.queue[idx2] = self.queue[idx2], self.queue[idx1]
        ui1, ui2 = self.walker[idx1], self.walker[idx2]
        self.walker[idx1], self.walker[idx2] = ui2, ui1

    def shuffle(self):
        """Shuffle the queue in place and rebuild the row widgets."""
        from random import shuffle as rshuffle
        rshuffle(self.queue)
        self.walker.clear()
        for song in self.queue:
            self.walker.append(song.ui())

    def play_next(self):
        """Play the first queued song, or stop playback if the queue is empty."""
        if self.walker:
            self.walker.pop(0)
            self.app.play(self.queue.pop(0))
        else:
            self.app.stop()

    def selected_queue_obj(self):
        """Return the focused Song, or None when nothing is focused."""
        try:
            focus_id = self.walker.get_focus()[1]
            return self.queue[focus_id]
        except (IndexError, TypeError):
            return None

    def keypress(self, size, key):
        """Handle queue shortcuts.

        space: play/pause (or start the queue when stopped), u/d: move the
        focused entry up/down, delete: remove it, j/k: move focus down/up,
        e: expand the focused song. Other keys go to the ListBox.
        """
        if key == ' ':
            # Handled before the focus check so playback can still be
            # paused/resumed while the queue itself is empty.
            if self.app.play_state == 'stop':
                self.play_next()
            else:
                self.app.toggle_play()
            return None

        focus_id = self.walker.get_focus()[1]
        if focus_id is None:
            return super().keypress(size, key)

        if key == 'u':
            self.swap(focus_id, focus_id - 1)
            super().keypress(size, 'up')
        elif key == 'd':
            self.swap(focus_id, focus_id + 1)
            super().keypress(size, 'down')
        elif key == 'delete':
            self.drop(focus_id)
        elif key == 'j':
            super().keypress(size, 'down')
        elif key == 'k':
            super().keypress(size, 'up')
        elif key == 'e':
            self.app.expand(self.selected_queue_obj())
        else:
            return super().keypress(size, key)
        return None
class App(urwid.Pile):
    """Top-level widget: search results, play bar, queue and search input.

    Owns the gmusicapi client (``g_api``), the mpv player (``player``) and
    the playback state (``play_state`` is one of 'play', 'pause', 'stop').
    ``loop`` must be assigned by the caller before playback starts, since
    refresh scheduling goes through it.
    """

    palette = [
        ('header', 'white,underline', 'black'),
        ('header_bg', 'white', 'black'),
        ('line', 'white', ''),
        ('search normal', 'white', ''),
        ('search select', '', '', '', 'white', '#D32'),
        ('region_bg normal', '', ''),
        ('region_bg select', '', 'black'),
        ('progress', '', '', '', '#FFF', '#F54'),
        ('progress_remaining', '', '', '', '#FFF', '#444'),
    ]

    def __init__(self):
        self.read_config()
        self.g_api = gmusicapi.Mobileclient(debug_logging=False)
        self.g_api.login(self.email, self.password, self.device_id)

        import mpv
        self.player = mpv.MPV()
        self.player.volume = 100
        # Volume step in 0..8; also used as an index into PlayBar.vol_inds.
        self.volume = 8

        @self.player.event_callback('end_file')
        def callback(event):
            # presumably reason 0 means the file reached its natural end
            # - TODO confirm against the python-mpv event documentation.
            if event['event']['reason'] == 0:
                self.queue_panel.play_next()

        self.search_panel = SearchPanel(self)
        search_panel_wrapped = urwid.LineBox(self.search_panel, title='Search Results')
        search_panel_wrapped = urwid.AttrMap(search_panel_wrapped,
                                             'region_bg normal', 'region_bg select')
        self.search_panel_wrapped = search_panel_wrapped

        self.playbar = PlayBar(self, 'progress_remaining', 'progress', current=0, done=100)

        self.queue_panel = QueuePanel(self)
        queue_panel_wrapped = urwid.LineBox(self.queue_panel, title='Queue')
        queue_panel_wrapped = urwid.AttrMap(queue_panel_wrapped,
                                            'region_bg normal', 'region_bg select')
        self.queue_panel_wrapped = queue_panel_wrapped

        self.search_input = SearchInput(self)

        urwid.Pile.__init__(self, [('weight', 12, search_panel_wrapped),
                                   ('pack', self.playbar),
                                   ('weight', 7, queue_panel_wrapped),
                                   ('pack', self.search_input)
                                   ])
        self.set_focus(self.search_input)

        self.play_state = 'stop'
        self.current_song = None
        self.history = []

    def read_config(self):
        """Load email, password and device id from the user's config file.

        Reads ``~/.config/tuijam/config.yaml``. Raises whatever ``open`` or
        the YAML parser raise, and ``KeyError`` if a required key is absent.
        """
        import yaml
        config_file = join(expanduser('~'), '.config', 'tuijam', 'config.yaml')
        with open(config_file) as f:
            # safe_load: the config is plain data, and yaml.load without an
            # explicit Loader is unsafe and rejected by newer PyYAML.
            config = yaml.safe_load(f)
        self.email = config['email']
        self.password = config['password']
        self.device_id = config['device_id']

    def refresh(self, *args, **kwargs):
        """Periodic tick: advance on EOF, redraw, and re-arm while playing."""
        if self.play_state == 'play' and self.player.eof_reached:
            self.queue_panel.play_next()
        self.playbar.update()
        self.loop.draw_screen()
        if self.play_state == 'play':
            self.schedule_refresh()

    def schedule_refresh(self, dt=0.5):
        """Schedule ``refresh`` to run in ``dt`` seconds on the main loop."""
        self.loop.set_alarm_in(dt, self.refresh)

    def play(self, song):
        """Start streaming ``song`` and record it in the play history."""
        self.current_song = song
        url = self.g_api.get_stream_url(song.id)
        self.player.play(url)
        self.player.pause = False
        self.play_state = 'play'
        self.playbar.update()
        self.history.append(song)
        self.schedule_refresh()

    def stop(self):
        """Pause the player and clear the current song."""
        self.current_song = None
        self.player.pause = True
        self.play_state = 'stop'
        self.playbar.update()

    def seek(self, dt):
        """Seek by ``dt`` seconds; a SystemError from the player is ignored."""
        try:
            self.player.seek(dt)
        except SystemError:
            pass

    def toggle_play(self):
        """Pause when playing, resume when paused, start the queue when stopped."""
        if self.play_state == 'play':
            self.player.pause = True
            self.play_state = 'pause'
            self.playbar.update()
        elif self.play_state == 'pause':
            self.player.pause = False
            self.play_state = 'play'
            self.playbar.update()
            self.schedule_refresh()
        elif self.play_state == 'stop':
            self.queue_panel.play_next()

    def _set_volume(self, level):
        """Clamp ``level`` to 0..8, apply it to the player and redraw."""
        self.volume = min(8, max(0, level))
        self.player.volume = int(self.volume * 100 / 8)
        self.playbar.update()

    def volume_down(self):
        """Lower the volume by one step (minimum 0)."""
        self._set_volume(self.volume - 1)

    def volume_up(self):
        """Raise the volume by one step (maximum 8)."""
        self._set_volume(self.volume + 1)

    def keypress(self, size, key):
        """Handle global shortcuts, then delegate to the focused widget.

        tab / shift tab cycle focus between the search panel, the queue and
        the search input. ctrl p/q/n/r/s toggle play, stop, skip, show the
        play history and shuffle. Outside the search input, '<' and '>' seek
        and '-', '_', '+', '=' change the volume.
        """
        if key == 'tab':
            current_focus = self.focus
            if current_focus == self.search_panel_wrapped:
                self.set_focus(self.queue_panel_wrapped)
            elif current_focus == self.queue_panel_wrapped:
                self.set_focus(self.search_input)
            else:
                self.set_focus(self.search_panel_wrapped)
        elif key == 'shift tab':
            current_focus = self.focus
            if current_focus == self.search_panel_wrapped:
                self.set_focus(self.search_input)
            elif current_focus == self.queue_panel_wrapped:
                self.set_focus(self.search_panel_wrapped)
            else:
                self.set_focus(self.queue_panel_wrapped)
        elif key == 'ctrl p':
            self.toggle_play()
        elif key == 'ctrl q':
            self.stop()
        elif key == 'ctrl n':
            self.queue_panel.play_next()
        elif key == 'ctrl r':
            self.search_panel.update_search_results(self.history, [], [])
        elif key == 'ctrl s':
            self.queue_panel.shuffle()
        elif self.focus != self.search_input:
            # These single-character shortcuts must stay typeable in the
            # search box, so they only apply when it is not focused.
            if key == '>':
                self.seek(10)
            elif key == '<':
                self.seek(-10)
            elif key in ('-', '_'):
                self.volume_down()
            elif key in ('+', '='):
                self.volume_up()
            else:
                return self.focus.keypress(size, key)
        else:
            return self.focus.keypress(size, key)
        return None

    def expand(self, obj):
        """Show the songs, albums and artists related to ``obj``.

        A Song or Album expands to its album's tracks; any other object is
        treated as an Artist and expands to top tracks, albums and related
        artists. ``None`` is ignored.
        """
        if obj is None:
            return
        if isinstance(obj, Song):
            album_info = self.g_api.get_album_info(obj.albumId)
            songs = [Song.from_dict(track) for track in album_info['tracks']]
            albums = [Album.from_dict(album_info)]
            artists = [Artist(obj.artist, obj.artistId)]
        elif isinstance(obj, Album):
            album_info = self.g_api.get_album_info(obj.id)
            songs = [Song.from_dict(track) for track in album_info['tracks']]
            albums = [obj]
            artists = [Artist(obj.artist, obj.artistId)]
        else:  # Artist
            artist_info = self.g_api.get_artist_info(obj.id)
            songs = [Song.from_dict(track) for track in artist_info['topTracks']]
            albums = [Album.from_dict(album) for album in artist_info.get('albums', [])]
            artists = [Artist.from_dict(artist) for artist in artist_info['related_artists']]
            artists.insert(0, obj)
        self.search_panel.update_search_results(songs, albums, artists)

    def search(self, query):
        """Run ``query`` against the API and show the hits in the search panel."""
        results = self.g_api.search(query)
        songs = [Song.from_dict(hit['track']) for hit in results['song_hits']]
        albums = [Album.from_dict(hit['album']) for hit in results['album_hits']]
        artists = [Artist.from_dict(hit['artist']) for hit in results['artist_hits']]
        self.search_panel.update_search_results(songs, albums, artists)
        self.set_focus(self.search_panel_wrapped)

    def create_radio_station(self, obj):
        """Create a station seeded by ``obj`` and queue up to 50 of its tracks."""
        if isinstance(obj, Song):
            station_name = obj.title + " radio"
            station_id = self.g_api.create_station(station_name, track_id=obj.id)
        elif isinstance(obj, Album):
            station_name = obj.title + " radio"
            station_id = self.g_api.create_station(station_name, album_id=obj.id)
        else:  # Artist
            station_name = obj.name + " radio"
            station_id = self.g_api.create_station(station_name, artist_id=obj.id)
        song_dicts = self.g_api.get_station_tracks(station_id, num_tracks=50)
        for song_dict in song_dicts:
            self.queue_panel.add_song_to_queue(Song.from_dict(song_dict))

    def cleanup(self):
        """Shut down the player and log out of the API."""
        self.player.quit()
        del self.player
        self.g_api.logout()
if __name__ == '__main__':
    import signal

    # Errors are written to a log file because the terminal is owned by the UI.
    log_file = join(expanduser('~'), '.config', 'tuijam', 'log.txt')
    logging.basicConfig(filename=log_file, filemode='w', level=logging.ERROR)

    app = App()

    def handle_sigint(signum, frame):
        # Turn Ctrl-C into a normal main-loop exit.
        raise urwid.ExitMainLoop()

    signal.signal(signal.SIGINT, handle_sigint)

    loop = urwid.MainLoop(app, app.palette)
    app.loop = loop
    loop.screen.set_terminal_properties(256)

    try:
        loop.run()
    except Exception as e:
        logging.exception(e)
        print("Something bad happened! :( see log file ($HOME/.config/tuijam/log.txt) for more information.")
    finally:
        # Always release the player and log out, even when the loop is left
        # through an exception that is not an Exception subclass.
        app.cleanup()
|