字幕翻訳改善提案
作成日: 2024年10月7日
使用モデル OpenAI 01 Mini by OpenAI01
作成日: 2024年10月7日
使用モデル OpenAI 01 Mini by OpenAI01
改善の方向を模索中
ラインブレーカーのパフォーマンス向上と、翻訳精度の向上を求めている。モデルを変更せずに、コードの改良を進めようとしている。
改善点を見極める
add_line_breaks
関数の改善を提案中。日本語の自然な切れ目を意識し、Janome トークナイザーを有効活用する方法を模索している。
あなたのスクリプトは既に非常に洗練されていますが、以下の点でさらに改善できます。
改行関数 (add_line_breaks
) の高性能化:
翻訳精度の向上:
以下に、これらの改善を反映したコードを示します。
def add_line_breaks(text): """テキストに適切な改行を追加します。 Args: text (str): 改行を追加するテキスト。 Returns: str: 改行が追加されたテキスト。 """ tokenizer = Tokenizer() tokens = tokenizer.tokenize(text, wakati=True) lines = [] current_line = "" for token in tokens: # 句読点で改行 if token in ['。', '!', '?']: current_line += token lines.append(current_line) current_line = "" else: if len(current_line) + len(token) > MAX_LINE_LENGTH: lines.append(current_line) current_line = token else: current_line += token if current_line: lines.append(current_line) # 不要な文字の削除を一括で行う cleaned_lines = [re.sub(r'[[]「」(){}‘’“”]', '', line) for line in lines] # 最大行数を制限 return '\n'.join(cleaned_lines[:MAX_LINES])
改善点:
翻訳精度を向上させるために、プロンプトに具体的な指示や例を追加します。これにより、モデルが期待される出力をより正確に理解できます。
template = """ あなたはプロの翻訳者であり、字幕翻訳の専門家です。以下の点を考慮して、自然で正確な日本語に翻訳してください。 1. 口語的な表現を適切に使用する 2. 文脈に合わせた適切な語彙を選択する 3. 字幕のニュアンスや感情を正確に伝える 4. 字幕のタイミングと長さを考慮し、簡潔かつ読みやすい表現にする 5. 前後の字幕の内容を考慮し、一貫性を保つ 6. 固有名詞や専門用語はそのまま、または適切に翻訳する 以下に原文と翻訳例を示します。 **原文:** {text} **翻訳例:** - 原文: "Hello, how are you?" - 翻訳: "こんにちは、お元気ですか?" 上記のスタイルに従って翻訳してください。 """
改善点:
ConversationBufferWindowMemory
のサイズや使用方法を調整することで、コンテキストの一貫性を保ちつつ、メモリの負荷を軽減します。
# メモリの初期化を変更 memory = ConversationBufferWindowMemory(k=10, input_key='text', memory_key='chat_history') # 適切なk値に調整
改善点:
MEMORY_SIZE
を20から10に減らすことで、メモリの負荷を軽減しつつ、必要なコンテキストを保持。以下に、上記の改善点を反映した完全なスクリプトを示します。
import os import srt import asyncio from langchain.chat_models import ChatOpenAI from langchain.prompts import ChatPromptTemplate from langchain.chains import LLMChain from langchain.memory import ConversationBufferWindowMemory from srt import compose import nest_asyncio import subprocess from janome.tokenizer import Tokenizer import textwrap import re nest_asyncio.apply() # 定数 OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') MODEL_NAME = "gpt-4o-mini" # 正しいモデル名に修正 TEMPERATURE = 0.1 MEMORY_SIZE = 10 # メモリサイズを調整 MAX_LINE_LENGTH = 40 MAX_LINES = 2 # LLMの初期化 os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY llm = ChatOpenAI(model_name=MODEL_NAME, temperature=TEMPERATURE) template = """ あなたはプロの翻訳者であり、字幕翻訳の専門家です。以下の点を考慮して、自然で正確な日本語に翻訳してください。 1. 口語的な表現を適切に使用する 2. 文脈に合わせた適切な語彙を選択する 3. 字幕のニュアンスや感情を正確に伝える 4. 字幕のタイミングと長さを考慮し、簡潔かつ読みやすい表現にする 5. 前後の字幕の内容を考慮し、一貫性を保つ 6. 固有名詞や専門用語はそのまま、または適切に翻訳する 以下に原文と翻訳例を示します。 **原文:** {text} **翻訳例:** - 原文: "Hello, how are you?" - 翻訳: "こんにちは、お元気ですか?" 上記のスタイルに従って翻訳してください。 """ prompt = ChatPromptTemplate.from_template(template) # ConversationBufferWindowMemoryの初期化 memory = ConversationBufferWindowMemory(k=MEMORY_SIZE, input_key='text', memory_key='chat_history') # 翻訳チェインの定義 chain = LLMChain(llm=llm, prompt=prompt, memory=memory) def get_srt_files(): """現在のディレクトリ内の.srtファイルのリストを取得します。 Returns: list: 現在のディレクトリ内の.srtファイルのリスト。 """ return [f for f in os.listdir('.') if f.endswith('.srt')] def select_srt_file(): """ユーザーに.srtファイルを選択させるインターフェースを提供します。 Returns: str or None: ユーザーが選択した.srtファイルの名前。ファイルが存在しない場合はNoneを返します。 """ srt_files = get_srt_files() if not srt_files: print("No .srt files found in the current directory.") return None print("Available .srt files:") for i, file in enumerate(srt_files, 1): print(f"{i}. {file}") while True: try: choice = int(input("Enter the number of the file you want to use: ")) if 1 <= choice <= len(srt_files): return srt_files[choice - 1] else: print("Invalid choice. Please try again.") except ValueError: print("Invalid input. Please enter a number.") def read_srt(input_srt_file): """指定されたSRTファイルを読み込み、字幕データのリストを返します。 Args: input_srt_file (str): 読み込むSRTファイルのパス。 Returns: list: 読み込まれた字幕データのリスト。読み込みに失敗した場合は空のリストを返します。 """ try: with open(input_srt_file, 'r', encoding='utf-8', errors='ignore') as file: srt_data = file.read() return list(srt.parse(srt_data)) except Exception as e: print(f"Error reading SRT file: {e}") return [] async def translate_subtitles(subtitles): """字幕データを逐次的に翻訳し、メモリを更新します。 Args: subtitles (list): 翻訳する字幕データのリスト。 Returns: list: 翻訳された字幕データのリスト。 """ total_subtitles = len(subtitles) for idx, subtitle in enumerate(subtitles): # 現在の字幕をmemoryに追加 memory.save_context({"text": subtitle.content}, {"response": ""}) result = await chain.apredict(text=subtitle.content) subtitle.content = result.strip() # 翻訳結果をmemoryに追加 memory.save_context({"text": subtitle.content}, {"response": result.strip()}) progress = (idx + 1) / total_subtitles * 100 print(f"Translation progress: {progress:.2f}%") print(f"Current Memory: {memory.buffer}") return subtitles def write_srt(output_srt_file, subtitles): """翻訳された字幕データを指定されたSRTファイルに書き込みます。 Args: output_srt_file (str): 書き込むSRTファイルのパス。 subtitles (list): 書き込む翻訳された字幕データのリスト。 Raises: Exception: 書き込み中にエラーが発生した場合。 """ try: translated_srt_content = compose(subtitles) with open(output_srt_file, "w", encoding="utf-8") as f: f.write(translated_srt_content) except Exception as e: print(f"Error writing SRT file: {e}") def add_line_breaks(text): """テキストに適切な改行を追加します。 Args: text (str): 改行を追加するテキスト。 Returns: str: 改行が追加されたテキスト。 """ tokenizer = Tokenizer() tokens = tokenizer.tokenize(text, wakati=True) lines = [] current_line = "" for token in tokens: # 句読点で改行 if token in ['。', '!', '?']: current_line += token lines.append(current_line) current_line = "" else: if len(current_line) + len(token) > MAX_LINE_LENGTH: lines.append(current_line) current_line = token else: current_line += token if current_line: lines.append(current_line) # 不要な文字の削除を一括で行う cleaned_lines = [re.sub(r'[[]「」(){}‘’“”]', '', line) for line in lines] # 最大行数を制限 return '\n'.join(cleaned_lines[:MAX_LINES]) def add_line_breaks_to_srt(srt_text): """SRTファイルの字幕データに改行を追加します。 Args: srt_text (str): SRTファイルの字幕データ。 Returns: str: 改行が追加されたSRTファイルの字幕データ。 """ subtitles = list(srt.parse(srt_text)) for subtitle in subtitles: subtitle.content = add_line_breaks(subtitle.content) return compose(subtitles) async def process_srt_file(input_srt_file, output_srt_file): """SRTファイルを読み込み、翻訳、改行を追加し、新しいSRTファイルに書き込みます。 Args: input_srt_file (str): 読み込むSRTファイルのパス。 output_srt_file (str): 書き込む新しいSRTファイルのパス。 Returns: None """ subtitles = read_srt(input_srt_file) if not subtitles: print("No subtitles to translate.") return translated_subtitles = await translate_subtitles(subtitles) write_srt(output_srt_file, translated_subtitles) with open(output_srt_file, 'r', encoding='utf-8') as file: srt_text = file.read() result = add_line_breaks_to_srt(srt_text) with open("translated_file_result.srt", "w", encoding='utf-8') as file: file.write(result) def create_video_with_subtitles(video_file, subtitle_file, output_video_file): """字幕付きのビデオを作成します。 Args: video_file (str): 入力ビデオファイルのパス。 subtitle_file (str): 字幕ファイルのパス。 output_video_file (str): 出力ビデオファイルのパス。 Returns: None """ process = subprocess.Popen([ 'ffmpeg', '-hwaccel', 'auto', '-i', f"{video_file}", '-vf', f"subtitles={subtitle_file}:force_style='FontName=Helvetica,FontSize=11'", '-c:v', 'h264_amf', '-c:a', 'copy', '-progress', '-', f'{output_video_file}' ], stderr=subprocess.PIPE, universal_newlines=True) while True: line = process.stderr.readline() if line == '' and process.poll() is not None: break if 'frame=' in line: print(line.strip()) if process.returncode != 0: print(f"Error creating video: {process.returncode}") # 実行 if __name__ == "__main__": input_srt = select_srt_file() if input_srt: output_srt = 'translated_' + input_srt video = input_srt.replace(".srt", ".mp4") output_video = video.replace(".mp4", "_jp.mp4") asyncio.run(process_srt_file(input_srt, output_srt)) create_video_with_subtitles(video, "translated_file_result.srt", output_video) else: print("No file selected. Exiting.")
大量の字幕を翻訳する際、非同期処理を並列化することで処理速度を向上させることができます。以下は、translate_subtitles
関数を並列化する一例です。
async def translate_subtitles_parallel(subtitles): """字幕データを並列的に翻訳し、メモリを更新します。 Args: subtitles (list): 翻訳する字幕データのリスト。 Returns: list: 翻訳された字幕データのリスト。 """ tasks = [] for subtitle in subtitles: tasks.append(chain.apredict(text=subtitle.content)) results = await asyncio.gather(*tasks) for subtitle, result in zip(subtitles, results): subtitle.content = result.strip() return subtitles
注意点:
翻訳中にエラーが発生した場合にリトライする機能を追加することで、信頼性を向上させることができます。
async def translate_subtitles_with_retry(subtitles, max_retries=3): """字幕データを翻訳し、エラーが発生した場合はリトライします。 Args: subtitles (list): 翻訳する字幕データのリスト。 max_retries (int): リトライの最大回数。 Returns: list: 翻訳された字幕データのリスト。 """ total_subtitles = len(subtitles) for idx, subtitle in enumerate(subtitles): retries = 0 while retries < max_retries: try: memory.save_context({"text": subtitle.content}, {"response": ""}) result = await chain.apredict(text=subtitle.content) subtitle.content = result.strip() memory.save_context({"text": subtitle.content}, {"response": result.strip()}) break # 成功したらループを抜ける except Exception as e: retries += 1 print(f"Error translating subtitle {idx + 1}: {e}. Retrying ({retries}/{max_retries})...") if retries == max_retries: print(f"Failed to translate subtitle {idx + 1} after {max_retries} retries.") progress = (idx + 1) / total_subtitles * 100 print(f"Translation progress: {progress:.2f}%") print(f"Current Memory: {memory.buffer}") return subtitles
上記の改善点を実装することで、以下の利点が得られます。
これらの改善を適用することで、字幕翻訳の品質と効率を大幅に向上させることができます。さらにカスタマイズや最適化が必要な場合は、具体的な要件に応じて調整を行ってください。