Initial Commit

This commit is contained in:
har0ke 2019-09-11 19:30:11 +02:00
commit 473fa92029
16 changed files with 893 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.idea/*
*.pyc
venv/*

27
README.md Normal file
View File

@ -0,0 +1,27 @@
# iTunesToM3ULibrary
Converts iTunes library playlsits/folders/lately added into M3U playlists.
## Use
```
python3 main.py /path/to/config.json
```
## Configuration file
JSON file should consist of a dictionary containing settings.
```json
{
"setting1": "value1",
"setting2": "value2"
}
```
For an overview about all settings refer to [settings.py](https://github.com/okehargens/iTunesToM3ULibrary/blob/master/src/configuration.py).
Example settings files `configs/settings_*.py`. For example:
[settings_base.json](https://github.com/okehargens/iTunesToM3ULibrary/blob/master/configs/settings_base.json) &
[settings_X3.json](https://github.com/okehargens/iTunesToM3ULibrary/blob/master/configs/settings_X3.json)
## Disclaimer
Use on your own responsibility. I take no responsibility for data loss, that may be caused by misconfiguration or bugs.

9
configs/settings_X3.json Normal file
View File

@ -0,0 +1,9 @@
{
"extends": "settings_base.json",
"STORAGE_PATH_SEPARATOR": "\\",
"RELATIVE_PATHS": false,
"STORAGE_M3U_MEDIA_PATH": "TF1:\\media",
"STORAGE_COPY_MEDIA_PATH": "/run/media/oke/X3/media/",
"STORAGE_COPY_PLAYLIST_PATH": "/run/media/oke/X3/playlists/"
}

View File

@ -0,0 +1,17 @@
{
"IGNORE_FOLDERS": false,
"CLEANUP_MEDIA_COPY_STORAGE": true,
"ITUNES_LIBRARY": "/run/user/1000/gvfs/ftp:host=syn-storage.local/music/iTunes/iTunes Library.xml",
"ITUNES_MEDIA_PATH": "/run/user/1000/gvfs/ftp:host=syn-storage.local/music/iTunes/iTunes Media/",
"ITUNES_MEDIA_DB_PATH": "Y:/iTunes/iTunes Media/",
"RELATIVE_PATHS": true,
"STORAGE_M3U_MEDIA_PATH": null,
"STORAGE_PATH_SEPARATOR": null,
"STORAGE_COPY_MEDIA_PATH": null,
"STORAGE_COPY_PLAYLIST_PATH": null,
"INCLUDE": ["all"],
"EXCLUDE": ["get_Library", "get_Movies", "get_Downloaded", "get_Audiobooks", "get_TV Shows"]
}

View File

@ -0,0 +1,10 @@
{
"CLEANUP_MEDIA_COPY_STORAGE": false,
"extends": "settings_base.json",
"STORAGE_PATH_SEPARATOR": "/",
"RELATIVE_PATHS": true,
"STORAGE_M3U_MEDIA_PATH": null,
"STORAGE_COPY_MEDIA_PATH": null,
"STORAGE_COPY_PLAYLIST_PATH": "~/Music/playlists/"
}

0
src/__init__.py Normal file
View File

View File

@ -0,0 +1,4 @@
from .library import Library
from .playlist import Playlist
from .song import Song

View File

@ -0,0 +1,91 @@
import os
from abc import abstractmethod
from .song import Song
from .playlist import Playlist
class Library(object):
def __init__(self, media_path):
self.media_path = media_path
self.play_lists_by_id = {}
self.play_lists_by_name = {}
self.play_list_by_full_name = {}
self.play_lists = []
self.songs = {}
self.load_songs()
self.load_playlists()
@abstractmethod
def load_songs(self):
"""
load songs into this library and add them via add_songs method
"""
pass
@abstractmethod
def load_playlists(self):
"""
load playlists into library and add them via add_playlists method
:return:
"""
pass
def add_playlists(self, pls):
"""
add playlists managed by this library
:param pls: playlists to add
"""
for pl in pls:
self.play_lists_by_id[pl.id] = pl
if pl.name not in self.play_lists_by_name:
self.play_lists_by_name[pl.name] = []
self.play_lists_by_name[pl.name].append(pl)
self.play_lists.append(pl)
self.add_songs(pl.tracks)
for p in self.play_lists:
self.play_list_by_full_name[self.get_playlist_full_name(p)] = p
def add_songs(self, songs):
"""
adds songs to this library to manage
:param songs: list of songs
"""
d_songs = {}
for song in songs:
d_songs[song.track_id] = song
self.songs.update(d_songs)
def get_playlist_full_name(self, pl):
"""
:param pl: a playlist of this library
:return: the full hierachy name of this playlist
"""
def resolve(entry, path=""):
if entry.parent_id:
return resolve(self.play_lists_by_id[entry.parent_id], os.path.join(entry.name, path))
return os.path.join(entry.name, path)
return resolve(self.play_lists_by_id[pl.id])[:-1]
def get_playlist_by_name(self, playlistName):
"""
:param playlistName: name of playlist
:return: list of playlist, containing playlists with this name
"""
return self.play_lists_by_name[playlistName]
def get_play_lists_by_full_name(self, name):
"""
:param name: full name of playlist
:return: playlist going by this name
"""
return self.play_list_by_full_name[name]
def find_play_lists_by_full_name(self, name):
"""
:param name: substring
:return: all playlist's, which's full name contains the substing
"""
return [self.play_list_by_full_name[k] for k in self.play_list_by_full_name.keys() if name in k]

View File

@ -0,0 +1,43 @@
import copy
import itertools
import os
from utils import fix_path
class Playlist(object):
def __init__(self, name, id, parent_id, library, tracks=None):
self.name = name
self.tracks = tracks or []
self.id = id
self.parent_id = parent_id
self.library = library
def get_m3u_content(self, media_path, storage):
device_media_path_list = storage.device_media_path.split(storage.storage_path_separator)
def is_not_empty_string(s):
return s != ''
def c_path(path):
if storage.use_related_paths:
rel_p = storage.storage_path_separator.join(["."] + [".." for i in range(len(self.library.get_playlist_full_name(self).split(os.sep)) - 1)])
rel_p_to_m = os.path.relpath(storage.media_path, storage.playlist_path)
if storage.ignore_folders:
return storage.storage_path_separator.join([rel_p_to_m, path.replace(media_path, "")])
else:
return storage.storage_path_separator.join([rel_p, storage.storage_path_separator.join([rel_p_to_m, path.replace(media_path, "")])])
return storage.storage_path_separator.join(filter(is_not_empty_string,
itertools.chain(device_media_path_list + fix_path(path).replace(media_path, "").split(os.sep))))
return "\n".join([c_path(t.location) for t in self.sorted_tracks if t.location])
@property
def sorted_tracks(self):
tracks = copy.copy(self.tracks)
tracks.sort(key=lambda a: (a.track_number if a.track_number is not None else 5))
return tracks
def __repr__(self):
return u"<Playlist: %s>" % str(self)
def __str__(self):
return self.library.get_playlist_hierarchy(self)

40
src/base_library/song.py Normal file
View File

@ -0,0 +1,40 @@
class Song(object):
def __init__(self):
self.name = None
self.track_id = None
self.artist = None
self.album_artist = None
self.composer = None
self.album = None
self.genre = None
self.kind = None
self.size = None
self.total_time = None
self.track_number = None
self.track_count = None
self.disc_number = None
self.disc_count = None
self.year = None
self.date_modified = None
self.date_added = None
self.bit_rate = None
self.sample_rate = None
self.comments = None
self.rating = None
self.rating_computed = None
self.album_rating = None
self.play_count = None
self.skip_count = None
self.skip_date = None
self.location = None
self.compilation = None
self.grouping = None
self.lastplayed = None
self.length = None
def __str__(self):
return self.name
def __repr__(self):
return "<Song: %s>" % str(self)

81
src/configuration.py Normal file
View File

@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
import json
import codecs
import os
class Configuration(object):
@classmethod
def load_json_recursive(cls, fn):
with codecs.open(fn, encoding="utf-8") as f:
js = json.loads(f.read())
if "extends" in js:
base_fn = os.path.join(os.path.dirname(fn), js["extends"])
base_js = cls.load_json_recursive(base_fn)
base_js.update(js)
return base_js
return js
def __init__(self, conf_fn):
js = self.load_json_recursive(conf_fn)
# Path where actual iTunes Media is stored.
self.itunes_media_path = js["ITUNES_MEDIA_PATH"]
# Path that iTunes accesses Media (for example if iTunes runs on VM).
# By default this should match ITUNES_MEDIA_PATH
self.itunes_media_db_path = js["ITUNES_MEDIA_DB_PATH"]
# Path to copy media to
self.storage_copy_media_path = js["STORAGE_COPY_MEDIA_PATH"]
# Path path to copy playlist files to
self.storage_copy_playlist_path = js["STORAGE_COPY_PLAYLIST_PATH"]
# The iTunes library
self.itunes_library = str(js["ITUNES_LIBRARY"])
# Whether to user relative paths in M3U files or not.
self.relative_paths = js["RELATIVE_PATHS"]
self.cleanup_media_copy_storage = js["CLEANUP_MEDIA_COPY_STORAGE"]
# if you got folders in your iTunes Library. If IGNORE_FOLDERS = True, all M3U files will be put directly in
# STORAGE_COPY_PLAYLIST_PATH. If IGNORE_FOLDERS = False, M3U files will be put in folders,
# matching the iTunes folders
self.ignore_folders = js["IGNORE_FOLDERS"]
# absolute path for MP3 Device to STORAGE_COPY_MEDIA. Only for M3U absolute path creation
# if RELATED_PATH = False.
self.storage_m3u_media_path = js["STORAGE_M3U_MEDIA_PATH"]
# path separator on device
self.storage_path_separator = js["STORAGE_PATH_SEPARATOR"]
# latest created by this tool
self.include = js["INCLUDE"]
self.exclude = js["EXCLUDE"]
if self.storage_copy_media_path is None:
self.storage_copy_media_path = self.itunes_media_path
if self.storage_m3u_media_path is None:
self.storage_m3u_media_path = self.storage_copy_media_path
# self.storage_m3u_media_path = os.path.abspath(os.path.expanduser(self.storage_m3u_media_path))
self.storage_copy_media_path = os.path.abspath(os.path.expanduser(self.storage_copy_media_path))
self.storage_copy_playlist_path = os.path.abspath(os.path.expanduser(self.storage_copy_playlist_path))
self.itunes_media_path = os.path.abspath(os.path.expanduser(self.itunes_media_path))
# self.itunes_media_db_path = os.path.abspath(os.path.expanduser(self.storage_m3u_media_path))
while self.itunes_media_db_path[-1] == "/":
self.itunes_media_db_path = self.itunes_media_db_path[:-1]
while self.storage_m3u_media_path[-1] == self.storage_path_separator:
self.storage_m3u_media_path = self.storage_m3u_media_path[:-1]
if not os.path.exists(self.itunes_media_path):
raise Exception("library path does not exist: %s" % self.itunes_media_path)
if not os.path.exists(self.storage_copy_media_path):
raise Exception("media path does not exist: %s" % self.storage_copy_media_path)
if not os.path.exists(self.storage_copy_playlist_path):
raise Exception("playlist path does not exist: %s" % self.storage_copy_playlist_path)

147
src/itunes_library.py Normal file
View File

@ -0,0 +1,147 @@
import hashlib
import io
import os
import plistlib
import time
from urllib import parse as urlparse
from datetime import date
from time import mktime
from utils import path_insensitive, fix_path
from base_library import Library, Song, Playlist
class ITunesLibrary(Library):
def __init__(self, media_path, xml_file, media_internal_path, files_only=False):
self.media_internal_path = media_internal_path
self.files_only = files_only
print("Load xml file")
with open(xml_file, "rb") as f:
bio = io.BytesIO(f.read()) # gvfs files are not always seekable
self.il = plistlib.load(bio)
super(ITunesLibrary, self).__init__(media_path)
def load_songs(self):
date_format = "%Y-%m-%d %H:%M:%S"
songs = []
print("Validating tracks")
total = len(self.il['Tracks'])
for idx, (trackid, attributes) in enumerate(self.il['Tracks'].items()):
print("\r % 2d%%" % int(100. * idx / total), end="")
s = Song()
s.name = attributes.get('Name')
s.track_id = int(attributes.get('Track ID'))
s.artist = attributes.get('Artist')
s.album_artist = attributes.get('Album Artist')
s.composer = attributes.get('Composer')
s.album = attributes.get('Album')
s.genre = attributes.get('Genre')
s.kind = attributes.get('Kind')
if attributes.get('Size'):
s.size = int(attributes.get('Size'))
s.total_time = attributes.get('Total Time')
s.track_number = attributes.get('Track Number')
if attributes.get('Track Count'):
s.track_count = int(attributes.get('Track Count'))
if attributes.get('Disc Number'):
s.disc_number = int(attributes.get('Disc Number'))
if attributes.get('Disc Count'):
s.disc_count = int(attributes.get('Disc Count'))
if attributes.get('Year'):
s.year = int(attributes.get('Year'))
if attributes.get('Date Modified'):
s.date_modified = time.strptime(str(attributes.get('Date Modified')), date_format)
if attributes.get('Date Added'):
s.date_added = time.strptime(str(attributes.get('Date Added')), date_format)
if attributes.get('Bit Rate'):
s.bit_rate = int(attributes.get('Bit Rate'))
if attributes.get('Sample Rate'):
s.sample_rate = int(attributes.get('Sample Rate'))
s.comments = attributes.get("Comments")
if attributes.get('Rating'):
s.rating = int(attributes.get('Rating'))
s.rating_computed = 'Rating Computed' in attributes
if attributes.get('Play Count'):
s.play_count = int(attributes.get('Play Count'))
if attributes.get('Location'):
s.location = attributes.get('Location')
s.location = urlparse.unquote(urlparse.urlparse(attributes.get('Location')).path[1:])
s.location = s.location # fixes bug #19
if self.media_internal_path is None or self.media_path is None:
raise Exception("media_internal_path or media_path not set")
if self.media_internal_path in self.media_path:
raise Exception("media_internal_path <%s> incorrect for: %s"
% (self.media_internal_path, s.location))
if self.media_internal_path not in s.location:
print("\r media_internal_path <%s> not in location of media <%s>" %
(self.media_internal_path, s.location))
s.location = path_insensitive(s.location.replace(self.media_internal_path, self.media_path))
s.compilation = 'Compilation' in attributes
if attributes.get('Play Date UTC'):
s.lastplayed = time.strptime(str(attributes.get('Play Date UTC')), date_format)
if attributes.get('Skip Count'):
s.skip_count = int(attributes.get('Skip Count'))
if attributes.get('Skip Date'):
s.skip_date = time.strptime(str(attributes.get('Skip Date')), date_format)
if attributes.get('Total Time'):
s.length = int(attributes.get('Total Time'))
if attributes.get('Grouping'):
s.grouping = attributes.get('Grouping')
if self.files_only is False or attributes.get('Track Type') == 'File':
songs.append(s)
print("\r 100%")
self.add_songs(songs)
def load_playlists(self):
print("Load playlists")
new_playlists = []
for p in self.il['Playlists']:
pl = Playlist(p["Name"], p["Playlist Persistent ID"],
p["Parent Persistent ID"] if "Parent Persistent ID" in p else None, self)
track_num = 1
if 'Playlist Items' in p:
for track in p['Playlist Items']:
track_id = int(track['Track ID'])
t = self.songs[track_id]
t.playlist_order = track_num
track_num += 1
pl.tracks.append(t)
new_playlists.append(pl)
self.add_playlists(new_playlists)
self.add_latest()
def add_latest(self):
"""
add latest albums to play-lists
"""
# filter and sort albums
albums_dict = {}
for song_id in self.songs:
song = self.songs[song_id]
key = (song.album, song.album_artist or song.artist, date.fromtimestamp(mktime(song.date_added)), song.year)
if key in albums_dict:
albums_dict[key].append(song)
else:
albums_dict[key] = [song]
albums_list = [i for i in albums_dict.keys() if len(albums_dict[i]) > 3]
albums_list.sort(key=lambda x: (x[2], x[0]), reverse=True)
# create play-lists
new_playlists = []
tracks = []
i = 1
for alb in albums_list[0:99]:
name = fix_path(("%02d - %s - %s - %s" % (i, alb[3], alb[1], alb[0])).strip().replace(os.path.sep, "_"))
i += 1
m = hashlib.md5(name.encode("utf-8")).hexdigest()
tracks += albums_dict[alb]
new_playlists.append(Playlist(name, m, 9, self, albums_dict[alb]))
# add parent
new_playlists.append(Playlist("Last Added", 9, None, self, tracks))
# add to library
self.add_playlists(new_playlists)

63
src/m3u_storage.py Normal file
View File

@ -0,0 +1,63 @@
import codecs
import errno
import os
import shutil
import subprocess
class M3UStorage(object):
"""
holds data of storage to copy library to
"""
def __init__(self, media_path, playlist_path, use_related_paths=True,
device_media_path="", ignore_folders=False, storage_path_separator=os.sep):
self.device_media_path = device_media_path
self.media_path = media_path
self.playlist_path = playlist_path
self.ignore_folders = ignore_folders
self.use_related_paths = use_related_paths
self.storage_path_separator = storage_path_separator
@staticmethod
def makedirs(fname):
if not os.path.exists(os.path.dirname(fname)):
try:
os.makedirs(os.path.dirname(fname))
except OSError as exc: # Guard against race condition
if exc.errno != errno.EEXIST:
raise
def save(self, path, content):
path = path.replace(":", "_").replace("?", "_").replace("mtp_host", "mtp:host")
self.assert_path_in_storage(path)
self.makedirs(path)
with codecs.open(path, "w", encoding="utf-8") as f:
f.write(content)
@staticmethod
def content_equals(path, content):
if os.path.exists(path):
with codecs.open(path, "r", encoding="utf-8") as f:
try:
cont = f.read()
return content == cont
except IOError:
return False
return False
def copy(self, from_, to_):
self.makedirs(to_)
self.assert_path_in_storage(to_)
if not os.path.exists(to_):
try:
if "mtp:" in to_:
subprocess.check_output(['gvfs-copy', from_, to_])
else:
shutil.copy(from_, to_)
except IOError as e:
print(e)
def assert_path_in_storage(self, path):
assert self.media_path in path or self.playlist_path in path

56
src/main.py Normal file
View File

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
import argparse
import os
from configuration import Configuration
from itunes_library import ITunesLibrary
from m3u_storage import M3UStorage
from synchronisation import LibrarySynchronisation
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("conf_file", help="Configuration file <settings>.json")
options = parser.parse_args()
configuration = Configuration(options.conf_file)
library = ITunesLibrary(configuration.itunes_media_path, configuration.itunes_library,
configuration.itunes_media_db_path)
storage = M3UStorage(configuration.storage_copy_media_path, configuration.storage_copy_playlist_path,
configuration.relative_paths, configuration.storage_m3u_media_path, configuration.ignore_folders,
configuration.storage_path_separator)
connector = LibrarySynchronisation(library, storage)
for song in library.songs.values():
if song.location and not song.location.startswith(library.media_path):
raise Exception("library internal path does not seem to be right: %s" % library.songs[
library.songs.keys()[0]].location)
all_playlists = set(library.play_lists)
g = lambda x: {library.get_play_lists_by_full_name(x)}
s = lambda x: set(library.find_play_lists_by_full_name(x))
include = set([])
for i in configuration.include:
if i.startswith("get_"):
include.update(g(i[4:]))
elif i.startswith("search_"):
include.update(s(i[7:]))
elif i == "all":
include.update(all_playlists)
else:
raise Exception("Missconfiguration: objects in EXCLUDE must start with 'get' or 'search'")
exclude = set([])
for i in configuration.exclude:
if i.startswith("get"):
exclude.update(g(i[4:]))
elif i.startswith("search"):
exclude.update(s(i[7:]))
else:
raise Exception("Missconfiguration: objects in EXCLUDE must start with 'get' or 'search'")
connector.add(include - exclude)
connector.sync(True, configuration.cleanup_media_copy_storage)

254
src/synchronisation.py Normal file
View File

@ -0,0 +1,254 @@
import os
import json
from utils import fix_path
class Action(object):
def execute(self):
pass
def get_change_info(self):
pass
def __repr__(self):
return "Change: %s" % self.get_change_info()
class SynchronisationAction(Action):
def __init__(self, sync):
print("Initialize %s" % self.__class__.__name__)
assert(isinstance(sync, LibrarySynchronisation))
self.sync_obj = sync
class CopyMediaAction(SynchronisationAction):
def __init__(self, *args, **kwargs):
super(CopyMediaAction, self).__init__(*args, **kwargs)
if self.sync_obj.library.media_path == self.sync_obj.storage.media_path:
self.tracks_to_create = set([])
else:
self.tracks_to_create = set(self.sync_obj.get_copy_mapping().keys())\
.difference(set(self.sync_obj.explore_media()))
def execute(self):
size = float(len(self.tracks_to_create))
count = 0.0
copy_mapping = self.sync_obj.get_copy_mapping()
for t in self.tracks_to_create:
if not t.startswith(self.sync_obj.storage.media_path):
raise Exception("Invalid Track path: \n" + "\n".join((t, self.sync_obj.storage.media_path)))
count += 1.0
print("%.2F%% Copy Track: %s" % (count / size * 100, t))
self.sync_obj.storage.copy(copy_mapping[t].location, t)
def get_change_info(self):
return "Tracks to add: " + json.dumps(list(self.tracks_to_create), indent=4)
class RemoveEmptyDirs(SynchronisationAction):
@staticmethod
def find_empty_dirs(root_dir='.'):
"""
fetches all empty dirs in storage
:param root_dir: dir do recusively search for empty dis
:return: iterator over empty dir paths
"""
for dirpath, dirs, files in os.walk(root_dir):
if not dirs and not files:
yield dirpath
def execute(self):
print("Remove Empty Dirs...")
for i in self.find_empty_dirs(self.sync_obj.storage.media_path):
os.removedirs(i)
for i in self.find_empty_dirs(self.sync_obj.storage.playlist_path):
os.removedirs(i)
def get_change_info(self):
return "Remove empty dirs."
class RemoveNotWantedMedia(SynchronisationAction):
def __init__(self, *args, **kwargs):
super(RemoveNotWantedMedia, self).__init__(*args, **kwargs)
self.tracks_to_delete = set(self.sync_obj.explore_media())\
.difference(set(self.sync_obj.get_copy_mapping().keys()))
def execute(self):
for t in self.tracks_to_delete:
if not t.startswith(self.sync_obj.storage.media_path):
raise Exception("Track path '%s' does not start with '%s'" % (t, self.sync_obj.storage.media_path))
print("Remove Track: ", t)
os.remove(t)
def get_change_info(self):
return "Tracks to delete: " + json.dumps(list(self.tracks_to_delete), indent=4)
class RemoveNotWantedPlaylist(SynchronisationAction):
def __init__(self, *args, **kwargs):
super(RemoveNotWantedPlaylist, self).__init__(*args, **kwargs)
self.playlists_to_delete = (set(self.sync_obj.explore_playlists())
.difference(set(self.sync_obj.get_playlist_mapping().keys())))
def execute(self):
for p in self.playlists_to_delete:
print("Remove Playlist: ", p)
os.remove(p)
def get_change_info(self):
return "Playlist to delete: " + json.dumps(list(self.playlists_to_delete), indent=4)
class CopyPlaylists(SynchronisationAction):
def __init__(self, sync, update_playlists):
super(CopyPlaylists, self).__init__(sync)
self.update_playlists = update_playlists
self.playlists_to_create = set(self.sync_obj.get_playlist_mapping().keys())\
.difference(set(self.sync_obj.explore_playlists()))
def execute(self):
mapping = self.sync_obj.get_playlist_mapping()
for p in (mapping.keys() if self.update_playlists else self.playlists_to_create):
content = mapping[p].get_m3u_content(self.sync_obj.library.media_path, self.sync_obj.storage)
if not self.sync_obj.storage.content_equals(p, content):
print("Updated Playlist: ", p)
self.sync_obj.storage.save(p, content)
def get_change_info(self):
return ("Playlist to create: %s\n All Playlists will be updated."
% json.dumps(list(self.playlists_to_create), indent=4))
class LibrarySynchronisation(object):
"""
Synchronises Library to storage
"""
def __init__(self, library, storage):
"""
:param library: library to synchronise
:param storage: storagee to synchronise to
"""
self.library = library
self.storage = storage
self._playlists = set([])
self._tracks = set([])
def add(self, playlists=None, tracks=None):
"""
add data that should be in storage
:param playlists: playlists
:param tracks: tracks
:return:
"""
if playlists:
self._playlists = self._playlists.union(playlists)
for playlist in playlists:
self._tracks = self._tracks.union(playlist.tracks)
if tracks:
self._tracks = self._tracks.union(tracks)
@property
def tracks(self):
return self._tracks or [self.library.songs[k] for k in self.library.songs]
@property
def playlists(self):
return self._playlists or self.library.play_lists
def explore_media(self):
"""
gather all media files
:return: (media_files, playlist_files)
"""
audio = ["3gp", "aa", "aac", "aax", "act", "aiff", "amr", "ape", "au", "awb", "dct", "dss", "dvf", "flac",
"gsm", "iklax", "ivs", "m4a", "m4b", "m4p", "mmf", "mp3", "mpc", "msv", "ogg", "oga", "mogg", "opus",
"ra", "rm", "raw", "sln", "tta", "vox", "wav", "wma", "wv", "webm"]
media_files = []
for dirpath, dirnames, filenames in os.walk(self.storage.media_path):
media_files += [os.path.join(dirpath, filename)
for filename in filenames
if filename.lower().endswith(tuple(audio))]
return media_files
def explore_playlists(self):
"""
gather all media files
:return: (media_files, playlist_files)
"""
playlist_files = []
for dirpath, dirnames, filenames in os.walk(self.storage.playlist_path):
playlist_files += [os.path.join(dirpath, filename) for filename in filenames if filename.endswith("m3u")]
return playlist_files
def sync(self, update_playlists=True, remove_other_files=True):
if os.path.abspath(self.storage.media_path) == os.path.abspath(self.library.media_path) and remove_other_files:
raise Exception("Removing files in original library is not allowed.")
actions = [
CopyMediaAction(self),
CopyPlaylists(self, update_playlists),
RemoveNotWantedPlaylist(self)
]
if remove_other_files:
actions += [
RemoveNotWantedMedia(self),
RemoveEmptyDirs(self)
]
for action in actions:
print(action.__class__.__name__)
print(action)
value = ""
while value != "y":
value = input("Continue with changes? [y/N]: ")
if value == "n":
print("Cancel")
exit(0)
for action in actions:
print(action.__class__)
action.execute()
print("DONE.")
def get_copy_mapping(self):
"""
generate mapping, where tracks should be moved
:return: dict
"""
tracks = set(self.tracks)
mapping = {}
for t in tracks:
if t.location:
mapping[fix_path(self.translate_media_path(t.location))] = t
return mapping
def translate_media_path(self, path):
return fix_path(path.replace(self.library.media_path, self.storage.media_path))
def get_playlist_mapping(self):
"""
generate mapping, where playlists should be moved
:return: dict
"""
playlists = set(self.playlists)
mapping = {}
for p in playlists:
if self.storage.ignore_folders:
path = os.path.join(self.storage.playlist_path,
self.library.get_playlist_full_name(p).replace(os.sep, "#") + ".m3u")
else:
path = os.path.join(self.storage.playlist_path, self.library.get_playlist_full_name(p) + ".m3u")
mapping[fix_path(path)] = p
return mapping

48
src/utils.py Normal file
View File

@ -0,0 +1,48 @@
import os
def fix_path(path):
return path.replace(":", "_").replace("?", "_").replace("mtp_host", "mtp:host").replace("ftp_host", "ftp:host")
def path_insensitive(path):
return _path_insensitive(path) or path
def _path_insensitive(path):
if path == '' or os.path.exists(path):
return path
base = os.path.basename(path) # may be a directory or a file
dirname = os.path.dirname(path)
suffix = ''
if not base: # dir ends with a slash?
if len(dirname) < len(path):
suffix = path[:len(path) - len(dirname)]
base = os.path.basename(dirname)
dirname = os.path.dirname(dirname)
if not os.path.exists(dirname):
dirname = _path_insensitive(dirname)
if not dirname:
return
# at this point, the directory exists but not the file
try: # we are expecting dirname to be a directory, but it could be a file
files = os.listdir(dirname)
except OSError:
return
baselow = base.lower()
try:
basefinal = next(fl for fl in files if fl.lower() == baselow)
except StopIteration:
return
if basefinal:
return os.path.join(dirname, basefinal) + suffix
else:
return