# wave librosa pyaudio

[TOC]

## wave

The wave module provides a convenient interface to the WAV sound format. It does not support compression/decompression, but it does support mono and stereo.

### install

wave is part of the Python standard library, so no separate installation is required.

### example

```python
import wave

# Read the frame data from an audio file in fixed-size blocks
def read_wav(audio_name):
    with wave.open(audio_name, 'rb') as rf:
        frames = []
        frame = rf.readframes(3200)
        while frame:
            frames.append(frame)
            frame = rf.readframes(3200)
        return frames

# Save frame data to an audio file
def write_wav(save_file, frames, CHANNELS=1, SAMPLE_SIZE=2, RATE=16000):
    if save_file is not None:
        wf = wave.open(save_file, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(SAMPLE_SIZE)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
```

wave can read and write audio files, but it cannot handle tasks such as time-frequency processing or feature extraction. Also note that the frame rate you write is taken at face value: if you read a file recorded at rate=16000 and save it with rate=8000, the audio becomes twice as long and plays back at half the speed.
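
The mismatch is easy to demonstrate: a WAV file's duration is `nframes / framerate`, so writing 16 kHz frames with a frame rate of 8000 doubles the reported duration. A small sketch with the standard wave module; the file names are just placeholders:

```python
import wave

# Read all frames from a (hypothetical) 16 kHz file
with wave.open('input_16k.wav', 'rb') as rf:
    params = rf.getparams()
    frames = rf.readframes(params.nframes)

# Write the same frames, but claim a frame rate of 8000 instead of 16000
with wave.open('output_8k.wav', 'wb') as wf:
    wf.setnchannels(params.nchannels)
    wf.setsampwidth(params.sampwidth)
    wf.setframerate(8000)
    wf.writeframes(frames)

with wave.open('output_8k.wav', 'rb') as wf:
    print(wf.getnframes() / wf.getframerate())  # twice the original duration
```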

Reference: https://docs.python.org/zh-cn/3/library/wave.html#module-wave

## Librosa

Librosa is a Python toolkit for audio and music analysis and processing. It covers just about everything you commonly need, such as time-frequency processing, feature extraction, and plotting audio visualizations, and is very powerful.

### install

```bash
pip install librosa

# conda install
conda install -c conda-forge librosa
```

### example

```python
import librosa
import soundfile as sf

# Resample an audio file to a different sample rate and save the result
def change_sample_rate(read_file, save_file, orig_sr=48000, target_sr=8000):
    y, sr = librosa.load(read_file, sr=orig_sr)
    y_resampled = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
    # librosa.output.write_wav was removed in librosa 0.8; use soundfile instead
    sf.write(save_file, y_resampled, target_sr)
```
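
The snippet above only changes the sample rate; the feature extraction mentioned in the description takes just a few more calls. A rough sketch, where the file name `example.wav` and the `n_mels`/`n_mfcc` values are only illustrative:

```python
import librosa

# Load at the file's native sample rate
y, sr = librosa.load('example.wav', sr=None)

# Mel spectrogram and MFCC features
mel = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=80)
mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)

print(mel.shape, mfcc.shape)  # (n_mels, frames), (n_mfcc, frames)
```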

Reference: http://librosa.github.io/librosa/tutorial.html

## pyaudio

pyaudio is a Python module that can read audio from the microphone, read audio files, and play audio.

### install

```bash
pip install pyaudio
```

### example

Read an audio file with wave and play it with pyaudio:

"""PyAudio Example: Play a wave file."""

import pyaudio
import wave
import sys

CHUNK = 1024

if len(sys.argv) < 2:
print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
sys.exit(-1)

wf = wave.open(sys.argv[1], 'rb')

# instantiate PyAudio (1)
p = pyaudio.PyAudio()

# open stream (2)
stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
channels=wf.getnchannels(),
rate=wf.getframerate(),
output=True)

# read data
data = wf.readframes(CHUNK)

# play stream (3)
while len(data) > 0:
stream.write(data)
data = wf.readframes(CHUNK)

# stop stream (4)
stream.stop_stream()
stream.close()

# close PyAudio (5)
p.terminate()
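
The same stream API also records from the microphone, which is what the Alibaba Cloud example below builds on. A minimal sketch that saves a short recording as a 16-bit mono WAV file; the file name, rate, and duration are placeholders:

```python
import pyaudio
import wave

CHUNK = 1024
RATE = 16000
RECORD_SECONDS = 5  # placeholder duration

p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16, channels=1, rate=RATE,
                input=True, frames_per_buffer=CHUNK)

# read CHUNK-sized blocks from the default input device
frames = [stream.read(CHUNK) for _ in range(int(RATE / CHUNK * RECORD_SECONDS))]

stream.stop_stream()
stream.close()
p.terminate()

# save the recording as a 16-bit mono WAV file
with wave.open('record.wav', 'wb') as wf:
    wf.setnchannels(1)
    wf.setsampwidth(2)  # 16-bit samples
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
```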

Capture audio from the microphone and recognize it in real time through the Alibaba Cloud speech recognition API:

```python
import pyaudio

# NOTE: the imports below assume the package layout of the Alibaba Cloud speech SDK (ali_speech)
import ali_speech
from ali_speech.callbacks import SpeechRecognizerCallback
from ali_speech.constant import ASRFormat, ASRSampleRate


class MyCallback(SpeechRecognizerCallback):

    def __init__(self, name='default'):
        self._name = name
        self.completed = None
        self.result = None

    def on_started(self, message):
        print('MyCallback.OnRecognitionStarted: %s' % message)

    def on_result_changed(self, message):
        self.result = message['payload']['result']
        print(self.result)

    def on_completed(self, message):
        self.completed = {'status': message['header']['status'], 'file': self._name,
                          'task_id': message['header']['task_id'],
                          'result': message['payload']['result']}

    def on_task_failed(self, message):
        print('MyCallback.OnRecognitionTaskFailed: %s' % message)

    def on_channel_closed(self):
        print('MyCallback.OnRecognitionChannelClosed')


class Ali_Speech():
    def __init__(self):
        access_key_id = 'access_key_id'
        access_key_secret = 'access_key_secret'
        self.token, _ = ali_speech.NlsClient.create_token(access_key_id, access_key_secret)
        self.client = ali_speech.NlsClient()
        self.client.set_log_level('INFO')
        self.callback = MyCallback()

        self.CHUNK = 8092
        self.FORMAT = pyaudio.paInt16  # 16-bit samples
        self.CHANNELS = 1
        self.RATE = 16000

    def start(self):
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=self.FORMAT, channels=self.CHANNELS,
                                  rate=self.RATE, input=True, frames_per_buffer=self.CHUNK)

    def stop(self):
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()

    def ali_api(self, record_seconds=60, wave_save_path=None):
        self.recognizer = self.client.create_recognizer(self.callback)
        self.recognizer.set_appkey("set_appkey")
        self.recognizer.set_token(self.token)
        self.recognizer.set_format(ASRFormat.PCM)
        self.recognizer.set_sample_rate(ASRSampleRate.SAMPLE_RATE_16K)
        self.recognizer.set_enable_intermediate_result(True)
        self.recognizer.set_enable_punctuation_prediction(True)
        self.recognizer.set_enable_inverse_text_normalization(True)

        RECORD_SECONDS = record_seconds
        try:
            ret = self.recognizer.start()
            if ret < 0:
                return ret
            for i in range(0, int(self.RATE / self.CHUNK * RECORD_SECONDS)):
                data = self.stream.read(self.CHUNK)
                ret = self.recognizer.send(data)
                if ret < 0:
                    break
            self.recognizer.stop()
            res = self.callback.completed
            return res
        except Exception as e:
            print(str(e))
        finally:
            self.recognizer.close()
```
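
For completeness, a minimal usage sketch of the `Ali_Speech` class above; the 10-second duration is arbitrary and the credential/appkey placeholders still need to be filled in:

```python
asr = Ali_Speech()
asr.start()                               # open the microphone stream
result = asr.ali_api(record_seconds=10)   # stream audio to the recognizer
asr.stop()                                # release the audio device
print(result)                             # final result collected by MyCallback
```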