[TOC]
## wave

The wave module provides a convenient interface to the WAV sound format. It does not support compression/decompression, but it does support mono and stereo.
### install
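wave is part of the Python standard library, so no extra installation is needed.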
### example

```python
import wave


def read_wav(audio_name):
    """Read a WAV file and return its raw frames in 3200-frame chunks."""
    with wave.open(audio_name, 'rb') as rf:
        frames = []
        frame = rf.readframes(3200)
        while frame:
            frames.append(frame)
            frame = rf.readframes(3200)
        return frames


def write_wav(save_file, frames, CHANNELS=1, SIMPLE_SIZE=2, RATE=16000):
    """Write frames to a WAV file (SIMPLE_SIZE is the sample width in bytes)."""
    if save_file is not None:
        wf = wave.open(save_file, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(SIMPLE_SIZE)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
```
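A minimal usage sketch of the two helpers above (file names are hypothetical; the parameters passed to write_wav must match the input file's actual channels, sample width, and rate):

```python
# Copy a 16 kHz, 16-bit mono WAV file using the helpers defined above.
frames = read_wav('input.wav')
write_wav('output.wav', frames, CHANNELS=1, SIMPLE_SIZE=2, RATE=16000)
```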
wave can read and save audio files, but it cannot do time-frequency processing, feature extraction, or similar tasks. For example, if you read a file recorded at rate=16000 and save it with rate=8000 without resampling, the audio's duration doubles and it plays back at half speed.
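To see this effect, the sketch below reads a 16 kHz file and writes the same frames back with the header set to 8 kHz; only the declared frame rate changes, the samples are untouched (file names are hypothetical):

```python
import wave

with wave.open('speech_16k.wav', 'rb') as rf:     # hypothetical 16 kHz input
    params = rf.getparams()
    frames = rf.readframes(rf.getnframes())
    print('original duration: %.2f s' % (rf.getnframes() / rf.getframerate()))

with wave.open('speech_8k.wav', 'wb') as wf:      # hypothetical output
    wf.setnchannels(params.nchannels)
    wf.setsampwidth(params.sampwidth)
    wf.setframerate(8000)                         # header only; no resampling happens
    wf.writeframes(frames)

# The new file holds the same number of frames but claims 8000 frames/s,
# so players report twice the duration and play back at half speed.
```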
Reference: https://docs.python.org/zh-cn/3/library/wave.html#module-wave
## Librosa

Librosa is a Python toolkit for audio and music analysis and processing. It covers the common needs: time-frequency processing, feature extraction, plotting sound graphs, and more. It is a very powerful library.
### install

```bash
pip install librosa

# conda install
conda install -c conda-forge librosa
```
### example

```python
import librosa


# Change the sample rate of a file and save the result.
def change_sample_rate(read_file, save_file, orig_sr=48000, target_sr=8000):
    y, sr = librosa.load(read_file, sr=orig_sr)
    y_resampled = librosa.resample(y, sr, target_sr)
    # Note: newer librosa versions require keyword arguments for resample
    # (orig_sr=, target_sr=) and no longer ship librosa.output;
    # there, use soundfile.write(save_file, y_resampled, target_sr) instead.
    librosa.output.write_wav(save_file, y_resampled, target_sr)
```
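Beyond resampling, librosa also handles the time-frequency processing and feature extraction mentioned above. A minimal sketch extracting MFCC features (the file name is hypothetical):

```python
import librosa

# Load audio (resampled to 16 kHz on load) and extract 13 MFCCs per frame.
y, sr = librosa.load('speech.wav', sr=16000)        # hypothetical input file
mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)  # shape: (13, n_frames)
print(mfcc.shape)
```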
Reference: http://librosa.github.io/librosa/tutorial.html
## pyaudio

pyaudio is a Python module that can read from the microphone, read audio files, and play audio.
### install
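pyaudio is not part of the standard library; it is typically installed with pip (building it requires the PortAudio library to be available on the system):

```bash
pip install pyaudio
```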
### example

Use wave to read an audio file and pyaudio to play it:
```python
"""PyAudio Example: Play a wave file."""

import pyaudio
import wave
import sys

CHUNK = 1024

if len(sys.argv) < 2:
    print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
    sys.exit(-1)

wf = wave.open(sys.argv[1], 'rb')

# instantiate PyAudio (1)
p = pyaudio.PyAudio()

# open stream (2)
stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True)

# read data
data = wf.readframes(CHUNK)

# play stream (3)
while len(data) > 0:
    stream.write(data)
    data = wf.readframes(CHUNK)

# stop stream (4)
stream.stop_stream()
stream.close()

# close PyAudio (5)
p.terminate()
```
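pyaudio can also go the other way and capture audio from the microphone; combined with wave the recording can be saved to disk. A minimal sketch (sample rate, duration, and output file name are assumptions):

```python
import pyaudio
import wave

CHUNK = 1024
FORMAT = pyaudio.paInt16   # 16-bit samples
CHANNELS = 1
RATE = 16000               # assumed sample rate
RECORD_SECONDS = 5         # assumed recording length
OUTPUT = 'record.wav'      # hypothetical output file

p = pyaudio.PyAudio()
stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE,
                input=True, frames_per_buffer=CHUNK)

# read the microphone chunk by chunk
frames = []
for _ in range(int(RATE / CHUNK * RECORD_SECONDS)):
    frames.append(stream.read(CHUNK))

stream.stop_stream()
stream.close()
p.terminate()

# save the captured frames with wave
wf = wave.open(OUTPUT, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()
```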
Read audio from the microphone and recognize it in real time through the Alibaba Cloud speech recognition API:
```python
# Imports follow the (legacy) Alibaba Cloud NLS Python SDK samples; module paths
# may differ between SDK versions.
import pyaudio
import ali_speech
from ali_speech.callbacks import SpeechRecognizerCallback
from ali_speech.constant import ASRFormat, ASRSampleRate


class MyCallback(SpeechRecognizerCallback):
    """Collects intermediate and final recognition results from the recognizer."""

    def __init__(self, name='default'):
        self._name = name
        self.completed = None
        self.result = None

    def on_started(self, message):
        print('MyCallback.OnRecognitionStarted: %s' % message)

    def on_result_changed(self, message):
        self.result = message['payload']['result']
        print(self.result)

    def on_completed(self, message):
        self.completed = {'status': message['header']['status'],
                          'file': self._name,
                          'task_id': message['header']['task_id'],
                          'result': message['payload']['result']}

    def on_task_failed(self, message):
        print('MyCallback.OnRecognitionTaskFailed: %s' % message)

    def on_channel_closed(self):
        print('MyCallback.OnRecognitionChannelClosed')


class Ali_Speech():
    def __init__(self):
        access_key_id = 'access_key_id'          # replace with your AccessKey ID
        access_key_secret = 'access_key_secret'  # replace with your AccessKey Secret
        self.token, _ = ali_speech.NlsClient.create_token(access_key_id, access_key_secret)
        self.client = ali_speech.NlsClient()
        self.client.set_log_level('INFO')
        self.callback = MyCallback()
        self.CHUNK = 8092
        self.FORMAT = 8          # 8 == pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000

    def start(self):
        # Open the microphone input stream.
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=self.FORMAT,
                                  channels=self.CHANNELS,
                                  rate=self.RATE,
                                  input=True,
                                  frames_per_buffer=self.CHUNK)

    def stop(self):
        # Release the audio device.
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()

    def ali_api(self, record_seconds=60, wave_save_path=None):
        self.recognizer = self.client.create_recognizer(self.callback)
        self.recognizer.set_appkey("set_appkey")  # replace with your project appkey
        self.recognizer.set_token(self.token)
        self.recognizer.set_format(ASRFormat.PCM)
        self.recognizer.set_sample_rate(ASRSampleRate.SAMPLE_RATE_16K)
        self.recognizer.set_enable_intermediate_result(True)
        self.recognizer.set_enable_punctuation_prediction(True)
        self.recognizer.set_enable_inverse_text_normalization(True)

        RECORD_SECONDS = record_seconds
        try:
            ret = self.recognizer.start()
            if ret < 0:
                return ret
            # Stream microphone audio to the recognizer chunk by chunk.
            for i in range(0, int(self.RATE / self.CHUNK * RECORD_SECONDS)):
                data = self.stream.read(self.CHUNK)
                ret = self.recognizer.send(data)
                if ret < 0:
                    break
            self.recognizer.stop()
            res = self.callback.completed
            return res
        except Exception as e:
            print(str(e))
        finally:
            self.recognizer.close()
```
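A minimal usage sketch of the class above (valid credentials and an appkey are assumed; error handling is omitted):

```python
# Assumes Ali_Speech from the listing above, with real AccessKey values and appkey.
asr = Ali_Speech()
asr.start()                                # open the microphone stream
result = asr.ali_api(record_seconds=10)    # stream audio for ~10 s and collect the result
asr.stop()                                 # release the audio device
print(result)
```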