Compare commits
No commits in common. "cc5f4f77da6037f2b7f023f25a7ce0ee7d360449" and "e06bcb895ab616e8c11bb6485d8ac4b1071a6557" have entirely different histories.
cc5f4f77da
...
e06bcb895a
|
@ -1,2 +1,2 @@
|
|||
from .file import File
|
||||
from .track import Track
|
||||
from .transcoder import Transcoder
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
from pathlib import Path
|
||||
|
||||
|
||||
audio_extensions = ['.flac']
|
||||
audio_extensions = ['flac']
|
||||
art_extensions = ['.jpg', '.jpeg', '.png']
|
||||
|
||||
class File:
|
||||
class Track:
|
||||
def __init__(self, location: Path):
|
||||
self.path = location
|
||||
|
|
@ -2,7 +2,7 @@ from pathlib import Path
|
|||
import shutil
|
||||
import subprocess
|
||||
from multiprocessing import Pool
|
||||
from . import File
|
||||
from . import Track
|
||||
|
||||
|
||||
class Transcoder:
|
||||
|
@ -16,18 +16,29 @@ class Transcoder:
|
|||
transcode_list = []
|
||||
for artist in self.input_root.iterdir():
|
||||
if artist.is_dir():
|
||||
artist_out = Path(self.output_root) / artist.name
|
||||
artist_out.mkdir()
|
||||
for album in artist.iterdir():
|
||||
if album.is_dir():
|
||||
for file in album.iterdir():
|
||||
if file.is_file():
|
||||
transcode_list.append(File(file))
|
||||
album_out = artist_out / album.parts[-1]
|
||||
if album_out.exists():
|
||||
print(f"Skipping '{artist.parts[-1]} / {album.parts[-1]}'")
|
||||
else:
|
||||
album_out.mkdir()
|
||||
done_file = album / 'DONE'
|
||||
open(done_file, 'a').close()
|
||||
self.copy_album_art(album, album_out)
|
||||
for file in album.iterdir():
|
||||
if file.is_file() and file.suffix.lower() == '.flac':
|
||||
transcode_list.append(Track(file))
|
||||
else:
|
||||
print(f"Warning, skipping non-dir '{album}' found in artist '{artist.parts[-1]}'")
|
||||
continue
|
||||
else:
|
||||
print(f"Warning, skipping non-dir '{artist}' found in root")
|
||||
continue
|
||||
self._transcode(transcode_list, self.encoder)
|
||||
self._transcode_single_thread(transcode_list, self.encoder)
|
||||
#self._transcode(transcode_list, self.encoder)
|
||||
|
||||
def _transcode(self, transcode_list: list, encoder: Path, workers=16):
|
||||
worker_args = [(track, encoder) for track in transcode_list]
|
||||
|
@ -41,36 +52,13 @@ class Transcoder:
|
|||
def _transcode_single_thread(self, transcode_list: list, encoder: Path):
|
||||
worker_args = [(track, encoder) for track in transcode_list]
|
||||
for args in worker_args:
|
||||
print(self.transcode_worker(*args))
|
||||
print(f"{args[0]}")
|
||||
self.transcode_worker(*args)
|
||||
|
||||
|
||||
def transcode_worker(self, track, encoder):
|
||||
if track.is_art:
|
||||
return self.copy_album_art(track)
|
||||
elif track.is_audio:
|
||||
return self.transcode_audio(track, encoder)
|
||||
else:
|
||||
return f"File {track.path} ignored"
|
||||
|
||||
def copy_album_art(self, file: File):
|
||||
self.create_directories(file)
|
||||
output_path = file.output_file(self.output_root, file.extension[1:])
|
||||
if output_path.exists():
|
||||
return f"Skipped {output_path}"
|
||||
try:
|
||||
shutil.copy(file.path, output_path)
|
||||
return f"Successfully copied {output_path}"
|
||||
except FileExistsError:
|
||||
pass
|
||||
except Exception:
|
||||
return f"{file.path} failed to copy"
|
||||
|
||||
def transcode_audio(self, track, encoder):
|
||||
enc_filename = encoder.parts[-1]
|
||||
track_output = track.output_file(self.output_root, self.extension)
|
||||
if track_output.exists():
|
||||
return f"Skipped {track_output}"
|
||||
self.create_directories(track)
|
||||
if enc_filename == "opusenc.exe":
|
||||
additional_args = ('--bitrate', '128', '--music')
|
||||
subprocess_args = (str(encoder),) + additional_args + (str(track.path), str(track_output))
|
||||
|
@ -83,14 +71,18 @@ class Transcoder:
|
|||
except Exception:
|
||||
return f"ERROR: Transcoding of '{track}' failed."
|
||||
|
||||
def create_directories(self, track: File):
|
||||
if not track.artist_out(self.output_root).exists():
|
||||
try:
|
||||
track.artist_out(self.output_root).mkdir()
|
||||
except FileExistsError:
|
||||
pass
|
||||
if not track.album_out(self.output_root).exists():
|
||||
try:
|
||||
track.album_out(self.output_root).mkdir()
|
||||
except FileExistsError:
|
||||
pass
|
||||
@staticmethod
|
||||
def copy_album_art(in_dir: Path, out_dir: Path, file_whitelist: list = None):
|
||||
if file_whitelist is None:
|
||||
file_whitelist = ['.jpg', '.jpeg', '.png']
|
||||
art_file_name = "cover.jpg"
|
||||
|
||||
art_found = False
|
||||
for file in in_dir.iterdir():
|
||||
if file.is_file() and file.suffix.lower() in file_whitelist:
|
||||
if file.name == art_file_name:
|
||||
art_found = True
|
||||
shutil.copy(file, out_dir / file.name)
|
||||
|
||||
if not art_found:
|
||||
print(f"Warning: {art_file_name} not found in {in_dir}")
|
||||
|
|
Loading…
Reference in New Issue
Block a user