関連記事
はじめに
この記事では、議事録作成システムを開発していきます。1時間以上の会議を想定し、会議の音声データを文字起こし、議事録作成まで説明していきます。GoogleのSpeech to TextやOpenAIのAPIを活用し、実装します。
1つの記事に纏めるのは難しいので、記事を何回かに分けたいと思います。開発の流れは以下のようになります。
- 音声データの分割
- 分割した音声データを文字起こし
- 文字起こしした文章から議事録を作成
今回は1. 音声データの分割について説明していきます。
音声データの分割
今回は1時間以上の長い会議を想定しています。そのため、音声データを一気に文字起こししてしまうと処理が重くなってしまいます。そこで、音声データを分割し、処理しやすい長さにします。
また、GoogleのSpeech to Textで文字起こしする際に、1分以上の音声データに対しては非同期処理を行う必要があります。GoogleのSpeech to TextのAPIで非同期処理を行うには、Google Cloud Storage (GCS)に分割したファイルをアップロードする必要があります。
音声データの分割の流れは以下の通りです。
- ステレオ音声からモノラル音声に変更
- 音声ファイルを無音部分で分割
- GCSにアップロード
コード全体を先に示しておきます。全体像を知りたい方はご覧ください。
コード全体
import os
from google.cloud import storage
from pydub import AudioSegment
from pydub.silence import split_on_silence
def convert_and_save_audio(chunk, output):
# ステレオ音声からモノラル音声に変換
# 音声データのサンプリングレートを16000Hzに変換
chunk = chunk.set_channels(1)
chunk = chunk.set_frame_rate(16000)
chunk.export(output, format="wav")
def split_audio(file_path, work_dir, max_file_size, min_silence_len, keep_silence, silence_thresh):
# 音声ファイルを分割
"""
file_path (str): 音声ファイルのパス
work_dir (str): 分割された音声ファイルの出力先
max_file_size (int): 出力される音声ファイルの最大サイズ
min_silence_len (int): 無音として識別される最小の持続時間
keep_silence (int): 音声が無音部分で分割された後、各チャンクの先頭または末尾に保持する無音の持続時間
silence_thresh (int): 無音として識別される音量の閾値
"""
sound = AudioSegment.from_file(file_path, format="wav")
total_length = len(sound)
total_size = os.path.getsize(file_path)
max_length = total_length * (max_file_size / total_size) # ファイルサイズと時間から、分割する最大時間を取得する
# 無音部分をカットして分割
chunks = split_on_silence(
sound,
min_silence_len=min_silence_len,
silence_thresh=silence_thresh,
keep_silence=keep_silence,
)
# 分割したチャンクをmax_lengthごとに結合
current_chunk = None
for i, c in enumerate(chunks):
if current_chunk is None:
current_chunk = c
continue
temp_chunk = current_chunk + c
outFilePath = f"{work_dir}/audio_part_{i + 1}.wav"
if len(temp_chunk) > max_length:
convert_and_save_audio(current_chunk, outFilePath)
current_chunk = c
else:
if i == len(chunks) - 1:
convert_and_save_audio(temp_chunk, outFilePath)
else:
current_chunk += c
def upload_to_bucket(bucket_name, work_dir):
# 音声ファイルをGCSにアップロード
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
for filename in os.listdir(work_dir):
blob = bucket.blob(filename)
blob.upload_from_filename(os.path.join(work_dir, filename))
print("Files uploaded to {}.".format(bucket_name))
if __name__ == "__main__":
api_key_path = "api_key_path"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = api_key_path
file_path = "file_path"
work_dir = "work_dir"
os.makedirs(work_dir, exist_ok=True)
max_file_size = 50 * 1000 * 1000 * 0.9 # 50MBぐらいの音声ファイルに分割
min_silence_len = 500
keep_silence = 500
silence_thresh = -90
split_audio(file_path, work_dir, max_file_size, min_silence_len, keep_silence, silence_thresh)
upload_to_bucket("bucket_name", work_dir)
1. ステレオ音声からモノラル音声に変更
ステレオ音声からモノラル音声に変換しないと、その後の文字起こしフェーズでエラーが出てしまいます。そのため、ステレオ音声からモノラル音声に変換します。
以下の関数でこの操作を行っています。
def convert_and_save_audio(chunk, output):
# ステレオ音声からモノラル音声に変換
# 音声データのサンプリングレートを16000Hzに変換
chunk = chunk.set_channels(1)
chunk = chunk.set_frame_rate(16000)
chunk.export(output, format="wav")
サンプリングレートは今回16000Hzにしています。
※補足
文字起こししたいファイルがMP4の場合、GoogleのSpeech to Textでは文字起こしができません。wav形式に変換する必要があります。はじめにWAV形式に変換してから作業を進めてください。
変換する場合は以下のコードで行うことができます。
file_path = "file_path "
output_path = "output_path"
audio = AudioSegment.from_file(file_path, format="mp4")
audio.export(output_path, format="wav")
2. 音声ファイルを無音部分で分割
音声データの分割は、長い音声ファイルを処理しやすいサイズに分割する重要なステップです。無音部分で音声を分割することで、自然な区切りで音声を分割することができ、後続の処理が効率的に行えます。
以下の関数でこの操作を行っています。
def split_audio(file_path, work_dir, max_file_size, min_silence_len, keep_silence, silence_thresh):
# 音声ファイルを分割
"""
file_path (str): 音声ファイルのパス
work_dir (str): 分割された音声ファイルの出力先
max_file_size (int): 出力される音声ファイルの最大サイズ
min_silence_len (int): 無音として識別される最小の持続時間
keep_silence (int): 音声が無音部分で分割された後、各チャンクの先頭または末尾に保持する無音の持続時間
silence_thresh (int): 無音として識別される音量の閾値
"""
sound = AudioSegment.from_file(file_path, format="wav")
total_length = len(sound)
total_size = os.path.getsize(file_path)
max_length = total_length * (max_file_size / total_size) # ファイルサイズと時間から、分割する最大時間を取得する
# 無音部分をカットして分割
chunks = split_on_silence(
sound,
min_silence_len=min_silence_len,
silence_thresh=silence_thresh,
keep_silence=keep_silence,
)
# 分割したチャンクをmax_lengthごとに結合
current_chunk = None
for i, c in enumerate(chunks):
if current_chunk is None:
current_chunk = c
continue
temp_chunk = current_chunk + c
outFilePath = f"{work_dir}/audio_part_{i + 1}.wav"
if len(temp_chunk) > max_length:
convert_and_save_audio(current_chunk, outFilePath)
current_chunk = c
else:
if i == len(chunks) - 1:
convert_and_save_audio(temp_chunk, outFilePath)
else:
current_chunk += c
split_on_silence
関数を使用して音声ファイルを無音部分で分割しています。
3. GCSにアップロード
GCSにアップロードするにはあらかじめバケットを作成する必要があります。以下のサイトを参考に、バケットを作成してください。
分割された音声ファイルをGCSにアップロードすることで、GoogleのSpeech-to-Text APIを使用して非同期に音声を文字に変換できます。これにより、大量の音声データを効率的に処理することが可能となります。
以下の関数でこの操作を行っています。
def upload_to_bucket(bucket_name, work_dir):
# 音声ファイルをGCSにアップロード
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
for filename in os.listdir(work_dir):
blob = bucket.blob(filename)
blob.upload_from_filename(os.path.join(work_dir, filename))
print("Files uploaded to {}.".format(bucket_name))
bucket_name
はアップロードするGCSのバケット名で、work_dir
は分割された音声ファイルが保存されているディレクトリのパスです。
まとめ
本記事では、議事録作成システムの開発における「音声データの分割」に焦点をあてて説明しました。1時間以上の長い会議の音声データを効率的に取り扱うため、分割のプロセスは不可欠です。次回はGoogleのSpeech to Textを使った文字起こしについて説明したいと思います。
参考