"""
https://github.com/np1/mps-youtube
Copyright (C) 2014, 2015 np1 and contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from xml.etree import ElementTree as ET
import unicodedata
import subprocess
import traceback
import threading
import difflib
import logging
import base64
import random
import locale
import socket
import shlex
import time
import math
import json
import copy
import sys
import re
import os
import pickle
import webbrowser
from urllib.request import urlopen, build_opener
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
import pafy
from pafy import call_gdata, GdataError
from . import g, c, commands, cache, streams, screen, content
from . import __version__, __url__
from .playlist import Playlist, Video
from .config import Config, known_player_set
from .util import has_exefile, dbg, get_near_name
from .util import get_pafy, getxy
from .util import xenc, xprint, mswinfn, set_window_title, F
from .helptext import get_help
from .player import launch_player
try:
import readline
readline.set_history_length(2000)
has_readline = True
except ImportError:
has_readline = False
try:
# pylint: disable=F0401
import pyperclip
has_pyperclip = True
except ImportError:
has_pyperclip = False
mswin = os.name == "nt"
not_utf8_environment = mswin or "UTF-8" not in sys.stdout.encoding
locale.setlocale(locale.LC_ALL, "") # for date formatting
ISO8601_TIMEDUR_EX = re.compile(r'PT((\d{1,3})H)?((\d{1,3})M)?((\d{1,2})S)?')
class IterSlicer():
""" Class that takes an iterable and allows slicing,
loading from the iterable as needed."""
def __init__(self, iterable):
self.ilist = []
self.iterable = iter(iterable)
def __getitem__(self, sliced):
if isinstance(sliced, slice):
stop = sliced.stop
else:
stop = sliced
# To get the last item in an iterable, must iterate over all items
if (stop is None) or (stop < 0):
stop = None
while (stop is None) or (stop > len(self.ilist) - 1):
try:
self.ilist.append(next(self.iterable))
except StopIteration:
break
return self.ilist[sliced]
def get_content_length(url, preloading=False):
""" Return content length of a url. """
prefix = "preload: " if preloading else ""
dbg(c.y + prefix + "getting content-length header" + c.w)
response = urlopen(url)
headers = response.headers
cl = headers['content-length']
return int(cl)
def get_size(ytid, url, preloading=False):
""" Get size of stream, try stream cache first. """
# try cached value
stream = [x for x in g.streams[ytid]['meta'] if x['url'] == url][0]
size = stream['size']
prefix = "preload: " if preloading else ""
if not size == -1:
dbg("%s%susing cached size: %s%s", c.g, prefix, size, c.w)
else:
screen.writestatus("Getting content length", mute=preloading)
stream['size'] = get_content_length(url, preloading=preloading)
dbg("%s%s - content-length: %s%s", c.y, prefix, stream['size'], c.w)
return stream['size']
@commands.command(r'set|showconfig')
def showconfig():
""" Dump config data. """
width = getxy().width
width -= 30
s = " %s%-17s%s : %s\n"
out = " %s%-17s %s%s%s\n" % (c.ul, "Key", "Value", " " * width, c.w)
for setting in Config:
val = Config[setting]
# don't show player specific settings if unknown player
if not known_player_set() and val.require_known_player:
continue
# don't show max_results if auto determined
if g.detectable_size and setting == "MAX_RESULTS":
continue
if g.detectable_size and setting == "CONSOLE_WIDTH":
continue
out += s % (c.g, setting.lower(), c.w, val.display)
g.content = out
g.message = "Enter %sset <key> <value>%s to change\n" % (c.g, c.w)
g.message += "Enter %sset all default%s to reset all" % (c.g, c.w)
@commands.command(r'set\s+([-\w]+)\s*(.*)')
def setconfig(key, val):
""" Set configuration variable. """
key = key.replace("-", "_")
if key.upper() == "ALL" and val.upper() == "DEFAULT":
for ci in Config:
Config[ci].value = Config[ci].default
Config.save()
message = "Default configuration reinstated"
elif not key.upper() in Config:
message = "Unknown config item: %s%s%s" % (c.r, key, c.w)
elif val.upper() == "DEFAULT":
att = Config[key.upper()]
att.value = att.default
message = "%s%s%s set to %s%s%s (default)"
dispval = att.display or "None"
message = message % (c.y, key, c.w, c.y, dispval, c.w)
Config.save()
else:
# Config.save() will be called by Config.set() method
message = Config[key.upper()].set(val)
showconfig()
g.message = message
def save_to_file():
""" Save playlists. Called each time a playlist is saved or deleted. """
with open(g.PLFILE, "wb") as plf:
pickle.dump(g.userpl, plf, protocol=2)
dbg(c.r + "Playlist saved\n---" + c.w)
def open_from_file():
""" Open playlists. Called once on script invocation. """
try:
with open(g.PLFILE, "rb") as plf:
g.userpl = pickle.load(plf)
except IOError:
# no playlist found, create a blank one
if not os.path.isfile(g.PLFILE):
g.userpl = {}
save_to_file()
except AttributeError:
# playlist is from a time when this module was __main__
# https://github.com/np1/mps-youtube/issues/214
import __main__
__main__.Playlist = Playlist
__main__.Video = Video
with open(g.PLFILE, "rb") as plf:
g.userpl = pickle.load(plf)
save_to_file()
screen.msgexit("Updated playlist file. Please restart mpsyt", 1)
except EOFError:
screen.msgexit("Error opening playlists from %s" % g.PLFILE, 1)
# remove any cached urls from playlist file, these are now
# stored in a separate cache file
save = False
for k, v in g.userpl.items():
for song in v.songs:
if hasattr(song, "urls"):
dbg("remove %s: %s", k, song.urls)
del song.urls
save = True
if save:
save_to_file()
def convert_playlist_to_v2():
""" Convert previous playlist file to v2 playlist. """
# skip if previously done
if os.path.isfile(g.PLFILE):
return
# skip if no playlist files exist
elif not os.path.isfile(g.OLD_PLFILE):
return
try:
with open(g.OLD_PLFILE, "rb") as plf:
old_playlists = pickle.load(plf)
except IOError:
sys.exit("Couldn't open old playlist file")
# rename old playlist file
backup = g.OLD_PLFILE + "_v1_backup"
if os.path.isfile(backup):
sys.exit("Error, backup exists but new playlist exists not!")
os.rename(g.OLD_PLFILE, backup)
# do the conversion
for plname, plitem in old_playlists.items():
songs = []
for video in plitem.songs:
v = Video(video['link'], video['title'], video['duration'])
songs.append(v)
g.userpl[plname] = Playlist(plname, songs)
# save as v2
save_to_file()
def logo(col=None, version=""):
""" Return text logo. """
col = col if col else random.choice((c.g, c.r, c.y, c.b, c.p, c.w))
logo_txt = r""" _ _
_ __ ___ _ __ ___ _ _ ___ _ _| |_ _ _| |__ ___
| '_ ` _ \| '_ \/ __|_____| | | |/ _ \| | | | __| | | | '_ \ / _ \
| | | | | | |_) \__ \_____| |_| | (_) | |_| | |_| |_| | |_) | __/
|_| |_| |_| .__/|___/ \__, |\___/ \__,_|\__|\__,_|_.__/ \___|
|_| |___/"""
version = " v" + version if version else ""
logo_txt = col + logo_txt + c.w + version
lines = logo_txt.split("\n")
length = max(len(x) for x in lines)
x, y, _ = getxy()
indent = (x - length - 1) // 2
newlines = (y - 12) // 2
indent, newlines = (0 if x < 0 else x for x in (indent, newlines))
lines = [" " * indent + l for l in lines]
logo_txt = "\n".join(lines) + "\n" * newlines
return "" if g.debug_mode else logo_txt
def playlists_display():
""" Produce a list of all playlists. """
if not g.userpl:
g.message = F("no playlists")
return generate_songlist_display() if g.model else (logo(c.y) + "\n\n")
maxname = max(len(a) for a in g.userpl)
out = " {0}Local Playlists{1}\n".format(c.ul, c.w)
start = " "
fmt = "%s%s%-3s %-" + str(maxname + 3) + "s%s %s%-7s%s %-5s%s"
head = (start, c.b, "ID", "Name", c.b, c.b, "Count", c.b, "Duration", c.w)
out += "\n" + fmt % head + "\n\n"
for v, z in enumerate(sorted(g.userpl)):
n, p = z, g.userpl[z]
l = fmt % (start, c.g, v + 1, n, c.w, c.y, str(len(p)), c.y,
p.duration, c.w) + "\n"
out += l
return out
def mplayer_help(short=True):
""" Mplayer help. """
# pylint: disable=W1402
volume = "[{0}9{1}] volume [{0}0{1}]"
volume = volume if short else volume + " [{0}q{1}] return"
seek = "[{0}\u2190{1}] seek [{0}\u2192{1}]"
pause = "[{0}\u2193{1}] SEEK [{0}\u2191{1}] [{0}space{1}] pause"
if not_utf8_environment:
seek = "[{0}<-{1}] seek [{0}->{1}]"
pause = "[{0}DN{1}] SEEK [{0}UP{1}] [{0}space{1}] pause"
single = "[{0}q{1}] return"
next_prev = "[{0}>{1}] next/prev [{0}<{1}]"
# ret = "[{0}q{1}] %s" % ("return" if short else "next track")
ret = single if short else next_prev
fmt = " %-20s %-20s"
lines = fmt % (seek, volume) + "\n" + fmt % (pause, ret)
return lines.format(c.g, c.w)
def fmt_time(seconds):
""" Format number of seconds to %H:%M:%S. """
hms = time.strftime('%H:%M:%S', time.gmtime(int(seconds)))
H, M, S = hms.split(":")
if H == "00":
hms = M + ":" + S
elif H == "01" and int(M) < 40:
hms = str(int(M) + 60) + ":" + S
elif H.startswith("0"):
hms = ":".join([H[1], M, S])
return hms
def get_track_id_from_json(item):
""" Try to extract video Id from various response types """
fields = ['contentDetails/videoId',
'snippet/resourceId/videoId',
'id/videoId',
'id']
for field in fields:
node = item
for p in field.split('/'):
if node and type(node) is dict:
node = node.get(p)
if node:
return node
return ''
def get_tracks_from_json(jsons):
""" Get search results from API response """
items = jsons.get("items")
if not items:
dbg("got unexpected data or no search results")
return ()
# fetch detailed information about items from videos API
qs = {'part':'contentDetails,statistics,snippet',
'id': ','.join([get_track_id_from_json(i) for i in items])}
wdata = call_gdata('videos', qs)
items_vidinfo = wdata.get('items', [])
# enhance search results by adding information from videos API response
for searchresult, vidinfoitem in zip(items, items_vidinfo):
searchresult.update(vidinfoitem)
# populate list of video objects
songs = []
for item in items:
try:
ytid = get_track_id_from_json(item)
duration = item.get('contentDetails', {}).get('duration')
if duration:
duration = ISO8601_TIMEDUR_EX.findall(duration)
if len(duration) > 0:
_, hours, _, minutes, _, seconds = duration[0]
duration = [seconds, minutes, hours]
duration = [int(v) if len(v) > 0 else 0 for v in duration]
duration = sum([60**p*v for p, v in enumerate(duration)])
else:
duration = 30
else:
duration = 30
stats = item.get('statistics', {})
snippet = item.get('snippet', {})
title = snippet.get('title', '').strip()
# instantiate video representation in local model
cursong = Video(ytid=ytid, title=title, length=duration)
likes = int(stats.get('likeCount', 0))
dislikes = int(stats.get('dislikeCount', 0))
#XXX this is a very poor attempt to calculate a rating value
rating = 5.*likes/(likes+dislikes) if (likes+dislikes) > 0 else 0
category = snippet.get('categoryId')
# cache video information in custom global variable store
g.meta[ytid] = dict(
# tries to get localized title first, fallback to normal title
title=snippet.get('localized',
{'title':snippet.get('title',
'[!!!]')}).get('title',
'[!]'),
length=str(fmt_time(cursong.length)),
rating=str('{}'.format(rating))[:4].ljust(4, "0"),
uploader=snippet.get('channelId'),
uploaderName=snippet.get('channelTitle'),
category=category,
aspect="custom", #XXX
uploaded=yt_datetime(snippet.get('publishedAt', ''))[1],
likes=str(num_repr(likes)),
dislikes=str(num_repr(dislikes)),
commentCount=str(num_repr(int(stats.get('commentCount', 0)))),
viewCount=str(num_repr(int(stats.get('viewCount', 0)))))
except Exception as e:
dbg(json.dumps(item, indent=2))
dbg('Error during metadata extraction/instantiation of search ' +
'result {}\n{}'.format(ytid, e))
songs.append(cursong)
# return video objects
return songs
def playback_progress(idx, allsongs, repeat=False):
""" Generate string to show selected tracks, indicate current track. """
# pylint: disable=R0914
# too many local variables
cw = getxy().width
out = " %s%-XXs%s%s\n".replace("XX", str(cw - 9))
out = out % (c.ul, "Title", "Time", c.w)
show_key_help = (known_player_set and Config.SHOW_MPLAYER_KEYS.get)
multi = len(allsongs) > 1
for n, song in enumerate(allsongs):
length_orig = fmt_time(song.length)
length = " " * (8 - len(length_orig)) + length_orig
i = uea_pad(cw - 14, song.title), length, length_orig
fmt = (c.w, " ", c.b, i[0], c.w, c.y, i[1], c.w)
if n == idx:
fmt = (c.y, "> ", c.p, i[0], c.w, c.p, i[1], c.w)
cur = i
out += "%s%s%s%s%s %s%s%s\n" % fmt
out += "\n" * (3 - len(allsongs))
pos = 8 * " ", c.y, idx + 1, c.w, c.y, len(allsongs), c.w
playing = "{}{}{}{} of {}{}{}\n\n".format(*pos) if multi else "\n\n"
keys = mplayer_help(short=(not multi and not repeat))
out = out if multi else generate_songlist_display(song=allsongs[0])
if show_key_help:
out += "\n" + keys
else:
playing = "{}{}{}{} of {}{}{}\n".format(*pos) if multi else "\n"
out += "\n" + " " * (cw - 19) if multi else ""
fmt = playing, c.r, cur[0].strip()[:cw - 19], c.w, c.w, cur[2], c.w
out += "%s %s%s%s %s[%s]%s" % fmt
out += " REPEAT MODE" if repeat else ""
return out
def num_repr(num):
""" Return up to four digit string representation of a number, eg 2.6m. """
if num <= 9999:
return str(num)
def digit_count(x):
""" Return number of digits. """
return int(math.floor(math.log10(x)) + 1)
digits = digit_count(num)
sig = 3 if digits % 3 == 0 else 2
rounded = int(round(num, int(sig - digits)))
digits = digit_count(rounded)
suffix = "_kmBTqXYX"[(digits - 1) // 3]
front = 3 if digits % 3 == 0 else digits % 3
if not front == 1:
return str(rounded)[0:front] + suffix
return str(rounded)[0] + "." + str(rounded)[1] + suffix
def real_len(u, alt=False):
""" Try to determine width of strings displayed with monospace font. """
if not isinstance(u, str):
u = u.decode("utf8")
u = xenc(u) # Handle replacements of unsuported characters
ueaw = unicodedata.east_asian_width
if alt:
# widths = dict(W=2, F=2, A=1, N=0.75, H=0.5) # original
widths = dict(N=.75, Na=1, W=2, F=2, A=1)
else:
widths = dict(W=2, F=2, A=1, N=1, H=0.5)
return int(round(sum(widths.get(ueaw(char), 1) for char in u)))
def uea_pad(num, t, direction="<", notrunc=False):
""" Right pad with spaces taking into account East Asian width chars. """
direction = direction.strip() or "<"
t = ' '.join(t.split('\n'))
# TODO: Find better way of dealing with this?
if num <= 0:
return ''
if not notrunc:
# Truncate to max of num characters
t = t[:num]
if real_len(t) < num:
spaces = num - real_len(t)
if direction == "<":
t = t + (" " * spaces)
elif direction == ">":
t = (" " * spaces) + t
elif direction == "^":
right = False
while real_len(t) < num:
t = t + " " if right else " " + t
right = not right
return t
def yt_datetime(yt_date_time):
""" Return a time object and locale formated date string. """
time_obj = time.strptime(yt_date_time, "%Y-%m-%dT%H:%M:%S.%fZ")
locale_date = time.strftime("%x", time_obj)
# strip first two digits of four digit year
short_date = re.sub(r"(\d\d\D\d\d\D)20(\d\d)$", r"\1\2", locale_date)
return time_obj, short_date
def generate_playlist_display():
""" Generate list of playlists. """
if not g.ytpls:
g.message = c.r + "No playlists found!"
return logo(c.g) + "\n\n"
g.rprompt = content.page_msg(g.current_page)
cw = getxy().width
fmtrow = "%s%-5s %s %-12s %-8s %-2s%s\n"
fmthd = "%s%-5s %-{}s %-12s %-9s %-5s%s\n".format(cw - 36)
head = (c.ul, "Item", "Playlist", "Author", "Updated", "Count", c.w)
out = "\n" + fmthd % head
for n, x in enumerate(g.ytpls):
col = (c.g if n % 2 == 0 else c.w)
length = x.get('size') or "?"
length = "%4s" % length
title = x.get('title') or "unknown"
author = x.get('author') or "unknown"
updated = yt_datetime(x.get('updated'))[1]
title = uea_pad(cw - 36, title)
out += (fmtrow % (col, str(n + 1), title, author[:12], updated, str(length), c.w))
return out + "\n" * (5 - len(g.ytpls))
def get_user_columns():
""" Get columns from user config, return dict. """
total_size = 0
user_columns = Config.COLUMNS.get
user_columns = user_columns.replace(",", " ").split()
defaults = {"views": dict(name="viewCount", size=4, heading="View"),
"rating": dict(name="rating", size=4, heading="Rtng"),
"comments": dict(name="commentCount", size=4, heading="Comm"),
"date": dict(name="uploaded", size=8, heading="Date"),
"user": dict(name="uploaderName", size=10, heading="User"),
"likes": dict(name="likes", size=4, heading="Like"),
"dislikes": dict(name="dislikes", size=4, heading="Dslk"),
"category": dict(name="category", size=8, heading="Category")}
ret = []
for column in user_columns:
namesize = column.split(":")
name = namesize[0]
if name in defaults:
z = defaults[name]
nm, sz, hd = z['name'], z['size'], z['heading']
if len(namesize) == 2 and namesize[1].isdigit():
sz = int(namesize[1])
total_size += sz
cw = getxy().width
if total_size < cw - 18:
ret.append(dict(name=nm, size=sz, heading=hd))
return ret
def generate_songlist_display(song=False, zeromsg=None):
""" Generate list of choices from a song list."""
# pylint: disable=R0914
if g.browse_mode == "ytpl":
return generate_playlist_display()
max_results = getxy().max_results
if not g.model:
g.message = zeromsg or "Enter /search-term to search or [h]elp"
return logo(c.g) + "\n\n"
g.rprompt = content.page_msg(g.current_page)
have_meta = all(x.ytid in g.meta for x in g.model)
user_columns = get_user_columns() if have_meta else []
maxlength = max(x.length for x in g.model)
lengthsize = 8 if maxlength > 35999 else 7
lengthsize = 5 if maxlength < 6000 else lengthsize
reserved = 9 + lengthsize + len(user_columns)
cw = getxy().width
cw -= 1
title_size = cw - sum(1 + x['size'] for x in user_columns) - reserved
before = [{"name": "idx", "size": 3, "heading": "Num"},
{"name": "title", "size": title_size, "heading": "Title"}]
after = [{"name": "length", "size": lengthsize, "heading": "Time"}]
columns = before + user_columns + after
for n, column in enumerate(columns):
column['idx'] = n
column['sign'] = "-" if not column['name'] == "length" else ""
fmt = ["%{}{}s ".format(x['sign'], x['size']) for x in columns]
fmtrow = fmt[0:1] + ["%s "] + fmt[2:]
fmt, fmtrow = "".join(fmt).strip(), "".join(fmtrow).strip()
titles = tuple([x['heading'][:x['size']] for x in columns])
hrow = c.ul + fmt % titles + c.w
out = "\n" + hrow + "\n"
for n, x in enumerate(g.model[:max_results]):
col = (c.r if n % 2 == 0 else c.p) if not song else c.b
details = {'title': x.title, "length": fmt_time(x.length)}
details = copy.copy(g.meta[x.ytid]) if have_meta else details
otitle = details['title']
details['idx'] = "%2d" % (n + 1)
details['title'] = uea_pad(columns[1]['size'], otitle)
cat = details.get('category') or '-'
details['category'] = pafy.get_categoryname(cat)
data = []
for z in columns:
fieldsize, field = z['size'], z['name']
if len(details[field]) > fieldsize:
details[field] = details[field][:fieldsize]
data.append(details[field])
line = fmtrow % tuple(data)
col = col if not song or song != g.model[n] else c.p
line = col + line + c.w
out += line + "\n"
return out + "\n" * (5 - len(g.model)) if not song else out
def playsong(song, failcount=0, override=False):
""" Play song using config.PLAYER called with args config.PLAYERARGS."""
# pylint: disable=R0911,R0912
if not Config.PLAYER.get or not has_exefile(Config.PLAYER.get):
g.message = "Player not configured! Enter %sset player <player_app> "\
"%s to set a player" % (c.g, c.w)
return
if Config.NOTIFIER.get:
subprocess.Popen(shlex.split(Config.NOTIFIER.get) + [song.title])
# don't interrupt preloading:
while song.ytid in g.preloading:
screen.writestatus("fetching item..")
time.sleep(0.1)
try:
streams.get(song, force=failcount, callback=screen.writestatus)
except (IOError, URLError, HTTPError, socket.timeout) as e:
dbg("--ioerror in playsong call to streams.get %s", str(e))
if "Youtube says" in str(e):
g.message = F('cant get track') % (song.title + " " + str(e))
return
elif failcount < g.max_retries:
dbg("--ioerror - trying next stream")
failcount += 1
return playsong(song, failcount=failcount, override=override)
elif "pafy" in str(e):
g.message = str(e) + " - " + song.ytid
return
except ValueError:
g.message = F('track unresolved')
dbg("----valueerror in playsong call to streams.get")
return
try:
video = ((Config.SHOW_VIDEO.get and override != "audio") or
(override in ("fullscreen", "window", "forcevid")))
m4a = "mplayer" not in Config.PLAYER.get
cached = g.streams[song.ytid]
stream = streams.select(cached, q=failcount, audio=(not video), m4a_ok=m4a)
# handle no audio stream available, or m4a with mplayer
# by switching to video stream and suppressing video output.
if (not stream or failcount) and not video:
dbg(c.r + "no audio or mplayer m4a, using video stream" + c.w)
override = "a-v"
video = True
stream = streams.select(cached, q=failcount, audio=False, maxres=1600)
if not stream:
raise IOError("No streams available")
except (HTTPError) as e:
# Fix for invalid streams (gh-65)
dbg("----htterror in playsong call to gen_real_args %s", str(e))
if failcount < g.max_retries:
failcount += 1
return playsong(song, failcount=failcount, override=override)
else:
g.message = str(e)
return
except IOError as e:
# this may be cause by attempting to play a https stream with
# mplayer
# ====
errmsg = e.message if hasattr(e, "message") else str(e)
g.message = c.r + str(errmsg) + c.w
return
size = get_size(song.ytid, stream['url'])
songdata = (song.ytid, stream['ext'] + " " + stream['quality'],
int(size / (1024 ** 2)))
songdata = "%s; %s; %s Mb" % songdata
screen.writestatus(songdata)
returncode = launch_player(song, songdata, override, stream, video)
failed = returncode not in (0, 42, 43)
if failed and failcount < g.max_retries:
dbg(c.r + "stream failed to open" + c.w)
dbg("%strying again (attempt %s)%s", c.r, (2 + failcount), c.w)
screen.writestatus("error: retrying")
time.sleep(1.2)
failcount += 1
return playsong(song, failcount=failcount, override=override)
return returncode
def _search(progtext, qs=None, msg=None, failmsg=None):
""" Perform memoized url fetch, display progtext. """
loadmsg = "Searching for '%s%s%s'" % (c.y, progtext, c.w)
wdata = call_gdata('search', qs)
def iter_songs():
wdata2 = wdata
while True:
for song in get_tracks_from_json(wdata2):
yield song
if not wdata2.get('nextPageToken'):
break
qs['pageToken'] = wdata2['nextPageToken']
wdata2 = call_gdata('search', qs)
slicer = IterSlicer(iter_songs())
# The youtube search api returns a maximum of 500 results
length = min(wdata['pageInfo']['totalResults'], 500)
paginatesongs(slicer, length=length, msg=msg, failmsg=failmsg,
loadmsg=loadmsg)
def token(page):
""" Returns a page token for a given start index. """
index = (page or 0) * getxy().max_results
k = index//128 - 1
index -= 128 * k
f = [8, index]
if k > 0 or index > 127:
f.append(k+1)
f += [16, 0]
b64 = base64.b64encode(bytes(f)).decode('utf8')
return b64.strip('=')
def generate_search_qs(term, match='term'):
""" Return query string. """
aliases = dict(views='viewCount')
qs = {
'q': term,
'maxResults': 50,
'safeSearch': "none",
'order': aliases.get(Config.ORDER.get, Config.ORDER.get),
'part': 'id,snippet',
'type': 'video',
'key': Config.API_KEY.get
}
if match == 'related':
qs['relatedToVideoId'] = term
del qs['q']
if Config.SEARCH_MUSIC.get:
qs['videoCategoryId'] = 10
return qs
def userdata_cached(userterm):
""" Check if user name search term found in cache """
userterm = ''.join([t.strip().lower() for t in userterm.split(' ')])
return g.username_query_cache.get(userterm)
def cache_userdata(userterm, username, channel_id):
""" Cache user name and channel id tuple """
userterm = ''.join([t.strip().lower() for t in userterm.split(' ')])
g.username_query_cache[userterm] = (username, channel_id)
dbg('Cache data for username search query "{}": {} ({})'.format(
userterm, username, channel_id))
while len(g.username_query_cache) > 300:
g.username_query_cache.popitem(last=False)
return (username, channel_id)
def channelfromname(user):
""" Query channel id from username. """
cached = userdata_cached(user)
if cached:
user, channel_id = cached
else:
# if the user is looked for by their display name,
# we have to sent an additional request to find their
# channel id
qs = {'part': 'id,snippet',
'maxResults': 1,
'q': user,
'type': 'channel'}
try:
userinfo = call_gdata('search', qs)['items']
if len(userinfo) > 0:
snippet = userinfo[0].get('snippet', {})
channel_id = snippet.get('channelId', user)
username = snippet.get('title', user)
user = cache_userdata(user, username, channel_id)[0]
else:
g.message = "User {} not found.".format(c.y + user + c.w)
return
except GdataError as e:
g.message = "Could not retrieve information for user {}\n{}".format(
c.y + user + c.w, e)
dbg('Error during channel request for user {}:\n{}'.format(
user, e))
return
# at this point, we know the channel id associated to a user name
return (user, channel_id)
@commands.command(r'user\s+(.+)')
def usersearch(q_user, identify='forUsername'):
""" Fetch uploads by a YouTube user. """
user, _, term = (x.strip() for x in q_user.partition("/"))
if identify == 'forUsername':
ret = channelfromname(user)
if not ret: # Error
return
user, channel_id = ret
else:
channel_id = user
# at this point, we know the channel id associated to a user name
usersearch_id(user, channel_id, term)
def usersearch_id(user, channel_id, term):
""" Performs a search within a user's (i.e. a channel's) uploads
for an optional search term with the user (i.e. the channel)
identified by its ID """
query = generate_search_qs(term)
aliases = dict(views='viewCount') # The value of the config item is 'views' not 'viewCount'
if Config.USER_ORDER.get:
query['order'] = aliases.get(Config.USER_ORDER.get,
Config.USER_ORDER.get)
query['channelId'] = channel_id
termuser = tuple([c.y + x + c.w for x in (term, user)])
if term:
msg = "Results for {1}{3}{0} (by {2}{4}{0})"
progtext = "%s by %s" % termuser
failmsg = "No matching results for %s (by %s)" % termuser
else:
msg = "Video uploads by {2}{4}{0}"
progtext = termuser[1]
if Config.SEARCH_MUSIC:
failmsg = """User %s not found or has no videos in the Music category.
Use 'set search_music False' to show results not in the Music category.""" % termuser[1]
else:
failmsg = "User %s not found or has no videos." % termuser[1]
msg = str(msg).format(c.w, c.y, c.y, term, user)
_search(progtext, query, msg, failmsg)
def related_search(vitem):
""" Fetch uploads by a YouTube user. """
query = generate_search_qs(vitem.ytid, match='related')
if query.get('videoCategoryId'):
del query['videoCategoryId']
t = vitem.title
ttitle = t[:48].strip() + ".." if len(t) > 49 else t
msg = "Videos related to %s%s%s" % (c.y, ttitle, c.w)
failmsg = "Related to %s%s%s not found" % (c.y, vitem.ytid, c.w)
_search(ttitle, query, msg, failmsg)
# Note: [^./] is to prevent overlap with playlist search command
@commands.command(r'(?:search|\.|/)\s*([^./].{1,500})')
def search(term):
""" Perform search. """
if not term or len(term) < 2:
g.message = c.r + "Not enough input" + c.w
g.content = generate_songlist_display()
return
logging.info("search for %s", term)
query = generate_search_qs(term)
msg = "Search results for %s%s%s" % (c.y, term, c.w)
failmsg = "Found nothing for %s%s%s" % (c.y, term, c.w)
_search(term, query, msg, failmsg)
@commands.command(r'u(?:ser)?pl\s(.*)')
def user_pls(user):
""" Retrieve user playlists. """
return pl_search(user, is_user=True)
@commands.command(r'(?:\.\.|\/\/|pls(?:earch)?\s)\s*(.*)')
def pl_search(term, page=0, splash=True, is_user=False):
""" Search for YouTube playlists.
term can be query str or dict indicating user playlist search.
"""
if not term or len(term) < 2:
g.message = c.r + "Not enough input" + c.w
g.content = generate_songlist_display()
return
if splash:
g.content = logo(c.g)
prog = "user: " + term if is_user else term
g.message = "Searching playlists for %s" % c.y + prog + c.w
screen.update()
if is_user:
ret = channelfromname(term)
if not ret: # Error
return
user, channel_id = ret
else:
# playlist search is done with the above url and param type=playlist
logging.info("playlist search for %s", prog)
qs = generate_search_qs(term)
qs['pageToken'] = token(page)
qs['type'] = 'playlist'
if 'videoCategoryId' in qs:
del qs['videoCategoryId'] # Incompatable with type=playlist
pldata = call_gdata('search', qs)
id_list = [i.get('id', {}).get('playlistId')
for i in pldata.get('items', ())]
result_count = min(pldata['pageInfo']['totalResults'], 500)
qs = {'part': 'contentDetails,snippet',
'maxResults': 50}
if is_user:
if page:
qs['pageToken'] = token(page)
qs['channelId'] = channel_id
else:
qs['id'] = ','.join(id_list)
pldata = call_gdata('playlists', qs)
playlists = get_pl_from_json(pldata)[:getxy().max_results]
if is_user:
result_count = pldata['pageInfo']['totalResults']
if playlists:
g.last_search_query = (pl_search, {"term": term, "is_user": is_user})
g.browse_mode = "ytpl"
g.current_page = page
g.result_count = result_count
g.ytpls = playlists
g.message = "Playlist results for %s" % c.y + prog + c.w
g.content = generate_playlist_display()
else:
g.message = "No playlists found for: %s" % c.y + prog + c.w
g.current_page = 0
g.content = generate_songlist_display(zeromsg=g.message)
def get_pl_from_json(pldata):
""" Process json playlist data. """
try:
items = pldata['items']
except KeyError:
items = []
results = []
for item in items:
snippet = item['snippet']
results.append(dict(
link=item["id"],
size=item["contentDetails"]["itemCount"],
title=snippet["title"],
author=snippet["channelTitle"],
created=snippet["publishedAt"],
updated=snippet['publishedAt'], #XXX Not available in API?
description=snippet["description"]))
return results
def fetch_comments(item):
""" Fetch comments for item using gdata. """
# pylint: disable=R0912
# pylint: disable=R0914
ytid, title = item.ytid, item.title
dbg("Fetching comments for %s", c.c("y", ytid))
screen.writestatus("Fetching comments for %s" % c.c("y", title[:55]))
qs = {'textFormat': 'plainText',
'videoId': ytid,
'maxResults': 50,
'part': 'snippet'}
# XXX should comment threads be expanded? this would require
# additional requests for comments responding on top level comments
jsdata = call_gdata('commentThreads', qs)
coms = jsdata.get('items', [])
coms = [x.get('snippet', {}) for x in coms]
coms = [x.get('topLevelComment', {}) for x in coms]
# skip blanks
coms = [x for x in coms if len(x.get('snippet', {}).get('textDisplay', '').strip())]
if not len(coms):
g.message = "No comments for %s" % item.title[:50]
g.content = generate_songlist_display()
return
commentstext = ''
for n, com in enumerate(coms, 1):
snippet = com.get('snippet', {})
poster = snippet.get('authorDisplayName')
_, shortdate = yt_datetime(snippet.get('publishedAt', ''))
text = snippet.get('textDisplay', '')
cid = ("%s/%s" % (n, len(coms)))
commentstext += ("%s %-35s %s\n" % (cid, c.c("g", poster), shortdate))
commentstext += c.c("y", text.strip()) + '\n\n'
g.content = content.StringContent(commentstext)
@commands.command(r'c\s?(\d{1,4})')
def comments(number):
""" Receive use request to view comments. """
if g.browse_mode == "normal":
item = g.model[int(number) - 1]
fetch_comments(item)
else:
g.content = generate_songlist_display()
g.message = "Comments only available for video items"
def _make_fname(song, ext=None, av=None, subdir=None):
"""" Create download directory, generate filename. """
# pylint: disable=E1103
# Instance of 'bool' has no 'extension' member (some types not inferable)
ddir = os.path.join(Config.DDIR.get, subdir) if subdir else Config.DDIR.get
if not os.path.exists(ddir):
os.makedirs(ddir)
if not ext:
stream = streams.select(streams.get(song),
audio=av == "audio", m4a_ok=True)
ext = stream['ext']
# filename = song.title[:59] + "." + ext
filename = song.title + "." + ext
filename = os.path.join(ddir, mswinfn(filename.replace("/", "-")))
filename = filename.replace('"', '')
return filename
def extract_metadata(name):
""" Try to determine metadata from video title. """
seps = name.count(" - ")
artist = title = None
if seps == 1:
pos = name.find(" - ")
artist = name[:pos].strip()
title = name[pos + 3:].strip()
else:
title = name.strip()
return dict(artist=artist, title=title)
def remux_audio(filename, title):
""" Remux audio file. Insert limited metadata tags. """
dbg("starting remux")
temp_file = filename + "." + str(random.randint(10000, 99999))
os.rename(filename, temp_file)
meta = extract_metadata(title)
metadata = ["title=%s" % meta["title"]]
if meta["artist"]:
metadata = ["title=%s" % meta["title"], "-metadata",
"artist=%s" % meta["artist"]]
cmd = [g.muxapp, "-y", "-i", temp_file, "-acodec", "copy", "-metadata"]
cmd += metadata + ["-vn", filename]
dbg(cmd)
try:
with open(os.devnull, "w") as devnull:
subprocess.call(cmd, stdout=devnull, stderr=subprocess.STDOUT)
except OSError:
dbg("Failed to remux audio using %s", g.muxapp)
os.rename(temp_file, filename)
else:
os.unlink(temp_file)
dbg("remuxed audio file using %s" % g.muxapp)
def transcode(filename, enc_data):
""" Re encode a download. """
base = os.path.splitext(filename)[0]
exe = g.muxapp if g.transcoder_path == "auto" else g.transcoder_path
# ensure valid executable
if not exe or not os.path.exists(exe) or not os.access(exe, os.X_OK):
xprint("Encoding failed. Couldn't find a valid encoder :(\n")
time.sleep(2)
return filename
command = shlex.split(enc_data['command'])
newcom, outfn = command[::], ""
for n, d in enumerate(command):
if d == "ENCODER_PATH":
newcom[n] = exe
elif d == "IN":
newcom[n] = filename
elif d == "OUT":
newcom[n] = outfn = base
elif d == "OUT.EXT":
newcom[n] = outfn = base + "." + enc_data['ext']
returncode = subprocess.call(newcom)
if returncode == 0 and g.delete_orig:
os.unlink(filename)
return outfn
def external_download(song, filename, url):
""" Perform download using external application. """
cmd = Config.DOWNLOAD_COMMAND.get
ddir, basename = Config.DDIR.get, os.path.basename(filename)
cmd_list = shlex.split(cmd)
def list_string_sub(orig, repl, lst):
""" Replace substrings for items in a list. """
return [x if orig not in x else x.replace(orig, repl) for x in lst]
cmd_list = list_string_sub("%F", filename, cmd_list)
cmd_list = list_string_sub("%d", ddir, cmd_list)
cmd_list = list_string_sub("%f", basename, cmd_list)
cmd_list = list_string_sub("%u", url, cmd_list)
cmd_list = list_string_sub("%i", song.ytid, cmd_list)
dbg("Downloading using: %s", " ".join(cmd_list))
subprocess.call(cmd_list)
def _download(song, filename, url=None, audio=False, allow_transcode=True):
""" Download file, show status.
Return filename or None in case of user specified download command.
"""
# pylint: disable=R0914
# too many local variables
# Instance of 'bool' has no 'url' member (some types not inferable)
if not url:
stream = streams.select(streams.get(song), audio=audio, m4a_ok=True)
url = stream['url']
# if an external download command is set, use it
if Config.DOWNLOAD_COMMAND.get:
title = c.y + os.path.splitext(os.path.basename(filename))[0] + c.w
xprint("Downloading %s using custom command" % title)
external_download(song, filename, url)
return None
if not Config.OVERWRITE.get:
if os.path.exists(filename):
xprint("File exists. Skipping %s%s%s ..\n" % (c.r, filename, c.w))
time.sleep(0.2)
return filename
xprint("Downloading to %s%s%s .." % (c.r, filename, c.w))
status_string = (' {0}{1:,}{2} Bytes [{0}{3:.2%}{2}] received. Rate: '
'[{0}{4:4.0f} kbps{2}]. ETA: [{0}{5:.0f} secs{2}]')
resp = urlopen(url)
total = int(resp.info()['Content-Length'].strip())
chunksize, bytesdone, t0 = 16384, 0, time.time()
outfh = open(filename, 'wb')
while True:
chunk = resp.read(chunksize)
outfh.write(chunk)
elapsed = time.time() - t0
bytesdone += len(chunk)
rate = (bytesdone / 1024) / elapsed
eta = (total - bytesdone) / (rate * 1024)
stats = (c.y, bytesdone, c.w, bytesdone * 1.0 / total, rate, eta)
if not chunk:
outfh.close()
break
status = status_string.format(*stats)
sys.stdout.write("\r" + status + ' ' * 4 + "\r")
sys.stdout.flush()
active_encoder = g.encoders[Config.ENCODER.get]
ext = filename.split(".")[-1]
valid_ext = ext in active_encoder['valid'].split(",")
if audio and g.muxapp:
remux_audio(filename, song.title)
if Config.ENCODER.get != 0 and valid_ext and allow_transcode:
filename = transcode(filename, active_encoder)
return filename
def _bi_range(start, end):
"""
Inclusive range function, works for reverse ranges.
eg. 5,2 returns [5,4,3,2] and 2, 4 returns [2,3,4]
"""
if start == end:
return (start,)
elif end < start:
return reversed(range(end, start + 1))
else:
return range(start, end + 1)
def _parse_multi(choice, end=None):
""" Handle ranges like 5-9, 9-5, 5- and -5. Return list of ints. """
end = end or str(len(g.model))
pattern = r'(?<![-\d])(\d+-\d+|-\d+|\d+-|\d+)(?![-\d])'
items = re.findall(pattern, choice)
alltracks = []
for x in items:
if x.startswith("-"):
x = "1" + x
elif x.endswith("-"):
x = x + str(end)
if "-" in x:
nrange = x.split("-")
startend = map(int, nrange)
alltracks += _bi_range(*startend)
else:
alltracks.append(int(x))
return alltracks
@commands.command(r'play\s+(%s|\d+)' % commands.word)
def play_pl(name):
""" Play a playlist by name. """
if name.isdigit():
name = int(name)
name = sorted(g.userpl)[name - 1]
saved = g.userpl.get(name)
if not saved:
name = get_near_name(name, g.userpl)
saved = g.userpl.get(name)
if saved:
g.model.songs = list(saved.songs)
play_all("", "", "")
else:
g.message = F("pl not found") % name
g.content = playlists_display()
@commands.command(r'save')
def save_last():
""" Save command with no playlist name. """
if g.last_opened:
open_save_view("save", g.last_opened)
else:
saveas = ""
# save using artist name in postion 1
if g.model:
saveas = g.model[0].title[:18].strip()
saveas = re.sub(r"[^-\w]", "-", saveas, re.UNICODE)
# loop to find next available name
post = 0
while g.userpl.get(saveas):
post += 1
saveas = g.model[0].title[:18].strip() + "-" + str(post)
# Playlists are not allowed to start with a digit
# TODO: Possibly change this, but ban purely numerical names
saveas = saveas.lstrip("0123456789")
open_save_view("save", saveas)
@commands.command(r'(open|save|view)\s*(%s)' % commands.word)
def open_save_view(action, name):
""" Open, save or view a playlist by name. Get closest name match. """
name = name.replace(" ", "-")
if action == "open" or action == "view":
saved = g.userpl.get(name)
if not saved:
name = get_near_name(name, g.userpl)
saved = g.userpl.get(name)
elif action == "open":
g.active.songs = list(saved.songs)
g.last_opened = name
msg = F("pl loaded") % name
paginatesongs(g.active, msg=msg)
elif action == "view":
g.last_opened = ""
msg = F("pl viewed") % name
paginatesongs(list(saved.songs), msg=msg)
elif not saved and action in "view open".split():
g.message = F("pl not found") % name
g.content = playlists_display()
elif action == "save":
if not g.model:
g.message = "Nothing to save. " + F('advise search')
g.content = generate_songlist_display()
else:
g.userpl[name] = Playlist(name, list(g.model.songs))
g.message = F('pl saved') % name
save_to_file()
g.content = generate_songlist_display()
@commands.command(r'(open|view)\s*(\d{1,4})')
def open_view_bynum(action, num):
""" Open or view a saved playlist by number. """
srt = sorted(g.userpl)
name = srt[int(num) - 1]
open_save_view(action, name)
@commands.command(r'(rm|add)\s*(-?\d[-,\d\s]{,250})')
def songlist_rm_add(action, songrange):
""" Remove or add tracks. works directly on user input. """
selection = _parse_multi(songrange)
if action == "add":
duplicate_songs = []
for songnum in selection:
if g.model[songnum - 1] in g.active:
duplicate_songs.append(str(songnum))
g.active.songs.append(g.model[songnum - 1])
d = g.active.duration
g.message = F('added to pl') % (len(selection), len(g.active), d)
if duplicate_songs:
duplicate_songs = ', '.join(sorted(duplicate_songs))
g.message += '\n'
g.message += F('duplicate tracks') % duplicate_songs
elif action == "rm":
selection = sorted(set(selection), reverse=True)
removed = str(tuple(reversed(selection))).replace(",", "")
for x in selection:
g.model.songs.pop(x - 1)
g.message = F('songs rm') % (len(selection), removed)
g.content = generate_songlist_display()
@commands.command(r'(da|dv)\s+((?:\d+\s\d+|-\d|\d+-|\d,)(?:[\d\s,-]*))')
def down_many(dltype, choice, subdir=None):
""" Download multiple items. """
choice = _parse_multi(choice)
choice = list(set(choice))
downsongs = [g.model[int(x) - 1] for x in choice]
temp = g.model[::]
g.model.songs = downsongs[::]
count = len(downsongs)
av = "audio" if dltype.startswith("da") else "video"
msg = ""
def handle_error(message):
""" Handle error in download. """
g.message = message
g.content = disp
screen.update()
time.sleep(2)
g.model.songs.pop(0)
try:
for song in downsongs:
g.result_count = len(g.model)
disp = generate_songlist_display()
title = "Download Queue (%s):%s\n\n" % (av, c.w)
disp = re.sub(r"(Num\s*?Title.*?\n)", title, disp)
g.content = disp
screen.update()
try:
filename = _make_fname(song, None, av=av, subdir=subdir)
except IOError as e:
handle_error("Error for %s: %s" % (song.title, str(e)))
count -= 1
continue
except KeyError:
handle_error("No audio track for %s" % song.title)
count -= 1
continue
try:
_download(song, filename, url=None, audio=av == "audio")
except HTTPError:
handle_error("HTTP Error for %s" % song.title)
count -= 1
continue
g.model.songs.pop(0)
msg = "Downloaded %s items" % count
g.message = "Saved to " + c.g + song.title + c.w
except KeyboardInterrupt:
msg = "Downloads interrupted!"
finally:
g.model.songs = temp[::]
g.message = msg
g.result_count = len(g.model)
g.content = generate_songlist_display()
@commands.command(r'(da|dv)pl\s+%s' % commands.pl)
def down_plist(dltype, parturl):
""" Download YouTube playlist. """
plist(parturl)
dump(False)
title = g.pafy_pls[parturl][0].title
subdir = mswinfn(title.replace("/", "-"))
down_many(dltype, "1-", subdir=subdir)
msg = g.message
plist(parturl)
g.message = msg
@commands.command(r'(da|dv)upl\s+(.*)')
def down_user_pls(dltype, user):
""" Download all user playlists. """
user_pls(user)
for pl in g.ytpls:
down_plist(dltype, pl.get('link'))
return
@commands.command(r'(%s{0,3})([-,\d\s]{1,250})\s*(%s{0,3})$' %
(commands.rs, commands.rs))
def play(pre, choice, post=""):
""" Play choice. Use repeat/random if appears in pre/post. """
# pylint: disable=R0914
# too many local variables
if g.browse_mode == "ytpl":
if choice.isdigit():
return plist(g.ytpls[int(choice) - 1]['link'])
else:
g.message = "Invalid playlist selection: %s" % c.y + choice + c.w
g.content = generate_songlist_display()
return
if not g.model:
g.message = c.r + "There are no tracks to select" + c.w
g.content = g.content or generate_songlist_display()
else:
shuffle = "shuffle" in pre + post
repeat = "repeat" in pre + post
novid = "-a" in pre + post
fs = "-f" in pre + post
nofs = "-w" in pre + post
forcevid = "-v" in pre + post
if ((novid and fs) or (novid and nofs) or (nofs and fs)
or (novid and forcevid)):
raise IOError("Conflicting override options specified")
override = False
override = "audio" if novid else override
override = "fullscreen" if fs else override
override = "window" if nofs else override
if (not fs) and (not nofs):
override = "forcevid" if forcevid else override
selection = _parse_multi(choice)
songlist = [g.model[x - 1] for x in selection]
# cache next result of displayed items
# when selecting a single item
if len(songlist) == 1:
chosen = selection[0] - 1
if len(g.model) > chosen + 1:
preload(g.model[chosen + 1], override=override)
play_range(songlist, shuffle, repeat, override)
@commands.command(r'(%s{0,3})(?:\*|all)\s*(%s{0,3})' %
(commands.rs, commands.rs))
def play_all(pre, choice, post=""):
""" Play all tracks in model (last displayed). shuffle/repeat if req'd."""
options = pre + choice + post
play(options, "1-" + str(len(g.model)))
@commands.command(r'ls')
def ls():
""" List user saved playlists. """
if not g.userpl:
g.message = F('no playlists')
g.content = g.content or generate_songlist_display(zeromsg=g.message)
else:
g.content = playlists_display()
g.message = F('pl help')
@commands.command(r'vp')
def vp():
""" View current working playlist. """
msg = F('current pl')
txt = F('advise add') if g.model else F('advise search')
failmsg = F('pl empty') + " " + txt
paginatesongs(g.active, msg=msg, failmsg=failmsg)
def _preload(song, delay, override):
""" Get streams (runs in separate thread). """
if g.preload_disabled:
return
ytid = song.ytid
g.preloading.append(ytid)
time.sleep(delay)
video = Config.SHOW_VIDEO.get
video = True if override in ("fullscreen", "window", "forcevid") else video
video = False if override == "audio" else video
try:
m4a = "mplayer" not in Config.PLAYER.get
streamlist = streams.get(song)
stream = streams.select(streamlist, audio=not video, m4a_ok=m4a)
if not stream and not video:
# preload video stream, no audio available
stream = streams.select(streamlist, audio=False)
get_size(ytid, stream['url'], preloading=True)
except (ValueError, AttributeError, IOError) as e:
dbg(e) # Fail silently on preload
finally:
g.preloading.remove(song.ytid)
def preload(song, delay=2, override=False):
""" Get streams. """
args = (song, delay, override)
t = threading.Thread(target=_preload, args=args)
t.daemon = True
t.start()
def play_range(songlist, shuffle=False, repeat=False, override=False):
""" Play a range of songs, exit cleanly on keyboard interrupt. """
if shuffle:
random.shuffle(songlist)
n = 0
while 0 <= n <= len(songlist)-1:
song = songlist[n]
g.content = playback_progress(n, songlist, repeat=repeat)
if not g.command_line:
screen.update(fill_blank=False)
hasnext = len(songlist) > n + 1
if hasnext:
preload(songlist[n + 1], override=override)
set_window_title(song.title + " - mpsyt")
try:
returncode = playsong(song, override=override)
except KeyboardInterrupt:
logging.info("Keyboard Interrupt")
xprint(c.w + "Stopping... ")
screen.reset_terminal()
g.message = c.y + "Playback halted" + c.w
break
set_window_title("mpsyt")
if returncode == 42:
n -= 1
elif returncode == 43:
break
else:
n += 1
if n == -1:
n = len(songlist) - 1 if repeat else 0
elif n == len(songlist) and repeat:
n = 0
g.content = generate_songlist_display()
@commands.command(r'(?:help|h)(?:\s+([-_a-zA-Z]+))?')
def show_help(choice):
""" Print help message. """
g.content = get_help(choice)
@commands.command(r'(?:q|quit|exit)')
def quits(showlogo=True):
""" Exit the program. """
if has_readline:
readline.write_history_file(g.READLINE_FILE)
dbg("Saved history file")
cache.save()
screen.clear()
msg = logo(c.r, version=__version__) if showlogo else ""
msg += F("exitmsg", 2)
if Config.CHECKUPDATE.get and showlogo:
try:
url = "https://github.com/np1/mps-youtube/raw/master/VERSION"
v = urlopen(url, timeout=1).read().decode()
v = re.search(r"^version\s*([\d\.]+)\s*$", v, re.MULTILINE)
if v:
v = v.group(1)
if v > __version__:
msg += "\n\nA newer version is available (%s)\n" % v
except (URLError, HTTPError, socket.timeout):
dbg("check update timed out")
screen.msgexit(msg)
def get_dl_data(song, mediatype="any"):
""" Get filesize and metadata for all streams, return dict. """
def mbsize(x):
""" Return size in MB. """
return str(int(x / (1024 ** 2)))
p = get_pafy(song)
dldata = []
text = " [Fetching stream info] >"
streamlist = [x for x in p.allstreams]
if mediatype == "audio":
streamlist = [x for x in p.audiostreams]
l = len(streamlist)
for n, stream in enumerate(streamlist):
sys.stdout.write(text + "-" * n + ">" + " " * (l - n - 1) + "<\r")
sys.stdout.flush()
try:
size = mbsize(stream.get_filesize())
except TypeError:
dbg(c.r + "---Error getting stream size" + c.w)
size = 0
item = {'mediatype': stream.mediatype,
'size': size,
'ext': stream.extension,
'quality': stream.quality,
'notes': stream.notes,
'url': stream.url}
dldata.append(item)
screen.writestatus("")
return dldata, p
def menu_prompt(model, prompt=" > ", rows=None, header=None, theading=None,
footer=None, force=0):
""" Generate a list of choice, returns item from model. """
content = ""
for x in header, theading, rows, footer:
if isinstance(x, list):
for line in x:
content += line + "\n"
elif isinstance(x, str):
content += x + "\n"
g.content = content
screen.update()
choice = input(prompt)
if choice in model:
return model[choice]
elif force:
return menu_prompt(model, prompt, rows, header, theading, footer,
force)
elif not choice.strip():
return False, False
else: # unrecognised input
return False, "abort"
def prompt_dl(song):
""" Prompt user do choose a stream to dl. Return (url, extension). """
# pylint: disable=R0914
dl_data, p = get_dl_data(song)
dl_text = gen_dl_text(dl_data, song, p)
model = [x['url'] for x in dl_data]
ed = enumerate(dl_data)
model = {str(n + 1): (x['url'], x['ext']) for n, x in ed}
url, ext = menu_prompt(model, "Download number: ", *dl_text)
url2 = ext2 = None
mediatype = [i for i in dl_data if i['url'] == url][0]['mediatype']
if mediatype == "video" and g.muxapp and not Config.DOWNLOAD_COMMAND.get:
# offer mux if not using external downloader
dl_data, p = get_dl_data(song, mediatype="audio")
dl_text = gen_dl_text(dl_data, song, p)
au_choices = "1" if len(dl_data) == 1 else "1-%s" % len(dl_data)
footer = [F('-audio') % ext, F('select mux') % au_choices]
dl_text = tuple(dl_text[0:3]) + (footer,)
aext = ("ogg", "m4a")
model = [x['url'] for x in dl_data if x['ext'] in aext]
ed = enumerate(dl_data)
model = {str(n + 1): (x['url'], x['ext']) for n, x in ed}
prompt = "Audio stream: "
url2, ext2 = menu_prompt(model, prompt, *dl_text)
return url, ext, url2, ext2
def gen_dl_text(ddata, song, p):
""" Generate text for dl screen. """
hdr = []
hdr.append(" %s%s%s" % (c.r, song.title, c.w))
author = p.author
hdr.append(c.r + " Uploaded by " + author + c.w)
hdr.append(" [" + fmt_time(song.length) + "]")
hdr.append("")
heading = tuple("Item Format Quality Media Size Notes".split())
fmt = " {0}%-6s %-8s %-13s %-7s %-5s %-16s{1}"
heading = [fmt.format(c.w, c.w) % heading]
heading.append("")
content = []
for n, d in enumerate(ddata):
row = (n + 1, d['ext'], d['quality'], d['mediatype'], d['size'],
d['notes'])
fmt = " {0}%-6s %-8s %-13s %-7s %5s Mb %-16s{1}"
row = fmt.format(c.g, c.w) % row
content.append(row)
content.append("")
footer = "Select [%s1-%s%s] to download or [%sEnter%s] to return"
footer = [footer % (c.y, len(content) - 1, c.w, c.y, c.w)]
return(content, hdr, heading, footer)
@commands.command(r'(dv|da|d|dl|download)\s*(\d{1,4})')
def download(dltype, num):
""" Download a track or playlist by menu item number. """
# This function needs refactoring!
# pylint: disable=R0912
# pylint: disable=R0914
if g.browse_mode == "ytpl" and dltype in ("da", "dv"):
plid = g.ytpls[int(num) - 1]["link"]
down_plist(dltype, plid)
return
elif g.browse_mode == "ytpl":
g.message = "Use da or dv to specify audio / video playlist download"
g.message = c.y + g.message + c.w
g.content = generate_songlist_display()
return
elif g.browse_mode != "normal":
g.message = "Download must refer to a specific video item"
g.message = c.y + g.message + c.w
g.content = generate_songlist_display()
return
screen.writestatus("Fetching video info...")
song = (g.model[int(num) - 1])
best = dltype.startswith("dv") or dltype.startswith("da")
if not best:
try:
# user prompt for download stream
url, ext, url_au, ext_au = prompt_dl(song)
except KeyboardInterrupt:
g.message = c.r + "Download aborted!" + c.w
g.content = generate_songlist_display()
return
if not url or ext_au == "abort":
# abort on invalid stream selection
g.content = generate_songlist_display()
g.message = "%sNo download selected / invalid input%s" % (c.y, c.w)
return
else:
# download user selected stream(s)
filename = _make_fname(song, ext)
args = (song, filename, url)
if url_au and ext_au:
# downloading video and audio stream for muxing
audio = False
filename_au = _make_fname(song, ext_au)
args_au = (song, filename_au, url_au)
else:
audio = ext in ("m4a", "ogg")
kwargs = dict(audio=audio)
elif best:
# set updownload without prompt
url_au = None
av = "audio" if dltype.startswith("da") else "video"
audio = av == "audio"
filename = _make_fname(song, None, av=av)
args = (song, filename)
kwargs = dict(url=None, audio=audio)
try:
# perform download(s)
dl_filenames = [args[1]]
f = _download(*args, **kwargs)
if f:
g.message = "Saved to " + c.g + f + c.w
if url_au:
dl_filenames += [args_au[1]]
_download(*args_au, allow_transcode=False, **kwargs)
except KeyboardInterrupt:
g.message = c.r + "Download halted!" + c.w
try:
for downloaded in dl_filenames:
os.remove(downloaded)
except IOError:
pass
if url_au:
# multiplex
name, ext = os.path.splitext(args[1])
tmpvideoname = name + '.' +str(random.randint(10000, 99999)) + ext
os.rename(args[1], tmpvideoname)
mux_cmd = [g.muxapp, "-i", tmpvideoname, "-i", args_au[1], "-c",
"copy", name + ".mp4"]
try:
subprocess.call(mux_cmd)
g.message = "Saved to :" + c.g + mux_cmd[7] + c.w
os.remove(tmpvideoname)
os.remove(args_au[1])
except KeyboardInterrupt:
g.message = "Audio/Video multiplex aborted!"
g.content = generate_songlist_display()
def prompt_for_exit():
""" Ask for exit confirmation. """
g.message = c.r + "Press ctrl-c again to exit" + c.w
g.content = generate_songlist_display()
screen.update()
try:
userinput = input(c.r + " > " + c.w)
except (KeyboardInterrupt, EOFError):
quits(showlogo=False)
return userinput
@commands.command(r'rmp\s*(\d+|%s)' % commands.word)
def playlist_remove(name):
""" Delete a saved playlist by name - or purge working playlist if *all."""
if name.isdigit() or g.userpl.get(name):
if name.isdigit():
name = int(name) - 1
name = sorted(g.userpl)[name]
del g.userpl[name]
g.message = "Deleted playlist %s%s%s" % (c.y, name, c.w)
g.content = playlists_display()
save_to_file()
else:
g.message = F('pl not found advise ls') % name
g.content = playlists_display()
@commands.command(r'(mv|sw)\s*(\d{1,4})\s*[\s,]\s*(\d{1,4})')
def songlist_mv_sw(action, a, b):
""" Move a song or swap two songs. """
i, j = int(a) - 1, int(b) - 1
if action == "mv":
g.model.songs.insert(j, g.model.songs.pop(i))
g.message = F('song move') % (g.model[j].title, b)
elif action == "sw":
g.model[i], g.model[j] = g.model[j], g.model[i]
g.message = F('song sw') % (min(a, b), max(a, b))
g.content = generate_songlist_display()
@commands.command(r'add\s*(-?\d[-,\d\s]{1,250})(%s)' % commands.word)
def playlist_add(nums, playlist):
""" Add selected song nums to saved playlist. """
nums = _parse_multi(nums)
if not g.userpl.get(playlist):
playlist = playlist.replace(" ", "-")
g.userpl[playlist] = Playlist(playlist)
for songnum in nums:
g.userpl[playlist].songs.append(g.model[songnum - 1])
dur = g.userpl[playlist].duration
f = (len(nums), playlist, len(g.userpl[playlist]), dur)
g.message = F('added to saved pl') % f
if nums:
save_to_file()
g.content = generate_songlist_display()
@commands.command(r'mv\s*(\d{1,3})\s*(%s)' % commands.word)
def playlist_rename_idx(_id, name):
""" Rename a playlist by ID. """
_id = int(_id) - 1
playlist_rename(sorted(g.userpl)[_id] + " " + name)
@commands.command(r'mv\s*(%s\s+%s)' % (commands.word, commands.word))
def playlist_rename(playlists):
""" Rename a playlist using mv command. """
# Deal with old playlist names that permitted spaces
a, b = "", playlists.split(" ")
while a not in g.userpl:
a = (a + " " + (b.pop(0))).strip()
if not b and a not in g.userpl:
g.message = F('no pl match for rename')
g.content = g.content or playlists_display()
return
b = "-".join(b)
g.userpl[b] = Playlist(b)
g.userpl[b].songs = list(g.userpl[a].songs)
playlist_remove(a)
g.message = F('pl renamed') % (a, b)
save_to_file()
@commands.command(r'(rm|add)\s(?:\*|all)')
def add_rm_all(action):
""" Add all displayed songs to current playlist.
remove all displayed songs from view.
"""
if action == "rm":
g.model.songs.clear()
msg = c.b + "Cleared all songs" + c.w
g.content = generate_songlist_display(zeromsg=msg)
elif action == "add":
size = len(g.model)
songlist_rm_add("add", "-" + str(size))
@commands.command(r'(n|p)\s*(\d{1,2})?')
def nextprev(np, page=None):
""" Get next / previous search results. """
if isinstance(g.content, content.PaginatedContent):
page_count = g.content.numPages()
function = g.content.getPage
args = {}
else:
page_count = math.ceil(g.result_count/getxy().max_results)
function, args = g.last_search_query
good = False
if function:
if np == "n":
if g.current_page + 1 < page_count:
g.current_page += 1
good = True
elif np == "p":
if page and int(page) in range(1,20):
g.current_page = int(page)-1
good = True
elif g.current_page > 0:
g.current_page -= 1
good = True
if good:
function(page=g.current_page, **args)
else:
norp = "next" if np == "n" else "previous"
g.message = "No %s items to display" % norp
if not isinstance(g.content, content.PaginatedContent):
g.content = generate_songlist_display()
return good
@commands.command(r'u\s?([\d]{1,4})')
def user_more(num):
""" Show more videos from user of vid num. """
if g.browse_mode != "normal":
g.message = "User uploads must refer to a specific video item"
g.message = c.y + g.message + c.w
g.content = generate_songlist_display()
return
g.current_page = 0
item = g.model[int(num) - 1]
channel_id = g.meta.get(item.ytid, {}).get('uploader')
user = g.meta.get(item.ytid, {}).get('uploaderName')
usersearch_id(user, channel_id, '')
@commands.command(r'r\s?(\d{1,4})')
def related(num):
""" Show videos related to to vid num. """
if g.browse_mode != "normal":
g.message = "Related items must refer to a specific video item"
g.message = c.y + g.message + c.w
g.content = generate_songlist_display()
return
g.current_page = 0
item = g.model[int(num) - 1]
related_search(item)
@commands.command(r'x\s*(\d+)')
def clip_copy(num):
""" Copy item to clipboard. """
if g.browse_mode == "ytpl":
p = g.ytpls[int(num) - 1]
link = "https://youtube.com/playlist?list=%s" % p['link']
elif g.browse_mode == "normal":
item = (g.model[int(num) - 1])
link = "https://youtube.com/watch?v=%s" % item.ytid
else:
g.message = "clipboard copy not valid in this mode"
g.content = generate_songlist_display()
return
if has_pyperclip:
try:
pyperclip.copy(link)
g.message = c.y + link + c.w + " copied"
g.content = generate_songlist_display()
except Exception as e:
xprint(link)
xprint("Error - couldn't copy to clipboard.")
xprint(e.__doc__)
xprint("")
input("Press Enter to continue.")
g.content = generate_songlist_display()
else:
g.message = "pyperclip module must be installed for clipboard support\n"
g.message += "see https://pypi.python.org/pypi/pyperclip/"
g.content = generate_songlist_display()
@commands.command(r'mix\s*(\d{1,4})')
def mix(num):
""" Retrieves the YouTube mix for the selected video. """
g.content = g.content or generate_songlist_display()
if g.browse_mode != "normal":
g.message = F('mix only videos')
else:
item = (g.model[int(num) - 1])
if item is None:
g.message = F('invalid item')
return
item = get_pafy(item)
# Mix playlists are made up of 'RD' + video_id
try:
plist("RD" + item.videoid)
except OSError:
g.message = F('no mix')
@commands.command(r'i\s*(\d{1,4})')
def info(num):
""" Get video description. """
if g.browse_mode == "ytpl":
p = g.ytpls[int(num) - 1]
# fetch the playlist item as it has more metadata
if p['link'] in g.pafy_pls:
ytpl = g.pafy_pls[p['link']][0]
else:
g.content = logo(col=c.g)
g.message = "Fetching playlist info.."
screen.update()
dbg("%sFetching playlist using pafy%s", c.y, c.w)
ytpl = pafy.get_playlist2(p['link'])
g.pafy_pls[p['link']] = (ytpl, IterSlicer(ytpl))
ytpl_desc = ytpl.description
g.content = generate_songlist_display()
created = yt_datetime(p['created'])[0]
updated = yt_datetime(p['updated'])[0]
out = c.ul + "Playlist Info" + c.w + "\n\n"
out += p['title']
out += "\n" + ytpl_desc
out += ("\n\nAuthor : " + p['author'])
out += "\nSize : " + str(p['size']) + " videos"
out += "\nCreated : " + time.strftime("%x %X", created)
out += "\nUpdated : " + time.strftime("%x %X", updated)
out += "\nID : " + str(p['link'])
out += ("\n\n%s[%sPress enter to go back%s]%s" % (c.y, c.w, c.y, c.w))
g.content = out
elif g.browse_mode == "normal":
g.content = logo(c.b)
screen.update()
screen.writestatus("Fetching video metadata..")
item = (g.model[int(num) - 1])
streams.get(item)
p = get_pafy(item)
pub = time.strptime(str(p.published), "%Y-%m-%d %H:%M:%S")
screen.writestatus("Fetched")
out = c.ul + "Video Info" + c.w + "\n\n"
out += p.title or ""
out += "\n" + (p.description or "")
out += "\n\nAuthor : " + str(p.author)
out += "\nPublished : " + time.strftime("%c", pub)
out += "\nView count : " + str(p.viewcount)
out += "\nRating : " + str(p.rating)[:4]
out += "\nLikes : " + str(p.likes)
out += "\nDislikes : " + str(p.dislikes)
out += "\nCategory : " + str(p.category)
out += "\nLink : " + "https://youtube.com/watch?v=%s" % p.videoid
out += "\n\n%s[%sPress enter to go back%s]%s" % (c.y, c.w, c.y, c.w)
g.content = out
@commands.command(r'playurl\s(.*[-_a-zA-Z0-9]{11}[^\s]*)(\s-(?:f|a|w))?')
def play_url(url, override):
""" Open and play a youtube video url. """
override = override if override else "_"
g.browse_mode = "normal"
yt_url(url, print_title=1)
if len(g.model) == 1:
play(override, "1", "_")
if g.command_line:
sys.exit()
@commands.command(r'browserplay\s(\d{1,50})')
def browser_play(number):
"""Open a previously searched result in the browser."""
if (len(g.model) == 0):
g.message = c.r + "No previous search." + c.w
g.content = logo(c.r)
return
try:
index = int(number) - 1
if (0 <= index < len(g.model)):
base_url = "https://www.youtube.com/watch?v="
video = g.model[index]
url = base_url + video.ytid
webbrowser.open(url)
g.content = g.content or generate_songlist_display()
else:
g.message = c.r + "Out of range." + c.w
g.content = g.content or generate_songlist_display()
return
except (HTTPError, URLError, Exception) as e:
g.message = c.r + str(e) + c.w
g.content = g.content or generate_songlist_display()
return
@commands.command(r'dlurl\s(.*[-_a-zA-Z0-9]{11}.*)')
def dl_url(url):
""" Open and prompt for download of youtube video url. """
g.browse_mode = "normal"
yt_url(url)
if len(g.model) == 1:
download("download", "1")
if g.command_line:
sys.exit()
@commands.command(r'daurl\s(.*[-_a-zA-Z0-9]{11}.*)')
def da_url(url):
""" Open and prompt for download of youtube best audio from url. """
g.browse_mode = "normal"
yt_url(url)
if len(g.model) == 1:
download("da", "1")
if g.command_line:
sys.exit()
@commands.command(r'url\s(.*[-_a-zA-Z0-9]{11}.*)')
def yt_url(url, print_title=0):
""" Acess videos by urls. """
url_list = url.split()
g.model.songs = []
for u in url_list:
try:
p = pafy.new(u)
except (IOError, ValueError) as e:
g.message = c.r + str(e) + c.w
g.content = g.content or generate_songlist_display(zeromsg=g.message)
return
g.browse_mode = "normal"
v = Video(p.videoid, p.title, p.length)
g.model.songs.append(v)
if not g.command_line:
g.content = generate_songlist_display()
if print_title:
xprint(v.title)
@commands.command(r'url_file\s(\S+)')
def yt_url_file(file_name):
""" Access a list of urls in a text file """
#Open and read the file
try:
with open(file_name, "r") as fo:
output = ' '.join([line.strip() for line in fo if line.strip()])
except (IOError):
g.message = c.r + 'Error while opening the file, check the validity of the path' + c.w
g.content = g.content or generate_songlist_display(zeromsg=g.message)
return
#Finally pass the input to yt_url
yt_url(output)
@commands.command(r'(un)?dump')
def dump(un):
""" Show entire playlist. """
func, args = g.last_search_query
if func is paginatesongs:
paginatesongs(dumps=(not un), **args)
else:
un = "" if not un else un
g.message = "%s%sdump%s may only be used on an open YouTube playlist"
g.message = g.message % (c.y, un, c.w)
g.content = generate_songlist_display()
def paginatesongs(func, page=0, splash=True, dumps=False,
length=None, msg=None, failmsg=None, loadmsg=None):
if splash:
g.message = loadmsg or ''
g.content = logo(col=c.b)
screen.update()
max_results = getxy().max_results
if dumps:
s = 0
e = None
else:
s = page * max_results
e = (page + 1) * max_results
if callable(func):
songs = func(s, e)
else:
songs = func[s:e]
if length is None:
length = len(func)
args = {'func':func, 'length':length, 'msg':msg,
'failmsg':failmsg, 'loadmsg': loadmsg}
g.last_search_query = (paginatesongs, args)
g.browse_mode = "normal"
g.current_page = page
g.result_count = length
g.model.songs = songs
g.content = generate_songlist_display()
g.last_opened = ""
g.message = msg or ''
if not songs:
g.message = failmsg or g.message
if songs:
# preload first result url
preload(songs[0], delay=0)
@commands.command(r'pl\s+%s' % commands.pl)
def plist(parturl):
""" Retrieve YouTube playlist. """
if parturl in g.pafy_pls:
ytpl, plitems = g.pafy_pls[parturl]
else:
dbg("%sFetching playlist using pafy%s", c.y, c.w)
ytpl = pafy.get_playlist2(parturl)
plitems = IterSlicer(ytpl)
g.pafy_pls[parturl] = (ytpl, plitems)
def pl_seg(s, e):
return [Video(i.videoid, i.title, i.length) for i in plitems[s:e]]
msg = "Showing YouTube playlist %s" % (c.y + ytpl.title + c.w)
loadmsg = "Retrieving YouTube playlist"
paginatesongs(pl_seg, length=len(ytpl), msg=msg, loadmsg=loadmsg)
@commands.command(r'shuffle')
def shuffle_fn():
""" Shuffle displayed items. """
random.shuffle(g.model.songs)
g.message = c.y + "Items shuffled" + c.w
g.content = generate_songlist_display()
@commands.command(r'clearcache')
def clearcache():
""" Clear cached items - for debugging use. """
g.pafs = {}
g.streams = {}
dbg("%scache cleared%s", c.p, c.w)
g.message = "cache cleared"
def show_message(message, col=c.r, update=False):
""" Show message using col, update screen if required. """
g.content = generate_songlist_display()
g.message = col + message + c.w
if update:
screen.update()
def _do_query(url, query, err='query failed', report=False):
""" Perform http request using mpsyt user agent header.
if report is True, return whether response is from memo
"""
# create url opener
ua = "mps-youtube/%s ( %s )" % (__version__, __url__)
mpsyt_opener = build_opener()
mpsyt_opener.addheaders = [('User-agent', ua)]
# convert query to sorted list of tuples (needed for consistent url_memo)
query = [(k, query[k]) for k in sorted(query.keys())]
url = "%s?%s" % (url, urlencode(query))
try:
wdata = mpsyt_opener.open(url).read().decode()
except (URLError, HTTPError) as e:
g.message = "%s: %s (%s)" % (err, e, url)
g.content = logo(c.r)
return None if not report else (None, False)
return wdata if not report else (wdata, False)
def _best_song_match(songs, title, duration):
""" Select best matching song based on title, length.
Score from 0 to 1 where 1 is best.
"""
# pylint: disable=R0914
seqmatch = difflib.SequenceMatcher
def variance(a, b):
""" Return difference ratio. """
return float(abs(a - b)) / max(a, b)
candidates = []
ignore = "music video lyrics new lyrics video audio".split()
extra = "official original vevo".split()
for song in songs:
dur, tit = int(song.length), song.title
dbg("Title: %s, Duration: %s", tit, dur)
for word in extra:
if word in tit.lower() and word not in title.lower():
pattern = re.compile(word, re.I)
tit = pattern.sub("", tit)
for word in ignore:
if word in tit.lower() and word not in title.lower():
pattern = re.compile(word, re.I)
tit = pattern.sub("", tit)
replacechars = re.compile(r"[\]\[\)\(\-]")
tit = replacechars.sub(" ", tit)
multiple_spaces = re.compile(r"(\s)(\s*)")
tit = multiple_spaces.sub(r"\1", tit)
title_score = seqmatch(None, title.lower(), tit.lower()).ratio()
duration_score = 1 - variance(duration, dur)
dbg("Title score: %s, Duration score: %s", title_score,
duration_score)
# apply weightings
score = duration_score * .5 + title_score * .5
candidates.append((score, song))
best_score, best_song = max(candidates, key=lambda x: x[0])
percent_score = int(100 * best_score)
return best_song, percent_score
def _match_tracks(artist, title, mb_tracks):
""" Match list of tracks in mb_tracks by performing multiple searches. """
# pylint: disable=R0914
dbg("artists is %s", artist)
dbg("title is %s", title)
title_artist_str = c.g + title + c.w, c.g + artist + c.w
xprint("\nSearching for %s by %s\n\n" % title_artist_str)
def dtime(x):
""" Format time to M:S. """
return time.strftime('%M:%S', time.gmtime(int(x)))
# do matching
for track in mb_tracks:
ttitle = track['title']
length = track['length']
xprint("Search : %s%s - %s%s - %s" % (c.y, artist, ttitle, c.w,
dtime(length)))
q = "%s %s" % (artist, ttitle)
w = q = ttitle if artist == "Various Artists" else q
query = generate_search_qs(w, 0)
dbg(query)
# perform fetch
wdata = call_gdata('search', query)
results = get_tracks_from_json(wdata)
if not results:
xprint(c.r + "Nothing matched :(\n" + c.w)
continue
s, score = _best_song_match(results, artist + " " + ttitle, length)
cc = c.g if score > 85 else c.y
cc = c.r if score < 75 else cc
xprint("Matched: %s%s%s - %s \n[%sMatch confidence: "
"%s%s]\n" % (c.y, s.title, c.w, fmt_time(s.length),
cc, score, c.w))
yield s
def _get_mb_tracks(albumid):
""" Get track listing from MusicBraiz by album id. """
ns = {'mb': 'http://musicbrainz.org/ns/mmd-2.0#'}
url = "http://musicbrainz.org/ws/2/release/" + albumid
query = {"inc": "recordings"}
wdata = _do_query(url, query, err='album search error')
if not wdata:
return None
root = ET.fromstring(wdata)
tlist = root.find("./mb:release/mb:medium-list/mb:medium/mb:track-list",
namespaces=ns)
mb_songs = tlist.findall("mb:track", namespaces=ns)
tracks = []
path = "./mb:recording/mb:"
for track in mb_songs:
try:
title, length, rawlength = "unknown", 0, 0
title = track.find(path + "title", namespaces=ns).text
rawlength = track.find(path + "length", namespaces=ns).text
length = int(round(float(rawlength) / 1000))
except (ValueError, AttributeError):
xprint("not found")
tracks.append(dict(title=title, length=length, rawlength=rawlength))
return tracks
def _get_mb_album(albumname, **kwa):
""" Return artist, album title and track count from MusicBrainz. """
url = "http://musicbrainz.org/ws/2/release/"
qargs = dict(
release='"%s"' % albumname,
primarytype=kwa.get("primarytype", "album"),
status=kwa.get("status", "official"))
qargs.update({k: '"%s"' % v for k, v in kwa.items()})
qargs = ["%s:%s" % item for item in qargs.items()]
qargs = {"query": " AND ".join(qargs)}
g.message = "Album search for '%s%s%s'" % (c.y, albumname, c.w)
wdata = _do_query(url, qargs)
if not wdata:
return None
ns = {'mb': 'http://musicbrainz.org/ns/mmd-2.0#'}
root = ET.fromstring(wdata)
rlist = root.find("mb:release-list", namespaces=ns)
if int(rlist.get('count')) == 0:
return None
album = rlist.find("mb:release", namespaces=ns)
artist = album.find("./mb:artist-credit/mb:name-credit/mb:artist",
namespaces=ns).find("mb:name", namespaces=ns).text
title = album.find("mb:title", namespaces=ns).text
aid = album.get('id')
return dict(artist=artist, title=title, aid=aid)
@commands.command(r'album\s*(.{0,500})')
def search_album(term):
"""Search for albums. """
# pylint: disable=R0914,R0912
if not term:
show_message("Enter album name:", c.g, update=True)
term = input("> ")
if not term or len(term) < 2:
g.message = c.r + "Not enough input!" + c.w
g.content = generate_songlist_display()
return
album = _get_mb_album(term)
if not album:
show_message("Album '%s' not found!" % term)
return
out = "'%s' by %s%s%s\n\n" % (album['title'],
c.g, album['artist'], c.w)
out += ("[Enter] to continue, [q] to abort, or enter artist name for:\n"
" %s" % (c.y + term + c.w + "\n"))
prompt = "Artist? [%s] > " % album['artist']
xprint(prompt, end="")
artistentry = input().strip()
if artistentry:
if artistentry == "q":
show_message("Album search abandoned!")
return
album = _get_mb_album(term, artist=artistentry)
if not album:
show_message("Album '%s' by '%s' not found!" % (term, artistentry))
return
title, artist = album['title'], album['artist']
mb_tracks = _get_mb_tracks(album['aid'])
if not mb_tracks:
show_message("Album '%s' by '%s' has 0 tracks!" % (title, artist))
return
msg = "%s%s%s by %s%s%s\n\n" % (c.g, title, c.w, c.g, artist, c.w)
msg += "Enter to begin matching or [q] to abort"
g.message = msg
g.content = "Tracks:\n"
for n, track in enumerate(mb_tracks, 1):
g.content += "%02s %s" % (n, track['title'])
g.content += "\n"
screen.update()
entry = input("Continue? [Enter] > ")
if entry == "":
pass
else:
show_message("Album search abandoned!")
return
songs = []
screen.clear()
itt = _match_tracks(artist, title, mb_tracks)
stash = Config.SEARCH_MUSIC.get, Config.ORDER.get
Config.SEARCH_MUSIC.value = True
Config.ORDER.value = "relevance"
try:
songs.extend(itt)
except KeyboardInterrupt:
xprint("%sHalted!%s" % (c.r, c.w))
finally:
Config.SEARCH_MUSIC.value, Config.ORDER.value = stash
if songs:
xprint("\n%s / %s songs matched" % (len(songs), len(mb_tracks)))
input("Press Enter to continue")
msg = "Contents of album %s%s - %s%s %s(%d/%d)%s:" % (
c.y, artist, title, c.w, c.b, len(songs), len(mb_tracks), c.w)
failmsg = "Found no album tracks for %s%s%s" % (c.y, title, c.w)
paginatesongs(songs, msg=msg, failmsg=failmsg)
@commands.command(r'encoders?')
def show_encs():
""" Display available encoding presets. """
out = "%sEncoding profiles:%s\n\n" % (c.ul, c.w)
for x, e in enumerate(g.encoders):
sel = " (%sselected%s)" % (c.y, c.w) if Config.ENCODER.get == x else ""
out += "%2d. %s%s\n" % (x, e['name'], sel)
g.content = out
message = "Enter %sset encoder <num>%s to select an encoder"
g.message = message % (c.g, c.w)
def matchfunction(func, regex, userinput):
""" Match userinput against regex.
Call func, return True if matches.
"""
# Not supported in python 3.3 or lower
# match = regex.fullmatch(userinput)
# if match:
match = regex.match(userinput)
if match and match.group(0) == userinput:
matches = match.groups()
dbg("input: %s", userinput)
dbg("function call: %s", func.__name__)
dbg("regx matches: %s", matches)
try:
func(*matches)
except IndexError:
if g.debug_mode:
g.content = ''.join(traceback.format_exception(
*sys.exc_info()))
g.message = F('invalid range')
g.content = g.content or generate_songlist_display()
except (ValueError, IOError) as e:
if g.debug_mode:
g.content = ''.join(traceback.format_exception(
*sys.exc_info()))
g.message = F('cant get track') % str(e)
g.content = g.content or\
generate_songlist_display(zeromsg=g.message)
except GdataError as e:
if g.debug_mode:
g.content = ''.join(traceback.format_exception(
*sys.exc_info()))
g.message = F('no data') % e
g.content = g.content
return True
def main():
""" Main control loop. """
set_window_title("mpsyt")
if not g.command_line:
g.content = logo(col=c.g, version=__version__) + "\n\n"
g.message = "Enter /search-term to search or [h]elp"
screen.update()
# open playlists from file
convert_playlist_to_v2()
open_from_file()
arg_inp = ' '.join(g.argument_commands)
prompt = "> "
arg_inp = arg_inp.replace(r",,", "[mpsyt-comma]")
arg_inp = arg_inp.split(",")
while True:
next_inp = ""
if len(arg_inp):
next_inp = arg_inp.pop(0).strip()
next_inp = next_inp.replace("[mpsyt-comma]", ",")
try:
userinput = next_inp or input(prompt).strip()
except (KeyboardInterrupt, EOFError):
userinput = prompt_for_exit()
for i in g.commands:
if matchfunction(i.function, i.regex, userinput):
break
else:
g.content = g.content or generate_songlist_display()
if g.command_line:
g.content = ""
if userinput and not g.command_line:
g.message = c.b + "Bad syntax. Enter h for help" + c.w
elif userinput and g.command_line:
sys.exit("Bad syntax")
screen.update()
Be the first to comment
You can use [html][/html], [css][/css], [php][/php] and more to embed the code. Urls are automatically hyperlinked. Line breaks and paragraphs are automatically generated.