# coding: utf-8
import logging
import math
import os
import subprocess
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple, Union
import cv2
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
from matplotlib.figure import Figure
from PIL import Image
from tqdm import tqdm
from ..utils._colorings import toACCENT, toBLUE, toGREEN
from ..utils._loggers import get_logger
from ..utils._path import OPENING_TEMPLATE_PATH
from ..utils.audio_utils import overlay_audio, synthesize_audio
from ..utils.generic_utils import Cycler, str_strip
from ..utils.image_utils import arr2pil, draw_text_in_pil, pil2arr
from ..utils.video_utils import capture2writor, show_frames
class BaseVideoHandler(ABC):
    """Abstract base class providing static helpers for reading and plotting video frames."""

    def __init__(self):
        pass

    @staticmethod
    def get_frame(
        pos: int,
        video_path: str = OPENING_TEMPLATE_PATH,
        as_pil: bool = False,
    ) -> Union[npt.NDArray[np.uint8], Image.Image]:
        """Get the ``pos``-th frame in the video at ``video_path``.

        Args:
            pos (int) : The position of the frame to get.
            video_path (str, optional) : Path to the video. Defaults to ``OPENING_TEMPLATE_PATH``.
            as_pil (bool, optional) : Whether to return in type ``Image.Image``. Defaults to ``False``.

        Returns:
            Union[npt.NDArray[np.uint8], Image.Image]: ``pos``-th frame in the video at
            ``video_path``, or ``None`` if the frame could not be read.

        .. plot::
            :class: popup-img

            >>> import matplotlib.pyplot as plt
            >>> from wed.chaptors import BaseVideoHandler
            >>> from wed.utils import cv2plot
            >>> fig = plt.figure(figsize=(18,8))
            >>> for i,pos in enumerate([129,240,300,361,492], start=1):
            ...     ax = fig.add_subplot(2, 3, i)
            ...     frame = BaseVideoHandler.get_frame(pos=pos)
            ...     ax = cv2plot(frame=frame, ax=ax, isBGR=True)
            ...     ax.set_title(f"Frame No. {pos}")
            >>> fig.tight_layout()
            >>> fig.show()
        """
        cap = cv2.VideoCapture(video_path)
        try:
            cap.set(cv2.CAP_PROP_POS_FRAMES, pos)
            _, frame = cap.read()
        finally:
            # Always free the capture handle, even if seeking/reading raises.
            cap.release()
        if as_pil and (frame is not None):
            frame = arr2pil(frame)
        return frame

    @staticmethod
    def show_frames(
        start: int = 0,
        end: Optional[int] = None,
        step: int = 1,
        ncols: int = 6,
        figsize: Tuple[int, int] = (4, 3),
        fig: Optional[Figure] = None,
        video_path: str = OPENING_TEMPLATE_PATH,
    ) -> Figure:
        """Cut out frames from the video and plot them.

        Args:
            start (int, optional) : Draw subsequent frames from ``start``. Defaults to ``0``.
            end (Optional[int], optional) : Draw up to ``end``-th frame. If not specified, draw to the end. Defaults to ``None``.
            step (int, optional) : Draw every ``step``-th frame. Defaults to ``1``.
            ncols (int, optional) : Number of images lined up side by side (number of columns). Defaults to ``6``.
            figsize (Tuple[int, int], optional) : Size of one image. Defaults to ``(4,3)``.
            fig (Optional[Figure], optional) : Figure instance you want to draw in. Defaults to ``None``.
            video_path (str, optional) : Path to the video to draw frames from.
                Defaults to ``OPENING_TEMPLATE_PATH`` (backward compatible).

        Returns:
            Figure: Figure where frames from ``start`` to ``end`` are drawn.

        .. plot::
            :class: popup-img

            >>> from wed.chaptors import BaseVideoHandler
            >>> fig = BaseVideoHandler.show_frames(step=30, ncols=4)
            >>> fig.show()
        """
        # Delegate to the module-level ``show_frames`` utility.
        return show_frames(
            video=video_path,
            start=start,
            end=end,
            step=step,
            ncols=ncols,
            figsize=figsize,
            fig=fig,
        )
class BaseWedOPEditor(BaseVideoHandler):
    """Abstract Editor Class for Wednesday's Downtown OP.

    Args:
        positions (Tuple[int, int]) : Positions in videos assigned to this editor. (``start_pos``, ``end_pos``)
        image_paths (Optional[Dict[str, str]], optional) : Mapping from attribute-name prefixes to image paths.
            Defaults to ``None`` (no images).
        texts (Optional[Dict[str, str]], optional) : Mapping from attribute-name prefixes to texts.
            Defaults to ``None`` (no texts).

    Attributes:
        EDITOR_IDX (int) : Number to prevent the same editor from using the same ``logger``.
        logger (logging.Logger) : Logger assigned to this editor. An instance of ``logging.Logger``.
        start_pos, end_pos (int) : Positions in videos assigned to this editor.
        duration (int) : Duration assigned to this editor. Same as ``end_pos`` - ``start_pos`` + ``1``.
    """

    # Class-level counter shared by all editors; see ``editor_name``.
    EDITOR_IDX: int = 0
    # Target length used when repeating short texts (see ``set_text_attributes``).
    TOTAL_TEXT_LENGTH: int = 30
    FRAME_SIZE: Tuple[int, int] = (640, 360)
    FRAME_WIDTH, FRAME_HEIGHT = FRAME_SIZE
    FPS: float = 30.0

    def __init__(
        self,
        positions: Tuple[int, int],
        image_paths: Optional[Dict[str, str]] = None,
        texts: Optional[Dict[str, str]] = None,
    ):
        # NOTE: ``None`` sentinels instead of mutable ``{}`` defaults
        # (mutable defaults are shared across calls).
        self.logger = get_logger(name=self.editor_name)
        self.set_image_attributes(**(image_paths or {}))
        self.set_text_attributes(**(texts or {}))
        self.start_pos, self.end_pos = positions
        self.duration = self.end_pos - self.start_pos + 1
        # Incremented AFTER ``editor_name`` was used above, so each editor
        # instance is tagged with the index it was created at.
        BaseWedOPEditor.EDITOR_IDX += 1

    @property
    def editor_name(self):
        """Editor index plus class name; same suffix as ``self.__class__.__name__``."""
        return f"{BaseWedOPEditor.EDITOR_IDX}.{self.__class__.__name__}"

    def set_attribute(self, name: str, value: Any, msg: Optional[str] = None) -> None:
        """Set attribute to this class with logs using ``setattr``.

        Args:
            name (str) : An attribute name.
            value (Any) : An attribute value.
            msg (Optional[str], optional) : Additional log message. Defaults to ``None``.

        Examples:
            >>> from wed.chaptors import MarqueeEditor
            >>> editor = MarqueeEditor
            >>> editor.set_attribute(name="hoge", value=1)
            >>> hasattr(editor, "hoge")
            True
            >>> editor.hoge
            1
        """
        if msg is None:
            msg = ""
        else:
            msg = str(msg)
        if len(msg) == 0:
            # Fall back to logging the value itself when no message is given.
            msg = str(value)
        msg = " " + msg
        self.logger.info(f"Set attribute {toGREEN(name)}.{msg}")
        setattr(self, name, value)

    def set_text_attributes(self, **texts) -> None:
        """Set attributes for text.

        For each keyword ``name=text``, sets ``{name}_text`` to ``text`` and
        ``{name}_texts`` to ``text`` repeated (space-separated) until it spans
        at least ``TOTAL_TEXT_LENGTH`` characters.

        Examples:
            >>> from wed.chaptors import MarqueeEditor
            >>> editor = MarqueeEditor()
            >>> editor.set_text_attributes(hoge_text="HOGE")
            >>> hasattr(editor, "hoge_text") and hasattr(editor, "hoge_texts")
            True
        """
        for name, text in texts.items():
            # NOTE: do not rebind ``texts`` here (the original shadowed the
            # kwargs dict being iterated).
            repeats = math.ceil(BaseWedOPEditor.TOTAL_TEXT_LENGTH / (len(text) + 1))
            repeated = ((text + " ") * repeats)[:-1]
            self.set_attribute(name=f"{name}_text", value=text, msg=text)
            self.set_attribute(name=f"{name}_texts", value=repeated, msg=repeated)

    def set_image_attributes(self, **image_paths) -> None:
        """Set attributes for images.

        For each keyword ``name=path``, sets ``{name}_arr`` (BGR ``ndarray``
        via ``cv2.imread``) and ``{name}_pil`` (``PIL.Image``).

        Raises:
            FileNotFoundError: When file is not found.

        Examples:
            >>> from wed.utils import ROTATING_SQUARE_IMAGE_PATH
            >>> from wed.chaptors import RotatingRectangleEditor
            >>> editor = RotatingRectangleEditor()
            >>> editor.set_image_attributes(hoge_image=ROTATING_SQUARE_IMAGE_PATH)
            >>> hasattr(editor, "hoge_image_arr") and hasattr(editor, "hoge_image_pil")
            True
        """
        for name, path in image_paths.items():
            if not os.path.exists(path):
                raise FileNotFoundError(f"{toBLUE(path)} is not found.")
            img_arr = cv2.imread(path)
            self.set_attribute(
                name=f"{name}_arr",
                value=img_arr,
                # cv2 arrays are (h, w, c); reverse to report (w, h).
                msg=f"Image size (w,h) = {img_arr.shape[:2][::-1]}",
            )
            img_pil = Image.open(path)
            self.set_attribute(
                name=f"{name}_pil",
                value=img_pil,
                msg=f"Image size (w,h) = {img_pil.size}",
            )

    def set_fontcyrcler_attributes(
        self, prefix, cycler: Cycler, ttfontname, **kwargs
    ) -> None:
        """Set attributes for a font-size cycler.

        Stores the cycler as ``{prefix}_cycler`` and the average glyph width
        over the cycler's sizes as ``{prefix}_fontwidth``.

        Args:
            prefix (str) : Prefix for the attribute names.
            cycler (Cycler) : Cycler of font sizes (exposes ``sizes`` and ``sizes_len``).
            ttfontname (str) : Path/name of the TrueType font passed to ``draw_text_in_pil``.
            **kwargs : Extra keyword arguments forwarded to ``draw_text_in_pil``.
        """
        self.set_attribute(f"{prefix}_cycler", value=cycler)
        # Draw a reference glyph ("A") at each size; ``x`` accumulates the
        # total advance width across all sizes.
        x, img = (0, None)
        for s in cycler.sizes:
            img, (x, _) = draw_text_in_pil(
                text="A",
                img=img,
                xy=(x, 0),
                fontsize=s,
                ttfontname=ttfontname,
                **kwargs,
            )
        self.set_attribute(
            f"{prefix}_fontwidth",
            # Average width per glyph across the cycled sizes.
            value=int(x / cycler.sizes_len),
            msg="",
        )

    @abstractmethod
    def edit(self, frame: npt.NDArray[np.uint8], pos: int) -> npt.NDArray[np.uint8]:
        """Edit the image if it is an assigned chapter (``pos``)

        Args:
            frame (npt.NDArray[np.uint8]) : Current frame in the video.
            pos (int) : Current position in the video.

        Returns:
            npt.NDArray[np.uint8]: Edited frame.
        """
        # Placeholder behavior for the abstract base: blank out frames in
        # the assigned range. Subclasses override with real editing.
        if self.start_pos <= pos <= self.end_pos:
            frame = 0
        return frame

    def audio_for_overlay_create(self) -> Tuple[bool, str]:
        """Hook for subclasses: return ``(True, path)`` to overlay extra audio.

        Returns:
            Tuple[bool, str]: Whether an overlay audio file was created, and its path.
        """
        return (False, "")

    def overlayed_audio_create(self, video_path: str) -> str:
        """Overlay this editor's audio (if any) onto the audio of ``video_path``.

        Args:
            video_path (str) : Path to the base video/audio.

        Returns:
            str: Path to the overlayed audio file, or ``video_path`` unchanged
            when this editor produces no overlay audio.
        """
        is_ok, overlay_media_path = self.audio_for_overlay_create()
        if is_ok:
            overlayed_audio_path = overlay_audio(
                base_media_path=video_path,
                overlay_media_path=overlay_media_path,
                # Convert the frame position to milliseconds.
                position=int(self.start_pos / BaseWedOPEditor.FPS * 1000),
            )
            self.logger.info(
                f"Overlayed audio file is created at {toBLUE(overlayed_audio_path)}"
            )
            return overlayed_audio_path
        return video_path

    def check_works(
        self,
        video_path: str = OPENING_TEMPLATE_PATH,
        audio_path: Optional[str] = None,
        out_path: Optional[str] = None,
        codec: str = "H264",
        open: bool = True,
        **kwargs,
    ) -> str:
        """Check the editing results of this editor.

        Args:
            video_path (str, optional) : Path to the input video. Defaults to ``OPENING_TEMPLATE_PATH``.
            audio_path (Optional[str], optional) : Path to the audio file. If ``None``,
                it is derived from ``video_path`` (with any overlay applied). Defaults to ``None``.
            out_path (Optional[str], optional) : Path to the created video. Defaults to ``None``.
            codec (str, optional) : Video codec for the created video. Defaults to ``"H264"``.
            open (bool, optional) : Whether to open output file or not. Defaults to ``True``.
            **kwargs : Extra keyword arguments forwarded to ``self.edit``.

        Returns:
            str: Path to the created video.

        Examples:
            >>> from wed.chaptors import MarqueeEditor
            >>> editor = MarqueeEditor(upper_text="IWASAKI SHUTO", lower_text="INFTY")
            >>> out_path = editor.check_works()
        """
        cap = cv2.VideoCapture(video_path)
        out, out_path = capture2writor(cap, out_path=out_path, codec=codec)
        cap.set(cv2.CAP_PROP_POS_FRAMES, self.start_pos)
        for i in tqdm(range(self.end_pos - self.start_pos + 1), desc=self.editor_name):
            # ``cap.get`` returns a float; ``edit`` is annotated ``pos: int``.
            pos = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
            is_ok, frame = cap.read()
            if (not is_ok) or (frame is None):
                break
            frame = self.edit(frame=frame, pos=pos, **kwargs)
            out.write(frame)
        out.release()
        cap.release()
        # Synthesize Audio. Honor an explicitly supplied ``audio_path``
        # (the original silently overwrote it).
        if audio_path is None:
            audio_path = self.overlayed_audio_create(video_path=video_path)
        out_synthesized_path = synthesize_audio(
            video_path=out_path,
            audio_path=audio_path,
            # Millisecond offsets of this editor's assigned range.
            start=int(1000 * self.start_pos / BaseWedOPEditor.FPS),
            end=int(1000 * self.end_pos / BaseWedOPEditor.FPS),
            open=open,
            delete_intermidiates=True,
            logger=self.logger,
        )
        return out_synthesized_path

    def check_work(
        self, pos: int, video_path: str = OPENING_TEMPLATE_PATH, as_pil: bool = True
    ) -> Union[npt.NDArray[np.uint8], Image.Image]:
        """Check the editing result for ``pos`` frame in video at ``video_path`` of this editor.

        Args:
            pos (int) : The position in the video.
            video_path (str, optional) : Path to the video file. Defaults to ``OPENING_TEMPLATE_PATH``.
            as_pil (bool, optional) : Whether to return object as ``Image.Image`` or ``npt.NDArray[npt.uint8]``. Defaults to ``True``.

        Returns:
            Union[npt.NDArray[np.uint8], Image.Image]: Editing result for the ``pos``-th frame.

        Examples:
            >>> import matplotlib.pyplot as plt
            >>> from wed.utils import cv2plot
            >>> from wed.chaptors import MarqueeEditor
            >>> editor = MarqueeEditor(upper_text="IWASAKI SHUTO", lower_text="INFTY")
            >>> frame = editor.check_work(pos=125, as_pil=False)
            >>> fig, ax = plt.subplots()
            >>> ax = cv2plot(frame, ax=ax)
            >>> fig.show()
        """
        cap = cv2.VideoCapture(video_path)
        cap.set(cv2.CAP_PROP_POS_FRAMES, pos)
        is_ok, frame = cap.read()
        if is_ok and (frame is not None):
            frame = self.edit(frame=frame, pos=pos)
            if as_pil:
                frame = arr2pil(frame)
        cap.release()
        return frame

    def show_frames_in_charge(
        self,
        step: int = 1,
        ncols: int = 6,
        video_path: str = OPENING_TEMPLATE_PATH,
        figsize: Tuple[int, int] = (4, 3),
        fig: Optional[Figure] = None,
    ) -> Figure:
        """Cut out the frames assigned to this editor from the video and plot them.

        Args:
            step (int, optional) : Draw every ``step``-th frame. Defaults to ``1``.
            ncols (int, optional) : Number of images lined up side by side (number of columns). Defaults to ``6``.
            video_path (str, optional) : Path to video. Defaults to ``OPENING_TEMPLATE_PATH``.
            figsize (Tuple[int, int], optional) : Size of one image. Defaults to ``(4,3)``.
            fig (Optional[Figure], optional) : Figure instance you want to draw in. Defaults to ``None``.

        Returns:
            Figure: Figure where frames from ``start_pos`` to ``end_pos`` are drawn.

        .. plot::
            :class: popup-img

            >>> from wed.chaptors import BaseVideoHandler
            >>> fig = BaseVideoHandler.show_frames(step=30, ncols=4)
            >>> fig.show()
        """
        # Call the module-level ``show_frames`` utility directly: the
        # inherited static method does not accept ``video_path``, so the
        # original ``super().show_frames(video_path=...)`` raised TypeError;
        # it also dropped ``ncols``.
        return show_frames(
            video=video_path,
            start=self.start_pos,
            end=self.end_pos,
            step=step,
            ncols=ncols,
            figsize=figsize,
            fig=fig,
        )