网站平台都有哪些,html网页模板之家,杭州网站优化企业,包图网登录入口转载请注明出处#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你#xff0c;欢迎[点赞、收藏、关注]哦~ 目录
整体思路
部署操作
扩展方向 亲测可行#xff0c;教程描述较为简单#xff0c;仅做记录。 整体思路 Termux#xff08;Android 原生层…转载请注明出处小锋学长生活大爆炸[xfxuezhagn.cn]如果本文帮助到了你欢迎[点赞、收藏、关注]哦~目录整体思路部署操作扩展方向亲测可行教程描述较为简单仅做记录。整体思路TermuxAndroid 原生层用termux-microphone-record真正调用手机麦克风录音。Termux 启一个 TCP 服务端收到请求就录音dur秒然后把录音文件 bytes 返回给客户端。Ubuntuproot作为客户端去请求录音拿到 bytes 后先落盘再用 ffmpeg 解码成 16k 单声道 float32 PCM然后送进 YAMNet 进行声音类型的判断。推理库这里用ai-edge-litert的InterpreterLiteRT它是官方的移动/端侧 TFLite 推理方案之一且不少平台上用它替代tflite-runtime。效果如图部署操作1、在安卓手机中安装F-Droid app。https://f-droid.org/zh_Hans/2、在F-Droid中搜索和安装Termux和Termux API两个app。3、把Termux API的所有权限都给了尤其是存储和麦克风权限再给一下“自启动”和电池白名单不然无法被Termux调用。对于Termux app也要加给一下“自启动”和电池白名单4、打开Termux执行以下命令获取存储权限。执行完在手机上点允许访问存储(若有)termux-setup-storage5、创建一个目录后面会用mkdir storage/crybuf6、输入以下命令安装包pkg update -y pkg install -y python termux-api7、输入以下命令安装ubuntu系统因为一些包比如ai-edge-litert没法在Termux内置环境中装pkg install wget openssl-tool proot -y hash -r wget https://raw.githubusercontent.com/EXALAB/AnLinux-Resources/master/Scripts/Installer/Ubuntu/ubuntu.sh bash ubuntu.sh8、先不进入ubuntu直接在外面新建一个mic_server.py文件#!/data/data/com.termux/files/usr/bin/python import base64, json, os, time, subprocess, socketserver from datetime import datetime HOST 127.0.0.1 PORT 8765 MAX_DUR 30.0 TMPDIR os.path.join(os.path.expanduser(~), .mic_tmp) os.makedirs(TMPDIR, exist_okTrue) def log(msg: str): ts datetime.now().strftime(%H:%M:%S.%f)[:-3] print(f[{ts}] {msg}, flushTrue) def run(cmd): return subprocess.run(cmd, stdoutsubprocess.PIPE, stderrsubprocess.PIPE, textTrue) def mic_info(): p run([termux-microphone-record, -i]) if p.returncode ! 0: raise RuntimeError(p.stderr.strip() or termux-microphone-record -i 执行失败) try: return json.loads(p.stdout) except Exception: raise RuntimeError(f无法解析 -i 输出{p.stdout[:200]}) def mic_quit(): run([termux-microphone-record, -q]) def mic_record_to_file(path: str, dur: float): log(f开始录音时长{dur} 秒 - {path}) p run([termux-microphone-record, -f, path, -l, str(dur)]) if p.stderr.strip(): log(f录音命令 stderr{p.stderr.strip()}) deadline time.time() dur 10.0 while time.time() deadline: info mic_info() if not info.get(isRecording, False): break time.sleep(0.05) log(录音结束) # 给系统一点时间写尾/flush避免产生半成品 3gp time.sleep(0.25) class Handler(socketserver.StreamRequestHandler): def handle(self): peer f{self.client_address[0]}:{self.client_address[1]} line self.rfile.readline().decode(utf-8, ignore).strip() log(**50) if not line: log(f收到空请求来自 {peer}) return log(f收到请求来自 {peer}内容{line[:200]}) try: req json.loads(line) dur float(req.get(dur, 1.0)) dur int(round(dur)) dur max(1, min(dur, int(MAX_DUR))) try: info mic_info() if info.get(isRecording, False): log(检测到正在录音先停止上一段-q) mic_quit() # 等到 isRecordingfalse deadline time.time() 5.0 while time.time() deadline: if not mic_info().get(isRecording, False): break time.sleep(0.05) except Exception as e: log(f录音状态查询失败继续尝试录音{e}) fn fseg_{int(time.time()*1000)}.3gp path os.path.join(TMPDIR, fn) if os.path.exists(path): os.remove(path) mic_record_to_file(path, dur) if (not os.path.exists(path)) or os.path.getsize(path) 64: raise RuntimeError(录音文件不存在或过小可能录音失败/权限问题) size os.path.getsize(path) log(f录音文件已生成大小{size} 字节) with open(path, rb) as f: data f.read() # 不保留缓存文件 os.remove(path) b64 base64.b64encode(data).decode(ascii) log(f准备返回录音数据bytes{len(data)}base64长度{len(b64)}) resp {ok: True, format: 3gp, dur: dur, b64: b64} except Exception as e: log(f处理请求出错{e}) resp {ok: False, err: str(e)} self.wfile.write((json.dumps(resp) \n).encode(utf-8)) log(f已返回响应给 {peer}ok{resp.get(ok)}) class Server(socketserver.TCPServer): allow_reuse_address True if __name__ __main__: log(f麦克风服务端启动监听 {HOST}:{PORT}) log(f临时目录{TMPDIR}) with Server((HOST, PORT), Handler) as srv: srv.serve_forever()9、修改以下文件把注释取消vi ./start-ubuntu.sh10、然后执行以下命令进入ubuntu系统./start-ubuntu.sh11、接下来在ubuntu系统中安装依赖apt update apt install -y python3 python3-venv python3-pip ffmpeg unzip12、创建虚拟环境python3 -m venv ~/cry source ~/cry/bin/activate pip install -U pip pip install numpy ai-edge-litert myprintx requests13、下载模型相关内容mkdir -p ~/yamnet cd ~/yamnet wget https://tfhub.dev/google/lite-model/yamnet/classification/tflite/1?lite-formattflite -O yamnet_pkg unzip yamnet_pkg wget -O labels.txt https://github.com/tensorflow/tflite-support/raw/master/tensorflow_lite_support/metadata/python/tests/testdata/audio_classifier/yamnet_521_labels.txt python -c from ai_edge_litert.interpreter import Interpreter; print(LiteRT OK)模型权重在这里手动下载然后放到yamnet目录下https://huggingface.co/thelou1s/yamnet/blob/main/lite-model_yamnet_classification_tflite_1.tflite为了简单起见可以给模型改个名字最终目录如下14、接下来在ubuntu系统中新建app.py文件import argparse import base64 import json import socket import subprocess import time from pathlib import Path import tempfile import os import numpy as np from ai_edge_litert.interpreter import Interpreter import myprintx myprintx.patch_color() # -------------------- Termux 录音服务端客户端 -------------------- def request_3gp_from_termux(dur: float, host127.0.0.1, port8765, timeout30) - bytes: 向 Termux 录音服务端请求录音返回完整 3gp 文件的 bytes。 req {dur: float(dur)} s socket.create_connection((host, port), timeouttimeout) with s: s.sendall((json.dumps(req) \n).encode(utf-8)) f s.makefile(rb) line f.readline().decode(utf-8, ignore).strip() if not line: raise RuntimeError(服务端返回空响应可能服务端异常退出或连接被中断) resp json.loads(line) if not resp.get(ok, False): raise RuntimeError(f服务端返回错误{resp.get(err, 未知错误)}) fmt resp.get(format) if fmt ! 3gp: raise RuntimeError(f服务端返回的音频格式不符合预期{fmt}) return base64.b64decode(resp[b64]) def decode_3gp_bytes_to_f32(audio_3gp_bytes: bytes, sr16000) - np.ndarray: # 关键先落盘避免 pipe:0 无法 seek 导致 moov atom not found fd, tmp_path tempfile.mkstemp(suffix.3gp) try: with os.fdopen(fd, wb) as f: f.write(audio_3gp_bytes) cmd [ ffmpeg, -hide_banner, -loglevel, error, -i, tmp_path, -ac, 1, -ar, str(sr), -f, f32le, pipe:1 ] p subprocess.run(cmd, stdoutsubprocess.PIPE, stderrsubprocess.PIPE) if p.returncode ! 0: err p.stderr.decode(utf-8, ignore)[:2000] raise RuntimeError(fffmpeg 解码失败{err}) return np.frombuffer(p.stdout, dtypenp.float32) finally: try: os.remove(tmp_path) except OSError: pass # -------------------- LiteRT / TFLite 推理工具 -------------------- def _get_qparams(detail: dict): if quantization in detail: scale, zero detail[quantization] return float(scale), int(zero) qp detail.get(quantization_parameters) or {} scales qp.get(scales) zero_points qp.get(zero_points) if scales is not None and len(scales) 0: z int(zero_points[0]) if zero_points is not None and len(zero_points) 0 else 0 return float(scales[0]), z return 0.0, 0 def _quantize_if_needed(x_f32: np.ndarray, in_detail: dict) - np.ndarray: dtype in_detail[dtype] if dtype np.float32: return x_f32.astype(np.float32) scale, zero _get_qparams(in_detail) if scale 0: raise RuntimeError(模型输入是量化类型但 quantization scale0无法量化输入) q np.round(x_f32 / scale zero) if dtype np.int8: q np.clip(q, -128, 127).astype(np.int8) elif dtype np.uint8: q np.clip(q, 0, 255).astype(np.uint8) else: q q.astype(dtype) return q def _dequantize_if_needed(y: np.ndarray, out_detail: dict) - np.ndarray: dtype out_detail[dtype] if dtype np.float32: return y.astype(np.float32) scale, zero _get_qparams(out_detail) if scale 0: return y.astype(np.float32) return (y.astype(np.float32) - zero) * scale def load_labels(labels_path: Path) - list[str]: labels [] with labels_path.open(r, encodingutf-8) as f: for line in f: t line.strip() if t: labels.append(t) return labels def make_group_scorer(model_path: Path, labels_path: Path, groups: dict[str, list[str]]): itp Interpreter(model_pathstr(model_path)) itp.allocate_tensors() in_det itp.get_input_details()[0] out_det itp.get_output_details()[0] labels load_labels(labels_path) # 每组[(label_name, idx), ...] group_targets: dict[str, list[tuple[str, int]]] {} missing [] for gname, names in groups.items(): t [] for n in names: if n in labels: t.append((n, labels.index(n))) else: missing.append((gname, n)) if t: group_targets[gname] t if missing: tips [] for g, n in missing[:15]: # 给一点“包含匹配”的提示方便你找正确拼写 cand [x for x in labels if n.lower() in x.lower()] tips.append(f - 组缺少标签{n}相似项示例{cand[:3]}) raise RuntimeError(labels.txt 中找不到部分标签名请确保拼写完全一致\n \n.join(tips)) if not group_targets: raise RuntimeError(没有任何分组标签可用请检查 groups 配置与 labels.txt) in_shape in_det[shape] need_len int(in_shape[-1]) def score_groups(wav_f32: np.ndarray) - dict[str, tuple[float, str]]: wav np.clip(wav_f32.astype(np.float32), -1.0, 1.0) if len(wav) need_len: wav np.pad(wav, (0, need_len - len(wav))) elif len(wav) need_len: s (len(wav) - need_len) // 2 wav wav[s:s need_len] x wav.reshape(in_shape) x _quantize_if_needed(x, in_det) itp.set_tensor(in_det[index], x) itp.invoke() y itp.get_tensor(out_det[index]) y _dequantize_if_needed(y, out_det) results: dict[str, tuple[float, str]] {} if y.ndim 1: for gname, target in group_targets.items(): best_name, best_score None, -1.0 for name, idx in target: sc float(y[idx]) if sc best_score: best_score, best_name sc, name results[gname] (best_score, best_name or ) return results # y: (frames, 521) for gname, target in group_targets.items(): idxs [idx for _, idx in target] # 每帧该组最大值 per_frame y[:, idxs].max(axis1) mean_score float(per_frame.mean()) peak_frame int(np.argmax(per_frame)) best_name, best_sc None, -1.0 for name, idx in target: sc float(y[peak_frame, idx]) if sc best_sc: best_sc, best_name sc, name results[gname] (mean_score, best_name or ) return results return score_groups # -------------------- 主程序 -------------------- LABEL_GROUPS { 哭声: [Crying, sobbing, Baby cry, infant cry, Whimper, Wail, moan], 说话: [Speech, Conversation, Narration, monologue, Child speech, kid speaking, Babbling, Whispering], 唱歌: [Singing, Choir, Child singing, Humming, Rapping, Chant], 笑声: [Laughter, Baby laughter, Giggle, Snicker, Chuckle, chortle, Belly laugh], 喊叫: [Screaming, Shout, Yell, Children shouting], } def main(args): consec 0 scorer make_group_scorer(Path(args.model), Path(args.labels), LABEL_GROUPS) print(f正在向 Termux 录音服务端请求音频{args.host}:{args.port}每次录音时长{args.dur} 秒) print(f请求间隔{args.interval} 秒建议 录音时长采样率{args.sr}) print(f【报警组】{args.alarm_group}阈值{args.threshold}连续命中次数{args.need_consec}) if args.flag_file: print(f检测到哭声后将写入标记文件{args.flag_file}) while True: try: data_3gp request_3gp_from_termux(args.dur, hostargs.host, portargs.port) wav decode_3gp_bytes_to_f32(data_3gp, srargs.sr) group_scores scorer(wav) # 打印所有分组的得分按得分排序 items sorted(group_scores.items(), keylambda kv: kv[1][0], reverseTrue) line | .join([f{g}:{s:.1f}({name}) for g, (s, name) in items]) print( line, fg_colorgreen) # 报警只看指定分组 alarm_score, alarm_name group_scores.get(args.alarm_group, (0.0, )) if alarm_score args.threshold: consec 1 else: consec max(0, consec - 1) if consec args.need_consec: print(f 检测到目标得分{alarm_score:.2f} 命中{consec}/{args.need_consec} 最高类别{alarm_name}, fg_colorred) if args.flag_file: try: Path(args.flag_file).write_text(f{time.time()}\n, encodingutf-8) except Exception as e: print(f[警告] 写入标记文件失败{e}) time.sleep(args.interval) except KeyboardInterrupt: print(\n已停止。) break except Exception as e: print(f[错误] {e}) time.sleep(0.5) if __name__ __main__: ap argparse.ArgumentParser() ap.add_argument(--host, default127.0.0.1) ap.add_argument(--port, typeint, default8765) ap.add_argument(--dur, typefloat, default5, help每次请求录音时长(秒)) ap.add_argument(--interval, typefloat, default1, help两次请求间隔(秒)) ap.add_argument(--threshold, typefloat, default0.3) ap.add_argument(--need-consec, typeint, default3) ap.add_argument(--sr, typeint, default16000) ap.add_argument(--model, default./yamnet/model.tflite, helpYAMNet .tflite 路径) ap.add_argument(--labels, default./yamnet/labels.txt, helpYAMNet labels.txt 路径) ap.add_argument(--alarm-group, default哭声, help用于报警的分组名例如哭声/说话/唱歌) ap.add_argument(--flag-file, default, help检测到哭声时写入该文件(例如 /sdcard/cry_detected.txt)) args ap.parse_args() main(args)15、部署完成现在开始运行。开两个终端一个在Termux、一个在Ubuntu。运行Termux下的脚本python mic_server.py运行Ubuntu下的脚本python app.py然后可以找个视频对着手机播放一下看看效果目前问题是响应稍慢大家可以看看能怎么优化。16、为了让脚本能保持后台运行建议使用screen这个工具。Ubuntu开一个screenscreen -S ubuntuTermux开一个screenscreen -S server不过最关键的还是Termux app本身的保活建议在 Termux 里不是 Ubuntu执行以下命令来防止被系统挂起termux-wake-lock扩展方向1、如果要开机自启动可以再装一个Termux:Boot插件按照格式要求把启动命令写到~/.termux/boot/ 启动脚本里。2、在声音检测此基础上还可以加很多功能比如检测到哭声就拍照并发送通知到微信。