問題

import os
import srt
import asyncio
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.chains import LLMChain
from langchain.memory import ConversationBufferWindowMemory
from srt import compose
import nest_asyncio
import subprocess
from janome.tokenizer import Tokenizer
import textwrap
import re

nest_asyncio.apply()

# 定数
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
MODEL_NAME = "gpt-4o-mini"
TEMPERATURE = 0.1
MEMORY_SIZE = 20
MAX_LINE_LENGTH = 40
MAX_LINES = 2

# LLMの初期化
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
llm = ChatOpenAI(model_name=MODEL_NAME, temperature=TEMPERATURE)


template = """あなたはプロの翻訳者であり、字幕翻訳の専門家です。以下の点を考慮して、自然で正確な日本語に翻訳してください。
* 口語的な表現を適切に使用する
* 文脈に合わせた適切な語彙を選択する
* 字幕のニュアンスや感情を正確に伝える
* 字幕のタイミングと長さを考慮し、簡潔かつ読みやすい表現にする
* 前後の字幕の内容を考慮し、一貫性を保つ
* 固有名詞や専門用語はそのまま、または適切に翻訳する

原文:
{text}
"""
prompt = ChatPromptTemplate.from_template(template)

# ConversationBufferWindowMemoryの初期化
memory = ConversationBufferWindowMemory(k=MEMORY_SIZE, input_key='text', memory_key='chat_history')

# 翻訳チェインの定義
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)

def get_srt_files():
"""現在のディレクトリ内の.srtファイルのリストを取得します。

この関数は、現在の作業ディレクトリに存在するすべてのSRTファイルを検索し、
そのファイル名のリストを返します。ファイル名は拡張子が.srtである必要があります。

Returns:
list: 現在のディレクトリ内の.srtファイルのリスト。
"""
return [f for f in os.listdir('.') if f.endswith('.srt')]

def select_srt_file():
"""ユーザーに.srtファイルを選択させるインターフェースを提供します。

この関数は、利用可能なSRTファイルのリストを表示し、ユーザーに
その中から1つを選択させます。選択されたファイル名を返します。

Returns:
str or None: ユーザーが選択した.srtファイルの名前。ファイルが存在しない場合はNoneを返します。
"""
srt_files = get_srt_files()
if not srt_files:
print("No .srt files found in the current directory.")
return None

print("Available .srt files:")
for i, file in enumerate(srt_files, 1):
print(f"{i}. {file}")

while True:
try:
choice = int(input("Enter the number of the file you want to use: "))
if 1 <= choice <= len(srt_files):
return srt_files[choice - 1]
else:
print("Invalid choice. Please try again.")
except ValueError:
print("Invalid input. Please enter a number.")

def read_srt(input_srt_file):
"""指定されたSRTファイルを読み込み、字幕データのリストを返します。

Args:
input_srt_file (str): 読み込むSRTファイルのパス。

Returns:
list: 読み込まれた字幕データのリスト。読み込みに失敗した場合は空のリストを返します。
"""
try:
with open(input_srt_file, 'r', encoding='utf-8', errors='ignore') as file:
srt_data = file.read()
return list(srt.parse(srt_data))
except Exception as e:
print(f"Error reading SRT file: {e}")
return []

async def translate_subtitles(subtitles):
"""字幕データを逐次的に翻訳し、メモリを更新します。

Args:
subtitles (list): 翻訳する字幕データのリスト。

Returns:
list: 翻訳された字幕データのリスト。
"""
total_subtitles = len(subtitles)
for idx, subtitle in enumerate(subtitles):
# 現在の字幕をmemoryに追加
memory.save_context({"text": subtitle.content}, {"response": ""})

result = await chain.apredict(text=subtitle.content)
subtitle.content = result.strip()

# 翻訳結果をmemoryに追加
memory.save_context({"text": subtitle.content}, {"response": result.strip()})

progress = (idx + 1) / total_subtitles * 100
print(f"Translation progress: {progress:.2f}%")
print(f"Current Memory: {memory.buffer}")
return subtitles

def write_srt(output_srt_file, subtitles):
"""翻訳された字幕データを指定されたSRTファイルに書き込みます。

Args:
output_srt_file (str): 書き込むSRTファイルのパス。
subtitles (list): 書き込む翻訳された字幕データのリスト。

Raises:
Exception: 書き込み中にエラーが発生した場合。
"""
try:
translated_srt_content = compose(subtitles)
with open(output_srt_file, "w", encoding="utf-8") as f:
f.write(translated_srt_content)
except Exception as e:
print(f"Error writing SRT file: {e}")

def add_line_breaks(text):
"""テキストに適切な改行を追加します。

Args:
text (str): 改行を追加するテキスト。

Returns:
str: 改行が追加されたテキスト。
"""
tokenizer = Tokenizer()
tokens = tokenizer.tokenize(text, wakati=True)
lines = []
current_line = ""
for token in tokens:
# 句読点で改行
if re.match(r'[。!?]', token):
current_line += token
lines.append(current_line)
current_line = ""
elif len(current_line) + len(token) <= MAX_LINE_LENGTH:
current_line += token
else:
lines.append(current_line)
current_line = token
if current_line:
lines.append(current_line)

lines = [re.sub(r'[$$$$[]「」$$()\{\}{}‘’“”]', '', line) for line in lines]

return '\n'.join(lines[:MAX_LINES])

def add_line_breaks_to_srt(srt_text):
"""SRTファイルの字幕データに改行を追加します。

Args:
srt_text (str): SRTファイルの字幕データ。

Returns:
str: 改行が追加されたSRTファイルの字幕データ。
"""
subtitles = list(srt.parse(srt_text))
for subtitle in subtitles:
subtitle.content = add_line_breaks(subtitle.content)
return compose(subtitles)

async def process_srt_file(input_srt_file, output_srt_file):
"""SRTファイルを読み込み、翻訳、改行を追加し、新しいSRTファイルに書き込みます。

Args:
input_srt_file (str): 読み込むSRTファイルのパス。
output_srt_file (str): 書き込む新しいSRTファイルのパス。

Returns:
None
"""
subtitles = read_srt(input_srt_file)
if not subtitles:
print("No subtitles to translate.")
return
translated_subtitles = await translate_subtitles(subtitles)
write_srt(output_srt_file, translated_subtitles)

with open(output_srt_file, 'r', encoding='utf-8') as file:
srt_text = file.read()

result = add_line_breaks_to_srt(srt_text)

with open("translated_file_result.srt", "w", encoding='utf-8') as file:
file.write(result)

def create_video_with_subtitles(video_file, subtitle_file, output_video_file):
"""字幕付きのビデオを作成します。

Args:
video_file (str): 入力ビデオファイルのパス。
subtitle_file (str): 字幕ファイルのパス。
output_video_file (str): 出力ビデオファイルのパス。

Returns:
None
"""
process = subprocess.Popen([
'ffmpeg',
'-hwaccel', 'auto',
'-i', f"{video_file}",
'-vf', f"subtitles={subtitle_file}:force_style='FontName=Helvetica,FontSize=11'",
'-c:v', 'h264_amf',
'-c:a', 'copy',
'-progress', '-',
f'{output_video_file}'
], stderr=subprocess.PIPE, universal_newlines=True)

while True:
line = process.stderr.readline()
if line == '' and process.poll() is not None:
break
if 'frame=' in line:
print(line.strip())

if process.returncode != 0:
print(f"Error creating video: {process.returncode}")

# 実行
if __name__ == "__main__":
input_srt = select_srt_file()
if input_srt:
output_srt = 'translated_' + input_srt
video = input_srt.replace(".srt", ".mp4")
output_video = video.replace(".mp4", "_jp.mp4")
asyncio.run(process_srt_file(input_srt, output_srt))
create_video_with_subtitles(video, "translated_file_result.srt", output_video)
else:
print("No file selected. Exiting.")

改行関数を高性能にしたい
翻訳精度を更に高性能にしたい
モデルは現状のままお願いします

回答

字幕の精度向上を模索
SRTファイルを処理し、翻訳後の字幕を動画に組み込むPythonスクリプトを改善中。改行機能の性能向上や、翻訳精度の更なる向上を目指す。

改善の提案

add_line_breaksのパフォーマンス向上や翻訳精度向上のための最適化を提案中だよ。

回答

改行関数の高性能化と翻訳精度の向上について

以下では、add_line_breaks 関数のパフォーマンス向上と翻訳精度の改善方法について説明し、具体的なコードの改善案を提供します。


1. 改行関数 (add_line_breaks) の高性能化

現状の課題:

  • 現在の add_line_breaks 関数では、Janome トークナイザーを使用してトークン化し、ループ内で正規表現を多用しています。これにより、特に大量の字幕データを処理する際にパフォーマンスが低下する可能性があります。

改善案:

  • 正規表現の使用を最小限に抑え、トークン化と改行処理を効率化します。
  • 句読点での改行を優先しつつ、行の長さ制限を適用するロジックを簡素化します。
  • re.sub を一度だけ使用し、不要な文字の除去を効率化します。

改善後の add_line_breaks 関数:

def add_line_breaks(text): """テキストに適切な改行を追加します。 Args: text (str): 改行を追加するテキスト。 Returns: str: 改行が追加されたテキスト。 """ tokenizer = Tokenizer() tokens = tokenizer.tokenize(text, wakati=True) lines = [] current_line = "" punctuation = {'。', '!', '?'} for token in tokens: current_line += token if token in punctuation or len(current_line) >= MAX_LINE_LENGTH: lines.append(current_line) current_line = "" if current_line: lines.append(current_line) # 不要な文字の一括置換 cleaned_lines = [re.sub(r'[[]「」(){}‘’“”]', '', line) for line in lines] return '\n'.join(cleaned_lines[:MAX_LINES])

主な変更点:

  • 句読点をセットにして高速なメンバーシップチェックを使用。
  • 行の長さ制限に達した場合に即座に改行。
  • 不要な文字の除去をリスト内包表記で一括処理。

2. 翻訳精度の向上

現状の課題:

  • 翻訳精度を向上させるためには、モデルへの指示(プロンプト)の最適化が重要です。
  • メモリ管理や文脈の保持方法も翻訳品質に影響を与えます。

改善案:

  • プロンプトをより具体的にし、翻訳の品質を高めるための追加指示を加えます。
  • メモリの管理方法を見直し、前後のコンテキストをより適切に保持します。
  • 必要に応じて、翻訳結果の後処理(例: 文法チェックやスタイル調整)を追加します。

改善後のプロンプトテンプレート:

template = """あなたはプロの翻訳者であり、字幕翻訳の専門家です。以下の点を厳守し、自然で正確な日本語に翻訳してください。 * 口語的な表現を適切に使用する * 文脈に合わせた適切な語彙を選択する * 字幕のニュアンスや感情を正確に伝える * 字幕のタイミングと長さを考慮し、簡潔かつ読みやすい表現にする * 前後の字幕の内容を考慮し、一貫性を保つ * 固有名詞や専門用語はそのまま、または適切に翻訳する * 翻訳後のテキストが自然で流暢な日本語になるように調整する 原文: {text} """

メモリ管理の改善:

  • ConversationBufferWindowMemoryk パラメータを調整し、適切なコンテキストを保持。
  • 翻訳前後のメモリ保存を明確に区別し、不要な情報をメモリに残さないようにする。

改善後の translate_subtitles 関数:

async def translate_subtitles(subtitles): """字幕データを逐次的に翻訳し、メモリを更新します。 Args: subtitles (list): 翻訳する字幕データのリスト。 Returns: list: 翻訳された字幕データのリスト。 """ total_subtitles = len(subtitles) for idx, subtitle in enumerate(subtitles): # 現在の字幕をmemoryに追加(原文) memory.save_context({"text": subtitle.content}, {"response": ""}) # 翻訳を実行 result = await chain.apredict(text=subtitle.content) translated_text = result.strip() # 翻訳結果をsubtitleに設定 subtitle.content = translated_text # 翻訳結果をmemoryに追加(訳文) memory.save_context({"text": translated_text}, {"response": ""}) progress = (idx + 1) / total_subtitles * 100 print(f"Translation progress: {progress:.2f}%") print(f"Current Memory: {memory.buffer}") return subtitles

主な変更点:

  • プロンプトを強化し、翻訳後のテキストの自然さを強調。
  • メモリへの保存を翻訳前と翻訳後で明確に区別し、必要なコンテキストのみを保持。

追加の改善提案

  1. 非同期処理の最適化:

    • translate_subtitles 関数内での非同期呼び出しを最適化し、必要に応じてバッチ処理を検討。
  2. エラーハンドリングの強化:

    • 翻訳中に発生する可能性のあるエラーに対して、リトライ機構を追加。
  3. ロギングの導入:

    • 処理の進行状況やエラー情報を詳細に記録するために、logging モジュールを使用。
  4. パラメータの調整:

    • MEMORY_SIZEMAX_LINE_LENGTH などのパラメータを適宜調整し、最適な値を見つける。

最終的な改善後のコード例

以下に、上述の改善点を反映した最終的なコードの一部を示します。

import os import srt import asyncio from langchain.chat_models import ChatOpenAI from langchain.prompts import ChatPromptTemplate from langchain.chains import LLMChain from langchain.memory import ConversationBufferWindowMemory from srt import compose import nest_asyncio import subprocess from janome.tokenizer import Tokenizer import re import logging nest_asyncio.apply() # ログ設定 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 定数 OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') MODEL_NAME = "gpt-4o-mini" TEMPERATURE = 0.1 MEMORY_SIZE = 20 MAX_LINE_LENGTH = 40 MAX_LINES = 2 # LLMの初期化 os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY llm = ChatOpenAI(model_name=MODEL_NAME, temperature=TEMPERATURE) template = """あなたはプロの翻訳者であり、字幕翻訳の専門家です。以下の点を厳守し、自然で正確な日本語に翻訳してください。 * 口語的な表現を適切に使用する * 文脈に合わせた適切な語彙を選択する * 字幕のニュアンスや感情を正確に伝える * 字幕のタイミングと長さを考慮し、簡潔かつ読みやすい表現にする * 前後の字幕の内容を考慮し、一貫性を保つ * 固有名詞や専門用語はそのまま、または適切に翻訳する * 翻訳後のテキストが自然で流暢な日本語になるように調整する 原文: {text} """ prompt = ChatPromptTemplate.from_template(template) # ConversationBufferWindowMemoryの初期化 memory = ConversationBufferWindowMemory(k=MEMORY_SIZE, input_key='text', memory_key='chat_history') # 翻訳チェインの定義 chain = LLMChain(llm=llm, prompt=prompt, memory=memory) # ...(その他の関数は前述の通り)... async def translate_subtitles(subtitles): """字幕データを逐次的に翻訳し、メモリを更新します。 Args: subtitles (list): 翻訳する字幕データのリスト。 Returns: list: 翻訳された字幕データのリスト。 """ total_subtitles = len(subtitles) for idx, subtitle in enumerate(subtitles): try: # 現在の字幕をmemoryに追加(原文) memory.save_context({"text": subtitle.content}, {"response": ""}) # 翻訳を実行 result = await chain.apredict(text=subtitle.content) translated_text = result.strip() # 翻訳結果をsubtitleに設定 subtitle.content = translated_text # 翻訳結果をmemoryに追加(訳文) memory.save_context({"text": translated_text}, {"response": ""}) progress = (idx + 1) / total_subtitles * 100 logging.info(f"Translation progress: {progress:.2f}%") except Exception as e: logging.error(f"Error translating subtitle {subtitle.index}: {e}") return subtitles # ...(メイン実行部分も前述の通り)...

主な追加点:

  • ログ機能を導入し、処理状況やエラーを詳細に記録。
  • 翻訳処理内でのエラーハンドリングを強化。

これらの改善により、改行処理のパフォーマンスが向上し、翻訳の品質も高まることが期待されます。必要に応じて、さらに細かい調整や追加の最適化を行ってください。

分享這個問答