You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

880 lines
34 KiB

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""scdl allows you to download music from Soundcloud
Usage:
scdl (-l <track_url> | me) [-a | -f | -C | -t | -p | -r][-c | --force-metadata][-n <maxtracks>]
[-o <offset>][--hidewarnings][--debug | --error][--path <path>][--addtofile][--addtimestamp]
[--onlymp3][--hide-progress][--min-size <size>][--max-size <size>][--remove][--no-album-tag]
[--no-playlist-folder][--download-archive <file>][--extract-artist][--flac][--original-art]
[--original-name][--no-original][--only-original][--name-format <format>][--strict-playlist]
[--playlist-name-format <format>][--client-id <id>][--auth-token <token>][--overwrite][--no-playlist]
scdl -h | --help
scdl --version
Options:
-h --help Show this screen
--version Show version
-l [url] URL can be track/playlist/user
-n [maxtracks] Download the n last tracks of a playlist according to the creation date
-s Download the stream of a user (token needed)
-a Download all tracks of user (including reposts)
-t Download all uploads of a user (no reposts)
-f Download all favorites of a user
-C Download all commented by a user
-p Download all playlists of a user
-r Download all reposts of user
-c Continue if a downloaded file already exists
--force-metadata This will set metadata on already downloaded track
-o [offset] Begin with a custom offset
--addtimestamp Add track creation timestamp to filename,
which allows for chronological sorting
--addtofile Add artist to filename if missing
--debug Set log level to DEBUG
--download-archive [file] Keep track of track IDs in an archive file,
and skip already-downloaded files
--error Set log level to ERROR
--extract-artist Set artist tag from title instead of username
--hide-progress Hide the wget progress bar
--hidewarnings Hide Warnings. (use with precaution)
--max-size [max-size] Skip tracks larger than size (k/m/g)
--min-size [min-size] Skip tracks smaller than size (k/m/g)
--no-playlist-folder Download playlist tracks into main directory,
instead of making a playlist subfolder
--onlymp3 Download only the streamable mp3 file,
even if track has a Downloadable file
--path [path] Use a custom path for downloaded files
--remove Remove any files not downloaded from execution
--flac Convert original files to .flac
--no-album-tag On some player track get the same cover art if from the same album, this prevent it
--original-art Download original cover art
--original-name Do not change name of original file downloads
--no-original Do not download original file; only mp3 or m4a
--only-original Only download songs with original file available
--name-format [format] Specify the downloaded file name format
--playlist-name-format [format] Specify the downloaded file name format, if it is being downloaded as part of a playlist
--client-id [id] Specify the client_id to use
--auth-token [token] Specify the auth token to use
--overwrite Overwrite file if it already exists
--strict-playlist Abort playlist downloading if one track fails to download
--no-playlist Skip downloading playlists
"""
import cgi
import configparser
import itertools
import logging
import math
import mimetypes
mimetypes.init()
import os
import pathlib
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
import urllib.parse
import warnings
from dataclasses import asdict
import mutagen
from mutagen.easymp4 import EasyMP4
EasyMP4.RegisterTextKey("website", "purl")
import requests
from clint.textui import progress
from docopt import docopt
from pathvalidate import sanitize_filename
from soundcloud import (BasicAlbumPlaylist, BasicTrack, MiniTrack, SoundCloud,
Transcoding)
from scdl import __version__, utils
# Root logging prints bare messages at INFO; the "requests" logger is limited
# to warnings and above.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logging.getLogger("requests").setLevel(logging.WARNING)
# Module logger; main() changes its level for --debug / --error.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addFilter(utils.ColorizeFilter())
# Filenames handled during this run (filled by download_track when --remove is
# set); remove_files() deletes every other file in the current directory.
fileToKeep = []
class SoundCloudException(Exception):
    """Error raised by scdl when a track cannot be downloaded or processed."""
def handle_exception(exc_type, exc_value, exc_traceback):
    """
    Hook for uncaught exceptions: logs a goodbye message on Ctrl-C or the
    formatted traceback for anything else, then exits with status 1
    """
    if not issubclass(exc_type, KeyboardInterrupt):
        formatted = traceback.format_exception(exc_type, exc_value, exc_traceback)
        logger.error("".join(formatted))
    else:
        logger.error("\nGoodbye!")
    sys.exit(1)


# Route every uncaught exception through the handler above
sys.excepthook = handle_exception
def main():
    """
    Main function, parses the URL from command line arguments

    Checks that ffmpeg is installed, parses the command line with docopt,
    loads the config file, builds and validates the SoundCloud client,
    normalises the numeric options, changes into the download directory and
    finally hands over to download_url(). Exits with status 1 on any invalid
    input.
    """
    # exit if ffmpeg not installed
    if not is_ffmpeg_available():
        logger.error("ffmpeg is not installed")
        sys.exit(1)

    # Parse arguments
    arguments = docopt(__doc__, version=__version__)

    # setLevel() (rather than assigning .level) also resets the logger's
    # cached "is enabled" decisions
    if arguments["--debug"]:
        logger.setLevel(logging.DEBUG)
    elif arguments["--error"]:
        logger.setLevel(logging.ERROR)

    if "XDG_CONFIG_HOME" in os.environ:
        config_file = pathlib.Path(os.environ["XDG_CONFIG_HOME"], "scdl", "scdl.cfg")
    else:
        config_file = pathlib.Path.home().joinpath(".config", "scdl", "scdl.cfg")

    # import conf file
    config = get_config(config_file)

    logger.info("Soundcloud Downloader")
    logger.debug(arguments)

    # Command line values take precedence over the config file
    client_id = arguments["--client-id"] or config["scdl"]["client_id"]
    token = arguments["--auth-token"] or config["scdl"]["auth_token"]

    client = SoundCloud(client_id, token if token else None)

    if not client.is_client_id_valid():
        if arguments["--client-id"]:
            logger.error("Invalid client_id specified by --client-id argument. Using a dynamically generated client_id...")
        elif config["scdl"]["client_id"]:
            logger.error(f"Invalid client_id in {config_file}. Using a dynamically generated client_id...")
        # Passing None lets the client generate a client_id on its own
        client = SoundCloud(None, token if token else None)
        if not client.is_client_id_valid():
            logger.error("Dynamically generated client_id is not valid")
            sys.exit(1)

    if (token or arguments["me"]) and not client.is_auth_token_valid():
        if arguments["--auth-token"]:
            logger.error("Invalid auth_token specified by --auth-token argument")
        else:
            logger.error(f"Invalid auth_token in {config_file}")
        sys.exit(1)

    if arguments["-o"] is not None:
        try:
            # The user-facing offset is 1-based, the internal one is 0-based
            arguments["--offset"] = int(arguments["-o"]) - 1
            if arguments["--offset"] < 0:
                raise ValueError()
        except (TypeError, ValueError):
            logger.error("Offset should be a positive integer...")
            sys.exit(1)
        logger.debug("offset: %d", arguments["--offset"])

    if arguments["--min-size"] is not None:
        try:
            arguments["--min-size"] = utils.size_in_bytes(arguments["--min-size"])
        except Exception:
            # Plain error (no traceback), consistent with --max-size below
            logger.error("Min size should be an integer with a possible unit suffix")
            sys.exit(1)
        logger.debug("min-size: %d", arguments["--min-size"])

    if arguments["--max-size"] is not None:
        try:
            arguments["--max-size"] = utils.size_in_bytes(arguments["--max-size"])
        except Exception:
            logger.error("Max size should be an integer with a possible unit suffix")
            sys.exit(1)
        logger.debug("max-size: %d", arguments["--max-size"])

    if arguments["--hidewarnings"]:
        warnings.filterwarnings("ignore")

    if not arguments["--name-format"]:
        arguments["--name-format"] = config["scdl"]["name_format"]

    if not arguments["--playlist-name-format"]:
        arguments["--playlist-name-format"] = config["scdl"]["playlist_name_format"]

    if arguments["me"]:
        # set url to profile associated with auth token
        arguments["-l"] = client.get_me().permalink_url

    arguments["-l"] = validate_url(client, arguments["-l"])

    # convert arguments dict to python_args (kwargs-friendly args)
    python_args = {}
    for key, value in arguments.items():
        key = key.strip("-").replace("-", "_")
        python_args[key] = value

    # change download path
    path = arguments["--path"] or config["scdl"]["path"]
    # isdir rather than exists: os.chdir() fails on a regular file
    if os.path.isdir(path):
        os.chdir(path)
    else:
        if arguments["--path"]:
            logger.error(f"Invalid download path '{path}' specified by --path argument")
        else:
            logger.error(f"Invalid download path '{path}' in {config_file}")
        sys.exit(1)
    logger.debug("Downloading to " + os.getcwd() + "...")

    download_url(client, **python_args)

    if arguments["--remove"]:
        remove_files()
def validate_url(client: SoundCloud, url: str):
    """
    If url is a valid soundcloud.com url, return it.
    Otherwise, try to fix the url so that it is valid.
    If it cannot be fixed, exit the program.

    The returned url has its query string and fragment removed. Inputs that
    are not soundcloud.com urls are requested once to see whether they
    redirect to soundcloud.com; if the request itself fails, the input is
    tried as a username.
    """
    soundcloud_prefixes = ("https://soundcloud.com", "http://soundcloud.com")

    # Normalise the mobile and www hosts to the bare domain
    if url.startswith(("https://m.soundcloud.com", "http://m.soundcloud.com", "m.soundcloud.com")):
        url = url.replace("m.", "", 1)
    if url.startswith(("https://www.soundcloud.com", "http://www.soundcloud.com", "www.soundcloud.com")):
        url = url.replace("www.", "", 1)
    if url.startswith("soundcloud.com"):
        url = "https://" + url
    if url.startswith(soundcloud_prefixes):
        # Joining the url with its own path drops the query and fragment
        return urllib.parse.urljoin(url, urllib.parse.urlparse(url).path)

    # see if link redirects to soundcloud.com
    try:
        resp = requests.get(url)
        # Inspect the final url after redirects; the url passed in can never
        # match here because that case already returned above
        if resp.url.startswith(soundcloud_prefixes):
            return urllib.parse.urljoin(resp.url, urllib.parse.urlparse(resp.url).path)
    except Exception:
        # see if given a username instead of url
        if client.resolve(f"https://soundcloud.com/{url}"):
            return f"https://soundcloud.com/{url}"

    logger.error("URL is not valid")
    sys.exit(1)
def get_config(config_file: pathlib.Path) -> configparser.ConfigParser:
    """
    Gets config from scdl.cfg

    The default scdl.cfg shipped next to this module is loaded first, then
    the values from config_file (if it exists) are layered on top. The merged
    result is written back to config_file, creating its parent directories
    when needed, and returned.
    """
    config = configparser.ConfigParser()

    default_config_file = pathlib.Path(__file__).with_name("scdl.cfg")

    # load default config first
    with open(default_config_file, encoding="UTF-8") as f:
        config.read_file(f)

    # load config file if it exists
    if config_file.exists():
        with open(config_file, encoding="UTF-8") as f:
            config.read_file(f)

    # save config to disk
    config_file.parent.mkdir(parents=True, exist_ok=True)
    with open(config_file, "w", encoding="UTF-8") as f:
        config.write(f)

    return config
def download_url(client: SoundCloud, **kwargs):
    """
    Resolves the requested URL and hands the result to the matching
    downloader: a single track, a playlist, or one of the per-user
    collections selected by the f / C / t / a / p / r flags
    """
    url = kwargs.get("l")
    item = client.resolve(url)
    logger.debug(item)
    offset = kwargs.get("offset", 0)
    strict = kwargs.get("strict_playlist")

    def numbered(resources):
        # Number every resource from 1, then skip the first `offset` of them
        return itertools.islice(enumerate(resources, 1), offset, None)

    if not item:
        logger.error("URL is not valid")
        sys.exit(1)
    elif item.kind == "track":
        logger.info("Found a track")
        download_track(client, item, **kwargs)
    elif item.kind == "playlist":
        logger.info("Found a playlist")
        download_playlist(client, item, playlist_offset=offset, **kwargs)
    elif item.kind == "user":
        user = item
        logger.info("Found a user profile")
        if kwargs.get("f"):
            logger.info(f"Retrieving all likes of user {user.username}...")
            likes = client.get_user_likes(user.id, limit=1000)
            for i, like in numbered(likes):
                logger.info(f"like n°{i} of {user.likes_count}")
                if hasattr(like, "track"):
                    download_track(client, like.track, exit_on_fail=strict, **kwargs)
                elif hasattr(like, "playlist"):
                    # Likes only reference the playlist, so fetch it by id
                    download_playlist(client, client.get_playlist(like.playlist.id), **kwargs)
                else:
                    logger.error(f"Unknown like type {like}")
                    if strict:
                        sys.exit(1)
            logger.info(f"Downloaded all likes of user {user.username}!")
        elif kwargs.get("C"):
            logger.info(f"Retrieving all commented tracks of user {user.username}...")
            comments = client.get_user_comments(user.id, limit=1000)
            for i, comment in numbered(comments):
                logger.info(f"comment n°{i} of {user.comments_count}")
                commented_track = client.get_track(comment.track.id)
                download_track(client, commented_track, exit_on_fail=strict, **kwargs)
            logger.info(f"Downloaded all commented tracks of user {user.username}!")
        elif kwargs.get("t"):
            logger.info(f"Retrieving all tracks of user {user.username}...")
            uploads = client.get_user_tracks(user.id, limit=1000)
            for i, track in numbered(uploads):
                logger.info(f"track n°{i} of {user.track_count}")
                download_track(client, track, exit_on_fail=strict, **kwargs)
            logger.info(f"Downloaded all tracks of user {user.username}!")
        elif kwargs.get("a"):
            logger.info(f"Retrieving all tracks & reposts of user {user.username}...")
            stream = client.get_user_stream(user.id, limit=1000)
            for i, entry in numbered(stream):
                # The total is only known when the reposts count is available
                total = user.track_count + user.reposts_count if user.reposts_count else "?"
                logger.info(f"item n°{i} of {total}")
                if entry.type in ("track", "track-repost"):
                    download_track(client, entry.track, exit_on_fail=strict, **kwargs)
                elif entry.type in ("playlist", "playlist-repost"):
                    download_playlist(client, entry.playlist, **kwargs)
                else:
                    logger.error(f"Unknown item type {entry.type}")
                    if strict:
                        sys.exit(1)
            logger.info(f"Downloaded all tracks & reposts of user {user.username}!")
        elif kwargs.get("p"):
            logger.info(f"Retrieving all playlists of user {user.username}...")
            playlists = client.get_user_playlists(user.id, limit=1000)
            for i, playlist in numbered(playlists):
                logger.info(f"playlist n°{i} of {user.playlist_count}")
                download_playlist(client, playlist, **kwargs)
            logger.info(f"Downloaded all playlists of user {user.username}!")
        elif kwargs.get("r"):
            logger.info(f"Retrieving all reposts of user {user.username}...")
            reposts = client.get_user_reposts(user.id, limit=1000)
            for i, entry in numbered(reposts):
                logger.info(f"item n°{i} of {user.reposts_count or '?'}")
                if entry.type == "track-repost":
                    download_track(client, entry.track, exit_on_fail=strict, **kwargs)
                elif entry.type == "playlist-repost":
                    download_playlist(client, entry.playlist, **kwargs)
                else:
                    logger.error(f"Unknown item type {entry.type}")
                    if strict:
                        sys.exit(1)
            logger.info(f"Downloaded all reposts of user {user.username}!")
        else:
            logger.error("Please provide a download type...")
            sys.exit(1)
    else:
        logger.error(f"Unknown item type {item.kind}")
        sys.exit(1)
def remove_files():
    """
    Deletes every regular file in the current directory whose name was not
    recorded in fileToKeep during this run
    """
    logger.info("Removing local track files that were not downloaded...")
    for entry in os.listdir("."):
        if os.path.isfile(entry) and entry not in fileToKeep:
            os.remove(entry)
def download_playlist(client: SoundCloud, playlist: BasicAlbumPlaylist, **kwargs):
    """
    Downloads a playlist

    Unless no_playlist_folder is set, the tracks are downloaded into a
    sub-directory named after the playlist; the working directory is restored
    afterwards even if a download fails. With n set, only the n tracks with
    the highest ids are kept. playlist_offset tracks are skipped from the
    start; track numbers always reflect the position in the playlist.
    """
    if kwargs.get("no_playlist"):
        logger.info("Skipping playlist...")
        return
    playlist_name = playlist.title.encode("utf-8", "ignore")
    playlist_name = playlist_name.decode("utf-8")
    playlist_name = sanitize_filename(playlist_name)
    use_folder = not kwargs.get("no_playlist_folder")

    if use_folder:
        os.makedirs(playlist_name, exist_ok=True)
        os.chdir(playlist_name)

    try:
        if kwargs.get("n"):  # Order by creation date and get the n lasts tracks
            playlist.tracks.sort(key=lambda track: track.id, reverse=True)
            playlist.tracks = playlist.tracks[: int(kwargs.get("n"))]

        # The offset is applied exactly once, by islice below. Deleting the
        # leading tracks as well would skip them twice and shift the numbering.
        offset = kwargs.get("playlist_offset", 0) or 0
        tracknumber_digits = len(str(len(playlist.tracks)))
        for counter, track in itertools.islice(enumerate(playlist.tracks, 1), offset, None):
            logger.debug(track)
            logger.info(f"Track n°{counter}")
            playlist_info = {
                "author": playlist.user.username,
                "id": playlist.id,
                "title": playlist.title,
                "tracknumber": str(counter).zfill(tracknumber_digits),
            }
            # MiniTrack entries are fetched in full before downloading
            if isinstance(track, MiniTrack):
                if playlist.secret_token:
                    track = client.get_tracks([track.id], playlist.id, playlist.secret_token)[0]
                else:
                    track = client.get_track(track.id)

            download_track(client, track, playlist_info, kwargs.get("strict_playlist"), **kwargs)
    finally:
        if use_folder:
            os.chdir("..")
def try_utime(path, filetime):
    """
    Best-effort update of a file's timestamps: access time becomes now and
    modification time becomes filetime. Failures are logged, never raised
    """
    now = time.time()
    try:
        os.utime(path, times=(now, filetime))
    except Exception:
        logger.error("Cannot update utime of file")
def get_filename(track: BasicTrack, original_filename=None, aac=False, playlist_info=None, **kwargs):
    """
    Builds the sanitized, length-limited output filename for a track

    The title comes from the track title (optionally prefixed with the
    uploader and/or the creation timestamp) or, when neither addtofile nor
    addtimestamp is set, from the name_format / playlist_name_format template.
    The extension is taken from original_filename when given, otherwise it is
    .m4a for aac and .mp3 for everything else.
    """
    username = track.user.username
    title = track.title.encode("utf-8", "ignore").decode("utf-8")
    add_artist = kwargs.get("addtofile")
    add_timestamp = kwargs.get("addtimestamp")

    if add_artist and username not in title and "-" not in title:
        title = f"{username} - {title}"
        logger.debug(f'Adding "{username}" to filename')

    timestamp = str(int(track.created_at.timestamp()))
    if add_timestamp:
        title = f"{timestamp}_{title}"

    # The format templates only apply when no explicit prefix was requested
    if not (add_artist or add_timestamp):
        if playlist_info:
            template = kwargs.get("playlist_name_format")
            title = template.format(**asdict(track), playlist=playlist_info, timestamp=timestamp)
        else:
            template = kwargs.get("name_format")
            title = template.format(**asdict(track), timestamp=timestamp)

    if original_filename is None:
        # contain aac in m4a to write metadata
        ext = ".m4a" if aac else ".mp3"
    else:
        cleaned_name = original_filename.encode("utf-8", "ignore").decode("utf-8")
        ext = os.path.splitext(cleaned_name)[1]

    return sanitize_filename(limit_filename_length(title, ext))
def download_original_file(client: SoundCloud, track: BasicTrack, title: str, playlist_info=None, **kwargs):
    """
    Downloads the original (uploader-provided) file of a track

    Returns a (filename, already_downloaded) tuple. filename is None when no
    original file can be fetched (no download link, HTTP 401 or 404), in which
    case the caller may fall back to another source. With the flac option,
    convertible files are converted with ffmpeg and the converted filename is
    returned.

    Raises SoundCloudException when the server response lacks a usable
    filename or size, or when the file is outside the min_size / max_size
    bounds. Exits the program if the connection closes before the whole file
    was received.
    """
    logger.info("Downloading the original file.")

    # Get the requests stream
    url = client.get_track_original_download(track.id, track.secret_token)

    if not url:
        logger.info("Could not get original download link")
        return (None, False)

    r = requests.get(url, stream=True)
    if r.status_code == 401:
        logger.info("The original file has no download left.")
        return (None, False)

    if r.status_code == 404:
        logger.info("Could not get name from stream - using basic name")
        return (None, False)

    # Find filename
    header = r.headers.get("content-disposition")
    # parse_header() cannot handle None; an empty string simply yields no params
    _, params = cgi.parse_header(header or "")
    if "filename" in params:
        filename = urllib.parse.unquote(params["filename"], encoding="utf-8")
    else:
        raise SoundCloudException(f"Could not get filename from content-disposition header: {header}")

    if not kwargs.get("original_name"):
        filename, ext = os.path.splitext(filename)

        # Find file extension
        mime = r.headers.get("content-type")
        if not ext and mime:
            # Drop any "; charset=..." parameters; guess_extension may return None
            ext = mimetypes.guess_extension(mime.split(";")[0].strip()) or ""
        filename += ext

        filename = get_filename(track, filename, playlist_info=playlist_info, **kwargs)

    logger.debug(f"filename : {filename}")

    # Skip if file ID or filename already exists
    if already_downloaded(track, title, filename, **kwargs):
        if kwargs.get("flac") and can_convert(filename):
            # NOTE(review): [:-4] assumes a 3-letter extension; kept because
            # already_downloaded() derives the .flac name the same way
            filename = filename[:-4] + ".flac"
        return (filename, True)

    # Write file
    try:
        total_length = int(r.headers.get("content-length"))
    except (TypeError, ValueError):
        raise SoundCloudException("Could not determine the size of the original file") from None

    min_size = kwargs.get("min_size") or 0
    max_size = kwargs.get("max_size") or math.inf  # max size of 0 treated as no max size

    if not min_size <= total_length <= max_size:
        raise SoundCloudException("File not within --min-size and --max-size bounds")

    # Download to a temporary file first so a partial download never ends up
    # under the final name
    temp = tempfile.NamedTemporaryFile(delete=False)
    received = 0
    with temp as f:
        for chunk in progress.bar(
            r.iter_content(chunk_size=1024),
            expected_size=(total_length / 1024) + 1,
            hide=True if kwargs.get("hide_progress") else False,
        ):
            if chunk:
                received += len(chunk)
                f.write(chunk)
        f.flush()

    if received != total_length:
        # Do not leave the partial temporary file behind
        os.remove(temp.name)
        logger.error("connection closed prematurely, download incomplete")
        sys.exit(1)

    shutil.move(temp.name, os.path.join(os.getcwd(), filename))
    if kwargs.get("flac") and can_convert(filename):
        logger.info("Converting to .flac...")
        newfilename = limit_filename_length(filename[:-4], ".flac")

        commands = ["ffmpeg", "-i", filename, newfilename, "-loglevel", "error"]
        logger.debug(f"Commands: {commands}")
        returncode = subprocess.call(commands)
        if returncode == 0 and os.path.isfile(newfilename):
            os.remove(filename)
            filename = newfilename
        else:
            # Keep the downloaded original rather than deleting the only copy
            logger.error("Conversion to .flac failed, keeping the original file")

    return (filename, False)
def get_transcoding_m3u8(client: SoundCloud, transcoding: Transcoding, **kwargs):
    """
    Returns the "url" field of the JSON document served at the transcoding's
    url, or None when the transcoding has no url

    The size is estimated from an assumed bitrate (256 kbit/s for aac presets,
    128 kbit/s otherwise) multiplied by the duration; SoundCloudException is
    raised when the estimate is outside the min_size / max_size bounds.
    """
    url = transcoding.url
    bitrate_KBps = (256 if "aac" in transcoding.preset else 128) / 8
    estimated_bytes = bitrate_KBps * transcoding.duration
    lower_bound = kwargs.get("min_size") or 0
    upper_bound = kwargs.get("max_size") or math.inf  # max size of 0 treated as no max size

    if not (lower_bound <= estimated_bytes <= upper_bound):
        raise SoundCloudException("File not within --min-size and --max-size bounds")

    if url is None:
        return None

    headers = client.get_default_headers()
    if client.auth_token:
        headers["Authorization"] = f"OAuth {client.auth_token}"
    r = requests.get(url, params={"client_id": client.client_id}, headers=headers)
    logger.debug(r.url)
    return r.json()["url"]
def download_hls(client: SoundCloud, track: BasicTrack, title: str, playlist_info=None, **kwargs):
    """
    Downloads a track from its HLS stream using ffmpeg

    The aac transcoding is preferred unless onlymp3 is set; otherwise the mp3
    transcoding is used. Returns a (filename, already_downloaded) tuple.

    Raises SoundCloudException when the track has no transcodings, no usable
    hls transcoding, or no stream URL.
    """
    if not track.media.transcodings:
        raise SoundCloudException(f"Track {track.permalink_url} has no transcodings available")

    logger.debug(f"Trancodings: {track.media.transcodings}")

    aac_transcoding = None
    mp3_transcoding = None

    for t in track.media.transcodings:
        if t.format.protocol != "hls":
            continue
        if "aac" in t.preset:
            aac_transcoding = t
        elif "mp3" in t.preset:
            mp3_transcoding = t

    aac = False
    transcoding = None
    if not kwargs.get("onlymp3") and aac_transcoding:
        transcoding = aac_transcoding
        aac = True
    elif mp3_transcoding:
        transcoding = mp3_transcoding

    if not transcoding:
        raise SoundCloudException(f"Could not find mp3 or aac transcoding. Available transcodings: {[t.preset for t in track.media.transcodings if t.format.protocol == 'hls']}")

    filename = get_filename(track, None, aac, playlist_info, **kwargs)
    logger.debug(f"filename : {filename}")
    # Skip if file ID or filename already exists
    if already_downloaded(track, title, filename, **kwargs):
        return (filename, True)

    # Get the requests stream
    url = get_transcoding_m3u8(client, transcoding, **kwargs)
    if url is None:
        # Without a stream URL ffmpeg would be started with a None argument
        raise SoundCloudException(f"Could not get the stream URL for {track.permalink_url}")
    filename_path = os.path.abspath(filename)

    result = subprocess.run(
        ["ffmpeg", "-i", url, "-c", "copy", filename_path, "-loglevel", "error"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.stderr:
        # errors="replace": ffmpeg output is not guaranteed to be valid UTF-8
        logger.error(result.stderr.decode("utf-8", errors="replace"))
    return (filename, False)
def download_track(client: SoundCloud, track: BasicTrack, playlist_info=None, exit_on_fail=True, **kwargs):
    """
    Downloads a track

    Tries the original file first (when the track is downloadable and the
    options allow it), then falls back to the HLS stream. Afterwards the tags
    are written and the file's modification time is set to the track's
    creation time. Any SoundCloudException is logged; the program exits with
    status 1 when exit_on_fail is true.
    """
    try:
        title = track.title
        title = title.encode("utf-8", "ignore").decode("utf-8")
        logger.info(f"Downloading {title}")

        # Not streamable
        if not track.streamable:
            logger.warning("Track is not streamable...")

        # Geoblocked track
        if track.policy == "BLOCK":
            raise SoundCloudException(f"{title} is not available in your location...")

        # Downloadable track
        filename = None
        is_already_downloaded = False
        if (
            track.downloadable
            and track.has_downloads_left
            and not kwargs.get("onlymp3")
            and not kwargs.get("no_original")
        ):
            filename, is_already_downloaded = download_original_file(client, track, title, playlist_info, **kwargs)

        if filename is None:
            if kwargs.get("only_original"):
                raise SoundCloudException(f'Track "{track.permalink_url}" does not have original file available. Not downloading...')
            filename, is_already_downloaded = download_hls(client, track, title, playlist_info, **kwargs)

        if kwargs.get("remove"):
            fileToKeep.append(filename)

        # Only record tracks whose file is actually on disk, so a failed
        # download is not skipped forever on later runs
        if os.path.isfile(filename):
            record_download_archive(track, **kwargs)

        # Skip if file ID or filename already exists
        if is_already_downloaded and not kwargs.get("force_metadata"):
            raise SoundCloudException(f"{filename} already downloaded.")

        # If file does not exist an error occurred
        if not os.path.isfile(filename):
            raise SoundCloudException(f"An error occurred downloading {filename}.")

        # Try to set the metadata
        if filename.endswith((".mp3", ".flac", ".m4a")):
            try:
                set_metadata(track, filename, playlist_info, **kwargs)
            except Exception as err:
                os.remove(filename)
                logger.exception("Error trying to set the tags...")
                raise SoundCloudException("Error trying to set the tags...") from err
        else:
            logger.error("This type of audio doesn't support tagging...")

        # Try to change the real creation date
        # .timestamp() honours the datetime's timezone (if any), matching how
        # get_filename() derives its timestamp
        filetime = int(track.created_at.timestamp())
        try_utime(filename, filetime)

        logger.info(f"{filename} Downloaded.\n")
    except SoundCloudException as err:
        logger.error(err)
        if exit_on_fail:
            sys.exit(1)
def can_convert(filename):
    """
    Returns True if the file extension contains "wav" or "aif"
    (case-sensitive), i.e. the file is a candidate for .flac conversion
    """
    _, ext = os.path.splitext(filename)
    return any(marker in ext for marker in ("wav", "aif"))
def already_downloaded(track: BasicTrack, title: str, filename: str, **kwargs):
    """
    Returns True if the file has already been downloaded

    A track counts as downloaded when the file (or, with flac, its converted
    .flac sibling) exists, or when its id is in the download archive. With
    overwrite, existing files are deleted instead. If the track is already
    downloaded and none of c / remove / force_metadata is set, the program
    exits with status 1.
    """
    overwrite = kwargs.get("overwrite")
    wants_flac = kwargs.get("flac") and can_convert(filename)
    found = False

    if os.path.isfile(filename):
        if overwrite:
            os.remove(filename)
        else:
            found = True

    if wants_flac and os.path.isfile(filename[:-4] + ".flac"):
        if overwrite:
            os.remove(filename[:-4] + ".flac")
            found = False
        else:
            found = True

    if kwargs.get("download_archive") and in_download_archive(track, **kwargs):
        found = True

    # A convertible original that is still on disk has not been converted
    # yet, so it is treated as not downloaded
    if wants_flac and os.path.isfile(filename):
        found = False

    if not found:
        return False

    if kwargs.get("c") or kwargs.get("remove") or kwargs.get("force_metadata"):
        return True

    logger.error(f'Track "{title}" already exists!')
    logger.error("Exiting... (run again with -c to continue)")
    sys.exit(1)
def in_download_archive(track: BasicTrack, **kwargs):
    """
    Returns True if the track id is listed in the download archive, False if
    it is not (or the archive cannot be read), and None when no archive is
    configured. The archive file is created if it does not exist yet
    """
    archive_filename = kwargs.get("download_archive")
    if not archive_filename:
        return

    try:
        # "a+" creates a missing archive; rewind to read it from the start
        with open(archive_filename, "a+", encoding="utf-8") as archive:
            archive.seek(0)
            wanted = str(track.id)
            if any(entry.strip() == wanted for entry in archive):
                return True
    except IOError as ioe:
        logger.error("Error trying to read download archive...")
        logger.error(ioe)

    return False
def record_download_archive(track: BasicTrack, **kwargs):
    """
    Appends the track id, on its own line, to the download archive.
    Does nothing when no archive is configured; write errors are logged
    """
    archive_filename = kwargs.get("download_archive")
    if not archive_filename:
        return

    try:
        with open(archive_filename, "a", encoding="utf-8") as archive:
            archive.writelines([f"{track.id}\n"])
    except IOError as ioe:
        logger.error("Error trying to write to download archive...")
        logger.error(ioe)
def _request_cover_art(url):
    """
    Returns the response for url when it is a successful PNG/JPEG download,
    otherwise None (including when the request itself fails)
    """
    try:
        response = requests.get(url, stream=True)
    except requests.RequestException:
        return None
    if not response.ok:
        return None
    if response.headers.get("Content-Type") not in ("image/png", "image/jpeg", "image/jpg"):
        return None
    return response


def set_metadata(track: BasicTrack, filename: str, playlist_info=None, **kwargs):
    """
    Sets the mp3 file metadata using the Python module Mutagen

    Writes title, artist, genre, website, date and (inside a playlist) album
    and track number, then the description and the cover art for FLAC, MP3
    and MP4 files. Cover art is best effort: when it cannot be fetched an
    error is logged and the other tags are still written.

    Note that track is modified: date and artist are set on it, and with
    extract_artist its title is replaced by the part after the dash.
    """
    logger.info("Setting tags...")
    user = track.user
    # Fall back to the uploader's avatar when the track has no artwork
    artwork_url = track.artwork_url or user.avatar_url

    response = None
    new_artwork_url = None
    if artwork_url:
        if kwargs.get("original_art"):
            new_artwork_url = artwork_url.replace("large", "original")
            response = _request_cover_art(new_artwork_url)
        if response is None:
            new_artwork_url = artwork_url.replace("large", "t500x500")
            response = _request_cover_art(new_artwork_url)
    if response is None:
        logger.error(f"Could not get cover art at {new_artwork_url}")

    cover_data = None
    cover_mime = "image/jpeg"
    if response is not None:
        cover_data = response.content
        if response.headers.get("Content-Type") == "image/png":
            cover_mime = "image/png"

    track.date = track.created_at.strftime("%Y-%m-%d %H:%M:%S")

    track.artist = user.username
    if kwargs.get("extract_artist"):
        # ASCII hyphen, then minus sign, en dash, em dash and horizontal bar
        for dash in [" - ", " \u2212 ", " \u2013 ", " \u2014 ", " \u2015 "]:
            if dash in track.title:
                # Split once so that further dashes stay part of the title
                artist, new_title = track.title.split(dash, 1)
                track.artist = artist.strip()
                track.title = new_title.strip()
                break

    # First pass: the easy interface handles the common text tags
    audio = mutagen.File(filename, easy=True)
    audio.delete()
    audio["title"] = track.title
    audio["artist"] = track.artist
    if track.genre:
        audio["genre"] = track.genre
    if track.permalink_url:
        audio["website"] = track.permalink_url
    if track.date:
        audio["date"] = track.date
    if playlist_info:
        if not kwargs.get("no_album_tag"):
            audio["album"] = playlist_info["title"]
        audio["tracknumber"] = str(playlist_info["tracknumber"])

    audio.save()

    # Second pass: format-specific tags (description and cover art)
    a = mutagen.File(filename)
    if track.description:
        if isinstance(a, mutagen.flac.FLAC):
            a["description"] = track.description
        elif isinstance(a, mutagen.mp3.MP3):
            a["COMM"] = mutagen.id3.COMM(
                encoding=3, lang="ENG", text=track.description
            )
        elif isinstance(a, mutagen.mp4.MP4):
            a["\xa9cmt"] = track.description
    if cover_data:
        if isinstance(a, mutagen.flac.FLAC):
            p = mutagen.flac.Picture()
            p.data = cover_data
            p.mime = cover_mime
            p.type = mutagen.id3.PictureType.COVER_FRONT
            a.add_picture(p)
        elif isinstance(a, mutagen.mp3.MP3):
            a["APIC"] = mutagen.id3.APIC(
                encoding=3,
                mime=cover_mime,
                type=3,
                desc="Cover",
                data=cover_data,
            )
        elif isinstance(a, mutagen.mp4.MP4):
            if cover_mime == "image/png":
                image_format = mutagen.mp4.MP4Cover.FORMAT_PNG
            else:
                image_format = mutagen.mp4.MP4Cover.FORMAT_JPEG
            a["covr"] = [mutagen.mp4.MP4Cover(cover_data, imageformat=image_format)]
    a.save()
def limit_filename_length(name: str, ext: str, max_bytes=255):
    """
    Returns name + ext, with name shortened from the end so that the UTF-8
    encoding of the result is at most max_bytes long

    Only whole characters are removed. If ext alone does not fit in
    max_bytes, name is dropped entirely and ext is returned unchanged.
    """
    budget = max(0, max_bytes - len(ext.encode("utf-8")))
    encoded_name = name.encode("utf-8")
    if len(encoded_name) > budget:
        # errors="ignore" drops a multi-byte character cut in half by the slice
        name = encoded_name[:budget].decode("utf-8", errors="ignore")
    return name + ext
def is_ffmpeg_available():
    """
    Reports whether an "ffmpeg" executable can be found on the PATH
    """
    ffmpeg_path = shutil.which("ffmpeg")
    return ffmpeg_path is not None
# Run the command line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()