Compare commits

..

3 Commits

Author SHA1 Message Date
eca3ba4a64 Move transcoding logic to separate class 2025-01-18 18:02:29 +00:00
b013416e2f Implement logging levels 2025-01-18 17:59:23 +00:00
94d6ae4101 Implement logging 2025-01-18 17:41:00 +00:00
7 changed files with 232 additions and 80 deletions

6
src/log/__init__.py Normal file
View File

@ -0,0 +1,6 @@
INFO=0
WARNING=5
ERROR=10
from .log import Log
from .logcat import LogCat

81
src/log/log.py Normal file
View File

@ -0,0 +1,81 @@
from datetime import datetime
from multiprocessing import Lock
from threading import Thread
from queue import Queue, Empty
from . import INFO, WARNING, ERROR
class Log(object):
def __init__(self, path, queue=None, print_output=True, timeout=1, level=WARNING):
self.__terminated = False
self.level = level
self.file_lock = Lock()
if not path.exists():
path.mkdir()
elif not path.is_dir():
raise Exception("Path exists and is not a directory")
self.log_dir = path
self.print = print_output
self.today_date = None
self.log_file = None
self.thread = Thread(target=self.write_loop, args=(64, timeout))
if queue is None:
self.queue = Queue()
else:
self.queue = queue
try:
self.thread.start()
except KeyboardInterrupt:
self.__terminated = True
except Exception:
raise Exception("Exception in main log loop.")
def stop(self):
self.__terminated = True
def write_loop(self, lines, timeout):
while not self.__terminated:
log_msg_list = []
try:
while len(log_msg_list) < lines:
try:
log_line = self.queue.get(block=True, timeout=timeout)
level, message = log_line
if self.print and level >= self.level:
print(message)
log_msg_list.append(f"{message.encode('cp1252', errors='replace').decode('cp1252')}\n")
except Empty:
break
finally:
self.write_log(log_msg_list)
self.save()
def save(self):
self.write_log(self.queue_get_remaining())
def queue_get_remaining(self):
log_msg_list = []
while True:
try:
log_msg_list.append(f"{self.queue.get(block=False)}\n")
except Empty:
break
return log_msg_list
def set_log_file(self):
with self.file_lock:
self.today_date = datetime.today().date()
today_string = self.today_date.strftime("%Y-%m-%d")
self.log_file = self.log_dir / f"{today_string}.log"
def write_log(self, log_msg_list: list):
if len(log_msg_list) > 0:
if self.today_date != datetime.today().date() or self.log_file is None:
self.set_log_file()
with self.file_lock:
with open(self.log_file, mode='a') as log_file:
log_file.writelines(log_msg_list)

32
src/log/logcat.py Normal file
View File

@ -0,0 +1,32 @@
from datetime import datetime
from multiprocessing import Queue
from . import INFO, WARNING, ERROR
class LogCat(object):
def __init__(self, log_queue: Queue, category: str):
self.queue = log_queue
self.category = category
def _write(self, level: int, function: str, message: str) -> None:
time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
self.queue.put((level, f"{self.level_string(level)} | {time} - {self.category.upper()}.{function.upper()}: {message}"))
def info(self, function: str, message: str) -> None:
self._write(INFO, function, message)
def warning(self, function: str, message: str) -> None:
self._write(WARNING, function, message)
def error(self, function: str, message: str) -> None:
self._write(ERROR, function, message)
@staticmethod
def level_string(level: int) -> str:
if level == INFO:
return "INFO"
elif level == WARNING:
return "WARN"
elif level == ERROR:
return "ERR "

View File

@ -1,6 +1,8 @@
import argparse import argparse
from os.path import realpath
from pathlib import Path from pathlib import Path
from transcode import Transcoder from transcode import Transcoder
from log import Log
def get_args(): def get_args():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -11,9 +13,11 @@ def get_args():
def main(input_dir: Path, output_dir: Path, encoder: Path, out_extension: str = 'opus'): def main(input_dir: Path, output_dir: Path, encoder: Path, out_extension: str = 'opus'):
wd = Path(realpath(__file__)).parent.parent
log_path = wd / "logs"
if encoder.parts[-1] == "qaac64.exe": if encoder.parts[-1] == "qaac64.exe":
out_extension = "m4a" out_extension = "m4a"
transcoder = Transcoder(encoder, out_extension, input_dir, output_dir) transcoder = Transcoder(encoder, out_extension, input_dir, output_dir, log_path)
transcoder.transcode() transcoder.transcode()

View File

@ -1,2 +1,3 @@
from .file import File from .file import File
from .worker import Worker
from .transcoder import Transcoder from .transcoder import Transcoder

View File

@ -1,96 +1,61 @@
from pathlib import Path from pathlib import Path
import shutil from multiprocessing import Pool, Manager, set_start_method
import subprocess from log import Log, LogCat
from multiprocessing import Pool from . import Worker
from . import File
class Transcoder: class Transcoder:
def __init__(self, encoder: Path, extension: str, input_root: Path, output_root: Path): def __init__(self, encoder: Path, extension: str, input_root: Path, output_root: Path, log_path: Path):
self.encoder = encoder self.encoder = encoder
self.extension = extension self.extension = extension
self.input_root = input_root self.input_root = input_root
self.output_root = output_root self.output_root = output_root
self.log_path = log_path
self.__log = Log(log_path)
self.log = LogCat(self.__log.queue, "TCD")
def transcode(self): def transcode(self):
transcode_list = [] transcode_list = []
try:
for artist in self.input_root.iterdir(): for artist in self.input_root.iterdir():
if artist.is_dir(): if artist.is_dir():
for album in artist.iterdir(): for album in artist.iterdir():
if album.is_dir(): if album.is_dir():
for file in album.iterdir(): for file in album.iterdir():
if file.is_file(): if file.is_file():
transcode_list.append(File(file)) transcode_list.append(file)
else: else:
print(f"Warning, skipping non-dir '{album}' found in artist '{artist.parts[-1]}'") self.log.warning("TRK", f"Warning, skipping non-dir '{album}' found in artist '{artist.parts[-1]}'")
continue continue
else: else:
print(f"Warning, skipping non-dir '{artist}' found in root") self.log.warning("TRK", f"Warning, skipping non-dir '{artist}' found in root")
continue continue
self._transcode(transcode_list, self.encoder) self._transcode(transcode_list, self.encoder)
finally:
self.__log.stop()
def _transcode(self, transcode_list: list, encoder: Path, workers=16): def _transcode(self, transcode_list: list, encoder: Path, workers=16):
worker_args = [(track, encoder) for track in transcode_list] manager = Manager()
with Pool(workers) as p: queue = manager.Queue()
results = p.starmap_async(self.transcode_worker, worker_args) log = Log(self.log_path, queue)
p.close() logcat = LogCat(log.queue, "TCD")
p.join() args = [(str(self.output_root), self.extension, track, encoder, logcat) for track in transcode_list]
for result in results.get(): with Pool(workers) as pool:
print(result) pool.starmap(self.worker, args)
pool.close()
pool.join()
log.stop()
def _transcode_single_thread(self, transcode_list: list, encoder: Path): def _transcode_single_thread(self, transcode_list: list, encoder: Path):
log = Log(self.log_path)
logcat = LogCat(log.queue, "TCD")
worker_args = [(track, encoder) for track in transcode_list] worker_args = [(track, encoder) for track in transcode_list]
for args in worker_args: for track, encoder in worker_args:
print(self.transcode_worker(*args)) self.worker(str(self.output_root), self.extension, track, encoder, logcat)
log.stop()
@staticmethod
def transcode_worker(self, track, encoder): def worker(output_root, extension, track, encoder, log):
if track.is_art: w = Worker(output_root, extension)
return self.copy_album_art(track) w.transcode_worker(track, encoder, log)
elif track.is_audio:
return self.transcode_audio(track, encoder)
else:
return f"File {track.path} ignored"
def copy_album_art(self, file: File):
self.create_directories(file)
output_path = file.output_file(self.output_root, file.extension[1:])
if output_path.exists():
return f"Skipped {output_path}"
try:
shutil.copy(file.path, output_path)
return f"Successfully copied {output_path}"
except FileExistsError:
pass
except Exception:
return f"{file.path} failed to copy"
def transcode_audio(self, track, encoder):
enc_filename = encoder.parts[-1]
track_output = track.output_file(self.output_root, self.extension)
if track_output.exists():
return f"Skipped {track_output}"
self.create_directories(track)
if enc_filename == "opusenc.exe":
additional_args = ('--bitrate', '128', '--music')
subprocess_args = (str(encoder),) + additional_args + (str(track.path), str(track_output))
elif enc_filename == "qaac64.exe":
additional_args = (str(track), '-o', str(track_output), '--threading')
subprocess_args = (str(encoder),) + additional_args
try:
subprocess.run(subprocess_args, capture_output=True, text=True, check=True)
return f"Transcoded '{track}' successfully."
except Exception:
return f"ERROR: Transcoding of '{track}' failed."
def create_directories(self, track: File):
if not track.artist_out(self.output_root).exists():
try:
track.artist_out(self.output_root).mkdir()
except FileExistsError:
pass
if not track.album_out(self.output_root).exists():
try:
track.album_out(self.output_root).mkdir()
except FileExistsError:
pass

63
src/transcode/worker.py Normal file
View File

@ -0,0 +1,63 @@
import shutil
import subprocess
from . import File
from pathlib import Path
class Worker:
def __init__(self, output_root, extension):
self.output_root = Path(output_root)
self.extension = extension
def transcode_worker(self, track, encoder, log):
track = File(track)
if track.is_art:
return self.copy_album_art(track, log)
elif track.is_audio:
return self.transcode_audio(track, encoder, log)
else:
log.info("WRK", f"File {track.path} ignored")
def copy_album_art(self, file: File, log):
self.create_directories(file)
output_path = file.output_file(self.output_root, file.extension[1:])
if output_path.exists():
log.info("ART", f"Skipped {output_path}")
return
try:
shutil.copy(file.path, output_path)
log.info("ART", f"Successfully copied {output_path}")
except FileExistsError:
pass
except Exception:
log.error("ART", f"{file.path} failed to copy")
def transcode_audio(self, track, encoder, log):
enc_filename = encoder.parts[-1]
track_output = track.output_file(self.output_root, self.extension)
if track_output.exists():
log.info("AUD", f"Skipped {track_output}")
return
self.create_directories(track)
if enc_filename == "opusenc.exe":
additional_args = ('--bitrate', '128', '--music')
subprocess_args = (str(encoder),) + additional_args + (str(track.path), str(track_output))
elif enc_filename == "qaac64.exe":
additional_args = (str(track), '-o', str(track_output), '--threading')
subprocess_args = (str(encoder),) + additional_args
try:
subprocess.run(subprocess_args, capture_output=True, text=True, check=True)
log.info("AUD", f"Transcoded '{track}' successfully.")
except Exception:
log.error("AUD", f"ERROR: Transcoding of '{track}' failed.")
def create_directories(self, track: File):
if not track.artist_out(self.output_root).exists():
try:
track.artist_out(self.output_root).mkdir()
except FileExistsError:
pass
if not track.album_out(self.output_root).exists():
try:
track.album_out(self.output_root).mkdir()
except FileExistsError:
pass