练习
练习
修改下面的代码。尝试传入不同的参数,将图片进行不同的处理。
调用同网段的摄像头
sender.py
import os
from importlib import import_module
from flask import Flask, render_template, Response,render_template_string
from io import BytesIO
import cv2
from PIL import ImageGrab, Image
import time
import threading
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident
class CameraEvent(object):
def __init__(self):
self.events = {}
def wait(self):
ident = get_ident()
if ident not in self.events:
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()
def set(self):
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].isSet():
event[0].set()
event[1] = now
else:
if now - event[1] > 5:
remove = ident
if remove:
del self.events[remove]
def clear(self):
self.events[get_ident()][0].clear()
class BaseCamera(object):
thread = None
frame = None
last_access = 0
event = CameraEvent()
def __init__(self):
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()
BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()
while self.get_frame() is None:
time.sleep(0)
def get_frame(self):
BaseCamera.last_access = time.time()
BaseCamera.event.wait()
BaseCamera.event.clear()
return BaseCamera.frame
@staticmethod
def frames():
raise RuntimeError('Must be implemented by subclasses.')
@classmethod
def _thread(cls):
print('Starting camera thread.')
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()
time.sleep(0)
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print('Stopping camera thread due to inactivity.')
break
BaseCamera.thread = None
class Camera(BaseCamera):
video_source = 0
@staticmethod
def set_video_source(source):
Camera.video_source = source
@staticmethod
def frames():
camera = cv2.VideoCapture(Camera.video_source)
if not camera.isOpened():
raise RuntimeError('Error')
while True:
image = ImageGrab.grab() # 获取屏幕数据
# w, h = image.size
image = image.resize((1366, 750)) # 图片缩放
output_buffer = BytesIO() # 创建二进制对象
image.save(output_buffer, format='JPEG', quality=100) # quality提升图片分辨率
frame = output_buffer.getvalue() # 获取二进制数据
yield frame # 生成器返回一张图片的二进制数据
app = Flask(__name__)
@app.route('/')
def index():
"""
视图函数
:return:
"""
return render_template_string('''<html>
<head>
<title>屏幕共享</title>
</head>
<body>
<img src="{{ url_for('video_feed') }}">
</body>
</html>''')
def gen(camera):
"""
流媒体发生器
"""
while True:
frame = camera.get_frame()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
@app.route('/video_feed')
def video_feed():
"""流媒体数据"""
return Response(gen(Camera()),
mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == '__main__':
ip_host = '127.0.0.1'# 本机ip地址
ip_host2 = '0.0.0.0'# 内网ip地址
app.run(threaded=True, host=ip_host2, port=80)
processing.py
import cv2
url_path = "http://192.168.7.70/video_feed"
cap = cv2.VideoCapture(url_path)
while True:
ret, frame = cap.read()
if not ret:
break
cv2.imshow("frame", frame)
cv2.waitKey(1)
cap.release()
cv2.destroyAllWindows()