Skip to main content

用Flask搭建屏幕共享工具

· 5 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结
  1. 当你的电脑无法通过视频线连接到电视机,可能是由于线缆长度不足或者接口不兼容,而你的电视机恰好支持浏览器功能。

  2. 当你在外面参加培训或交流活动,现场只提供了一个WiFi网络。大家刚刚熟悉,马上就要开始屏幕分享,你需要一个快速的方式让大家都能看到你的屏幕。

这个时候,你就需要一个工具来捕获和分享你的屏幕和音频(包括设备音频和麦克风输入),并通过网页形式与他人共享。这样,观众无需下载任何会议软件,仅需打开浏览器即可观看。

安装依赖

pip install opencv-python Pillow greenlet pyaudio Flask

代码

share.py
from flask import Flask, Response, render_template_string, stream_with_context
import time
import cv2
from PIL import ImageGrab, Image
import threading
from io import BytesIO
from greenlet import getcurrent as get_ident
import pyaudio


class Audio:
def __init__(self):
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 2
self.RATE = 44100
self.CHUNK = 1024
self.bitsPerSample = 16
self.audio = pyaudio.PyAudio()
self.first_run = True
self.wav_header = self.genHeader(self.RATE, self.bitsPerSample, self.CHANNELS)
self.stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
input_device_index=1,
frames_per_buffer=self.CHUNK,
)
self.get_audio = self._get_audio_first_time

def genHeader(self, sampleRate, bitsPerSample, channels):
datasize = 2000 * 10**6
o = bytes("RIFF", "ascii")
o += (datasize + 36).to_bytes(4, "little")
o += bytes("WAVE", "ascii")
o += bytes("fmt ", "ascii")
o += (16).to_bytes(4, "little")
o += (1).to_bytes(2, "little")
o += (channels).to_bytes(2, "little")
o += (sampleRate).to_bytes(4, "little")
o += (sampleRate * channels * bitsPerSample // 8).to_bytes(4, "little")
o += (channels * bitsPerSample // 8).to_bytes(2, "little")
o += (bitsPerSample).to_bytes(2, "little")
o += bytes("data", "ascii")
o += (datasize).to_bytes(4, "little")
return o

def _get_audio_first_time(self):
data = self.stream.read(self.CHUNK)
self.get_audio = self._get_audio_subsequent_times # switch the method
return self.wav_header + data

def _get_audio_subsequent_times(self):
return self.stream.read(self.CHUNK)

class CameraEvent(object):
def __init__(self):
self.events = {}

def wait(self):
ident = get_ident()
if ident not in self.events:
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()

def set(self):
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].is_set():
event[0].set()
event[1] = now
else:
if now - event[1] > 5:
remove = ident
if remove:
del self.events[remove]

def clear(self):
self.events[get_ident()][0].clear()


class BaseCamera(object):
thread = None
frame = None
last_access = 0
event = CameraEvent()

def __init__(self):
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()
BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()
while self.get_frame() is None:
time.sleep(0)

def get_frame(self):
BaseCamera.last_access = time.time()
BaseCamera.event.wait()
BaseCamera.event.clear()
return BaseCamera.frame

@staticmethod
def frames():
raise RuntimeError("Must be implemented by subclasses.")

@classmethod
def _thread(cls):
print("Starting camera thread.")
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()
time.sleep(0)
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print("Stopping camera thread due to inactivity.")
break
BaseCamera.thread = None


class Camera(BaseCamera):
video_source = 0

@staticmethod
def set_video_source(source):
Camera.video_source = source

@staticmethod
def frames():
camera = cv2.VideoCapture(Camera.video_source)
if not camera.isOpened():
raise RuntimeError("Error")
while True:
image = ImageGrab.grab()
image = image.resize((1366, 750), Image.LANCZOS)
output_buffer = BytesIO()
image.save(output_buffer, format="JPEG", quality=100)
frame = output_buffer.getvalue()
yield frame
app = Flask(__name__)

def gen(camera):
while True:
frame = camera.get_frame()
yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + frame + b"\r\n")

def gen_audio(audio):
while True:
data = audio.get_audio()
yield data

@app.route("/video_feed")
def video_feed():
return Response(gen(Camera()), mimetype="multipart/x-mixed-replace; boundary=frame")

@app.route("/audio_feed")
def audio_feed():
return Response(stream_with_context(gen_audio(Audio())))

@app.route("/")
def index():
global mode
video_tag = """<img src="{{ url_for('video_feed') }}">"""
audio_tag = """<audio autoplay style="display:none;"><source src="{{ url_for('audio_feed') }}" type="audio/x-wav; codec=pcm">Your browser does not support the audio element.</audio>"""

tags = {0: video_tag + audio_tag, 1: audio_tag, 2: video_tag}

content = tags[mode]

return render_template_string(
"""<html>
<head>
<title>{title}</title>
<link rel="icon" href="data:image/svg+xml;base64,CjxzdmcgeG1sbnM9Imh0dHA6Ly93d3cudzMub3JnLzIwMDAvc3ZnIiB2aWV3Qm94PSIwIDAgNTAgNTAiPgogICAgPGNpcmNsZSBjeD0iMjUiIGN5PSIyNSIgcj0iMjAiIGZpbGw9InJlZCIgLz4KPC9zdmc+Cg==">
</head>
<body>{content}</body>
</html>""".format(
title=["Intranet Broadcast", "Audio Sharing", "Screen Sharing"][mode],
content=content,
)
)

if __name__ == "__main__":
local_host = "127.0.0.1"
ip_host = "0.0.0.0"
port = 8001
mode = int(input("Please select the mode: 0 for Intranet Broadcast, 1 for Audio Sharing, 2 for Screen Sharing: "))
app.run(threaded=True, host=ip_host, port=port)

运行程序后,程序会提示你输入一个数字:

0表示同时分享屏幕和音频
1表示仅分享音频
2表示仅分享屏幕

输入相应数字后按回车键即可。程序运行后,会在控制台输出一个URL。你只需在浏览器中输入这个URL,就可以看到你的屏幕和音频了。

后话

你可以在此项目的基础上进行扩展,增加更多功能,如:

  • 识别当前音频并将其转化为文本,与屏幕共享一起传输,这样观众就可以在屏幕上看到你的讲话内容。
  • 结合翻译API,实现实时翻译功能。
  • 压缩屏幕画面质量,获得更流畅的传输效果等等。