Python Client Examples
Complete Python integration examples for camera control and data access.
Installation and Setup
Requirements
pip install requests websocket-client opencv-python pillow numpy
Basic Configuration
import requests
import json
import cv2
import websocket
from PIL import Image
import io
import base64
import threading
import time
# Camera device configuration
CAMERA_IP = "192.168.1.100"
HTTP_PORT = 8080
HTTPS_PORT = 8081
WS_PORT = 8080
RTSP_PORT = 554
# Authentication
API_TOKEN = "1a2B3c4D5e6f7G8h"
USERNAME = "admin"
PASSWORD = "password"
# Base URLs
BASE_URL = f"http://{CAMERA_IP}:{HTTP_PORT}"
SECURE_URL = f"https://{CAMERA_IP}:{HTTPS_PORT}"
WS_URL = f"ws://{CAMERA_IP}:{WS_PORT}/ws"
RTSP_URL = f"rtsp://{USERNAME}:{PASSWORD}@{CAMERA_IP}:{RTSP_PORT}/stream0"
HTTP API Client
Basic API Client Class
class RotorDreamClient:
def __init__(self, base_url, token=None, username=None, password=None):
self.base_url = base_url.rstrip('/')
self.token = token
self.session = requests.Session()
# Configure authentication
if token:
self.session.headers.update({"Authorization": f"Bearer {token}"})
elif username and password:
self.session.auth = (username, password)
# Configure session
self.session.headers.update({
"User-Agent": "RotorDream-Python-Client/1.0",
"Accept": "application/json"
})
def _request(self, method, endpoint, **kwargs):
"""Make HTTP request with error handling"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
# Return JSON if possible, otherwise raw content
if 'application/json' in response.headers.get('content-type', ''):
return response.json()
return response.content
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Response: {e.response.text}")
raise
def get_info(self):
"""Get system information"""
return self._request("GET", "/api/info")
def get_camera_status(self):
"""Get camera hardware status"""
return self._request("GET", "/api/camera/status")
def capture_image(self, quality=85):
"""Capture current camera frame"""
params = {"quality": quality} if quality != 85 else {}
return self._request("GET", "/camera.jpeg", params=params)
def start_recording(self, filename=None, duration=None, quality="high"):
"""Start video recording"""
data = {
"filename": filename or f"recording_{int(time.time())}.avi",
"quality": quality
}
if duration:
data["duration"] = duration
return self._request("POST", "/api/recording/start", json=data)
def stop_recording(self):
"""Stop current recording"""
return self._request("POST", "/api/recording/stop")
def get_recording_status(self):
"""Get recording status"""
return self._request("GET", "/api/recording/status")
def list_files(self, path="/media/data", recursive=False, filter_ext=None):
"""List files in directory"""
params = {"path": path}
if recursive:
params["recursive"] = "true"
if filter_ext:
params["filter"] = filter_ext
return self._request("GET", "/files", params=params)
def download_file(self, filepath, local_path=None):
"""Download file from device"""
content = self._request("GET", f"/download/{filepath}")
if local_path:
with open(local_path, 'wb') as f:
f.write(content)
return local_path
return content
def upload_file(self, file_path, remote_path=None, overwrite=False):
"""Upload file to device"""
with open(file_path, 'rb') as f:
files = {'file': f}
data = {}
if remote_path:
data['path'] = remote_path
if overwrite:
data['overwrite'] = 'true'
return self._request("POST", "/upload", files=files, data=data)
def delete_file(self, filepath):
"""Delete file from device"""
return self._request("DELETE", f"/files/{filepath}")
Usage Examples
# Initialize client
client = RotorDreamClient(BASE_URL, token=API_TOKEN)
# Get system information
try:
info = client.get_info()
print(f"Device version: {info['version']}")
print(f"Uptime: {info['uptime']} seconds")
print(f"Temperature: {info['system']['temperature']}°C")
except Exception as e:
print(f"Failed to get info: {e}")
# Capture image and save locally
try:
image_data = client.capture_image(quality=95)
with open("current_frame.jpg", "wb") as f:
f.write(image_data)
print("Image captured successfully")
except Exception as e:
print(f"Failed to capture image: {e}")
# Start recording
try:
recording_info = client.start_recording(
filename="experiment_001.avi",
duration=300, # 5 minutes
quality="high"
)
print(f"Recording started: {recording_info['recording_id']}")
# Monitor recording status
while True:
status = client.get_recording_status()
if not status['recording']:
break
print(f"Recording... Duration: {status['duration']}s")
time.sleep(5)
except Exception as e:
print(f"Recording failed: {e}")
# List and download files
try:
files = client.list_files("/media/data/recordings", filter_ext=".avi")
print(f"Found {len(files['files'])} video files")
for file_info in files['files']:
if file_info['type'] == 'file':
print(f"- {file_info['name']} ({file_info['size']} bytes)")
# Download the file
local_filename = f"downloads/{file_info['name']}"
client.download_file(f"recordings/{file_info['name']}", local_filename)
print(f"Downloaded to {local_filename}")
except Exception as e:
print(f"File operations failed: {e}")
WebSocket Real-time Client
WebSocket Client Class
import websocket
import json
import threading
class RotorDreamWebSocketClient:
def __init__(self, ws_url, token=None):
self.ws_url = ws_url
self.token = token
self.ws = None
self.connected = False
self.callbacks = {}
def connect(self):
"""Connect to WebSocket server"""
headers = []
if self.token:
headers.append(f"Authorization: Bearer {self.token}")
self.ws = websocket.WebSocketApp(
self.ws_url,
header=headers,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close
)
# Start in separate thread
self.ws_thread = threading.Thread(target=self.ws.run_forever)
self.ws_thread.daemon = True
self.ws_thread.start()
# Wait for connection
for _ in range(50): # 5 second timeout
if self.connected:
break
time.sleep(0.1)
return self.connected
def disconnect(self):
"""Disconnect from WebSocket server"""
if self.ws:
self.ws.close()
def _on_open(self, ws):
"""WebSocket connection opened"""
self.connected = True
print("WebSocket connected")
def _on_message(self, ws, message):
"""Handle incoming WebSocket message"""
try:
data = json.loads(message)
message_type = data.get('type', 'unknown')
# Call registered callback
if message_type in self.callbacks:
self.callbacks[message_type](data)
else:
print(f"Unhandled message type: {message_type}")
print(f"Data: {data}")
except json.JSONDecodeError:
print(f"Invalid JSON message: {message}")
def _on_error(self, ws, error):
"""Handle WebSocket error"""
print(f"WebSocket error: {error}")
def _on_close(self, ws, close_status_code, close_msg):
"""WebSocket connection closed"""
self.connected = False
print("WebSocket disconnected")
def send_message(self, message_type, data=None):
"""Send message to server"""
if not self.connected:
raise Exception("WebSocket not connected")
message = {"type": message_type}
if data:
message.update(data)
self.ws.send(json.dumps(message))
def subscribe_to_state(self, callback):
"""Subscribe to state updates"""
self.callbacks['state_update'] = callback
self.send_message('subscribe', {'event': 'state_changes'})
def subscribe_to_camera_events(self, callback):
"""Subscribe to camera events"""
self.callbacks['camera_event'] = callback
self.send_message('subscribe', {'event': 'camera_events'})
def get_current_state(self):
"""Request current system state"""
self.send_message('get_state')
def set_parameter(self, parameter, value):
"""Set system parameter"""
self.send_message('set_parameter', {
'parameter': parameter,
'value': value
})
WebSocket Usage Examples
# WebSocket event handlers
def on_state_update(data):
"""Handle state change notifications"""
print(f"State updated: {data['parameter']} = {data['value']}")
# React to specific state changes
if data['parameter'] == 'camera.recording':
if data['value']:
print("Recording started!")
else:
print("Recording stopped!")
elif data['parameter'] == 'system.temperature':
temp = data['value']
if temp > 70:
print(f"WARNING: High temperature: {temp}°C")
def on_camera_event(data):
"""Handle camera events"""
event_type = data.get('event')
if event_type == 'frame_ready':
print("New frame available")
elif event_type == 'camera_disconnected':
print("Camera disconnected!")
elif event_type == 'camera_connected':
print("Camera connected")
# Connect and subscribe
ws_client = RotorDreamWebSocketClient(WS_URL, token=API_TOKEN)
if ws_client.connect():
# Subscribe to events
ws_client.subscribe_to_state(on_state_update)
ws_client.subscribe_to_camera_events(on_camera_event)
# Get current state
ws_client.get_current_state()
# Set some parameters
ws_client.set_parameter('camera.resolution', '1920x1080')
ws_client.set_parameter('camera.framerate', 30)
# Keep running to receive events
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down...")
ws_client.disconnect()
else:
print("Failed to connect to WebSocket")
RTSP Video Streaming
OpenCV RTSP Client
import cv2
import numpy as np
import threading
import queue
class RTSPVideoStream:
def __init__(self, rtsp_url, buffer_size=1):
self.rtsp_url = rtsp_url
self.buffer_size = buffer_size
self.cap = None
self.frame_queue = queue.Queue(maxsize=buffer_size)
self.running = False
self.thread = None
def start(self):
"""Start video capture thread"""
self.cap = cv2.VideoCapture(self.rtsp_url)
# Configure capture
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size)
self.cap.set(cv2.CAP_PROP_FPS, 30)
if not self.cap.isOpened():
raise Exception(f"Failed to open RTSP stream: {self.rtsp_url}")
self.running = True
self.thread = threading.Thread(target=self._capture_frames)
self.thread.daemon = True
self.thread.start()
return True
def stop(self):
"""Stop video capture"""
self.running = False
if self.thread:
self.thread.join()
if self.cap:
self.cap.release()
def _capture_frames(self):
"""Capture frames in background thread"""
while self.running:
ret, frame = self.cap.read()
if not ret:
print("Failed to capture frame")
break
# Add to queue, dropping old frames if full
try:
self.frame_queue.put_nowait(frame)
except queue.Full:
try:
self.frame_queue.get_nowait() # Remove old frame
self.frame_queue.put_nowait(frame) # Add new frame
except queue.Empty:
pass
def get_frame(self, timeout=1.0):
"""Get latest frame"""
try:
return self.frame_queue.get(timeout=timeout)
except queue.Empty:
return None
def get_properties(self):
"""Get stream properties"""
if not self.cap:
return None
return {
'width': int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
'height': int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
'fps': self.cap.get(cv2.CAP_PROP_FPS),
'fourcc': int(self.cap.get(cv2.CAP_PROP_FOURCC))
}
# Usage example
def rtsp_stream_example():
"""Example of RTSP stream processing"""
stream = RTSPVideoStream(RTSP_URL)
try:
if stream.start():
props = stream.get_properties()
print(f"Stream properties: {props}")
frame_count = 0
start_time = time.time()
while True:
frame = stream.get_frame()
if frame is None:
continue
frame_count += 1
# Process frame (example: edge detection)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 150)
# Display frames
cv2.imshow('Original', frame)
cv2.imshow('Edges', edges)
# Calculate FPS
if frame_count % 30 == 0:
elapsed = time.time() - start_time
fps = frame_count / elapsed
print(f"Processing FPS: {fps:.2f}")
# Press 'q' to quit
if cv2.waitKey(1) & 0xFF == ord('q'):
break
except Exception as e:
print(f"RTSP stream error: {e}")
finally:
stream.stop()
cv2.destroyAllWindows()
# Run the example
if __name__ == "__main__":
rtsp_stream_example()
Advanced Integration Examples
Automated Monitoring System
import schedule
import smtplib
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
class CameraMonitoringSystem:
def __init__(self, client, ws_client=None):
self.client = client
self.ws_client = ws_client
self.alerts_enabled = True
self.email_config = {
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'username': 'alerts@company.com',
'password': 'app_password',
'recipients': ['admin@company.com']
}
def check_system_health(self):
"""Check system health and send alerts if needed"""
try:
info = self.client.get_info()
temp = info['system']['temperature']
cpu = info['system']['cpu_usage']
disk_free = info['system']['disk']['available']
# Check temperature
if temp > 70:
self.send_alert(f"High temperature: {temp}°C", include_image=True)
# Check CPU usage
if cpu > 90:
self.send_alert(f"High CPU usage: {cpu}%")
# Check disk space (less than 1GB)
if disk_free < 1024 * 1024 * 1024:
self.send_alert(f"Low disk space: {disk_free / (1024**3):.1f}GB remaining")
print(f"Health check: Temp={temp}°C, CPU={cpu}%, Disk={disk_free/(1024**3):.1f}GB")
except Exception as e:
self.send_alert(f"Health check failed: {e}")
def capture_and_analyze(self):
"""Capture image and perform basic analysis"""
try:
# Capture image
image_data = self.client.capture_image(quality=95)
# Convert to OpenCV format
nparr = np.frombuffer(image_data, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
# Basic analysis (brightness, motion detection, etc.)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
brightness = np.mean(gray)
# Check if image is too dark (possible camera issue)
if brightness < 10:
self.send_alert("Camera image too dark - possible hardware issue",
include_image=True)
# Save timestamped image
timestamp = int(time.time())
filename = f"captures/capture_{timestamp}.jpg"
cv2.imwrite(filename, frame)
return {
'brightness': brightness,
'filename': filename,
'timestamp': timestamp
}
except Exception as e:
print(f"Capture analysis failed: {e}")
return None
def send_alert(self, message, include_image=False):
"""Send email alert"""
if not self.alerts_enabled:
return
try:
msg = MIMEMultipart()
msg['From'] = self.email_config['username']
msg['To'] = ', '.join(self.email_config['recipients'])
msg['Subject'] = f"Camera Alert: {message}"
# Add text
body = f"""
Camera Alert Notification
Message: {message}
Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}
Device IP: {CAMERA_IP}
Please check the camera system.
"""
msg.attach(MIMEText(body, 'plain'))
# Attach image if requested
if include_image:
try:
image_data = self.client.capture_image()
img = MIMEImage(image_data)
img.add_header('Content-Disposition', 'attachment', filename='alert_image.jpg')
msg.attach(img)
except:
pass # Continue without image if capture fails
# Send email
server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
server.starttls()
server.login(self.email_config['username'], self.email_config['password'])
server.send_message(msg)
server.quit()
print(f"Alert sent: {message}")
except Exception as e:
print(f"Failed to send alert: {e}")
def run_monitoring(self):
"""Start monitoring system"""
# Schedule regular checks
schedule.every(5).minutes.do(self.check_system_health)
schedule.every(1).hours.do(self.capture_and_analyze)
print("Starting monitoring system...")
while True:
schedule.run_pending()
time.sleep(60)
# Initialize monitoring
client = RotorDreamClient(BASE_URL, token=API_TOKEN)
monitor = CameraMonitoringSystem(client)
# Start monitoring (run in background)
# monitor.run_monitoring()
Time-lapse Creation
def create_timelapse(duration_hours=24, interval_seconds=300, output_file="timelapse.mp4"):
"""Create time-lapse video from captured frames"""
client = RotorDreamClient(BASE_URL, token=API_TOKEN)
frames_dir = "timelapse_frames"
os.makedirs(frames_dir, exist_ok=True)
total_frames = int((duration_hours * 3600) / interval_seconds)
frame_count = 0
print(f"Creating time-lapse: {total_frames} frames over {duration_hours} hours")
try:
for i in range(total_frames):
# Capture frame
image_data = client.capture_image(quality=90)
# Save frame
frame_path = os.path.join(frames_dir, f"frame_{i:06d}.jpg")
with open(frame_path, "wb") as f:
f.write(image_data)
frame_count += 1
print(f"Captured frame {frame_count}/{total_frames}")
# Wait for next interval
if i < total_frames - 1:
time.sleep(interval_seconds)
# Create video from frames using ffmpeg
cmd = [
"ffmpeg", "-y",
"-framerate", "30",
"-i", os.path.join(frames_dir, "frame_%06d.jpg"),
"-c:v", "libx264",
"-pix_fmt", "yuv420p",
"-crf", "23",
output_file
]
import subprocess
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"Time-lapse created: {output_file}")
else:
print(f"FFmpeg error: {result.stderr}")
except KeyboardInterrupt:
print(f"Time-lapse interrupted. Created {frame_count} frames.")
except Exception as e:
print(f"Time-lapse creation failed: {e}")
# Create 24-hour time-lapse with 5-minute intervals
# create_timelapse(duration_hours=24, interval_seconds=300)
Error Handling and Debugging
Comprehensive Error Handling
import logging
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('camera_client.log'),
logging.StreamHandler()
]
)
class RobustRotorDreamClient(RotorDreamClient):
def __init__(self, base_url, token=None, username=None, password=None):
super().__init__(base_url, token, username, password)
# Configure retry strategy
retry_strategy = Retry(
total=3,
status_forcelist=[429, 500, 502, 503, 504],
method_whitelist=["HEAD", "GET", "OPTIONS"],
backoff_factor=1
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
self.logger = logging.getLogger(self.__class__.__name__)
def _request(self, method, endpoint, **kwargs):
"""Enhanced request with better error handling"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
self.logger.debug(f"Making {method} request to {url}")
response = self.session.request(method, url, timeout=30, **kwargs)
response.raise_for_status()
if 'application/json' in response.headers.get('content-type', ''):
return response.json()
return response.content
except requests.exceptions.Timeout:
self.logger.error(f"Request timeout for {url}")
raise
except requests.exceptions.ConnectionError:
self.logger.error(f"Connection error for {url}")
raise
except requests.exceptions.HTTPError as e:
self.logger.error(f"HTTP error {e.response.status_code} for {url}")
if e.response.status_code == 401:
self.logger.error("Authentication failed - check credentials")
elif e.response.status_code == 403:
self.logger.error("Access forbidden - check permissions")
elif e.response.status_code == 404:
self.logger.error("Endpoint not found")
raise
except Exception as e:
self.logger.error(f"Unexpected error for {url}: {e}")
raise
# Test connection and diagnostics
def test_connection():
"""Test connection and diagnose issues"""
client = RobustRotorDreamClient(BASE_URL, token=API_TOKEN)
# Test basic connectivity
try:
response = requests.get(f"{BASE_URL}/health", timeout=5)
print(f"Health check: {response.status_code}")
except Exception as e:
print(f"Health check failed: {e}")
return False
# Test authentication
try:
info = client.get_info()
print(f"Authentication successful. Device version: {info['version']}")
except Exception as e:
print(f"Authentication failed: {e}")
return False
# Test camera
try:
status = client.get_camera_status()
print(f"Camera status: {status}")
except Exception as e:
print(f"Camera check failed: {e}")
return True
if __name__ == "__main__":
print("Testing camera connection...")
if test_connection():
print("✓ Connection test passed")
else:
print("✗ Connection test failed")
Related Documentation
- JavaScript Client - Web browser integration
- C# Client - .NET application integration
- HTTP API - Complete REST API reference
- WebSocket API - Real-time communication protocol
Python examples tested with Python 3.8+ and libraries: requests 2.28+, opencv-python 4.6+, websocket-client 1.4+