使用網路與搖桿、手機控制
Last updated
Last updated
利用Python的socket,建立自走車伺服器,接收來自前端的命令。
# 自走車伺服器端程式碼
import socket
#簡易口訣:伺建結聽收
BUF_SIZE = 1024 #設定緩衝區大小
server_addr = ('192.168.0.104', 8088) #自走車ip, port
#建立socket物件時,可不傳入參數,以預設值(socket.AF_INET, socket.SOCK_STREAM)
#表示使用ipv4 TCP協定
server = socket.socket() #建立socket物件
server.bind(server_addr) #繫結
print('socket與位址繫結完成..')
server.listen(5) #監聽, 最大監聽數為5
print('開始等待前端連線中..')
client, client_addr = server.accept() #接收TCP連線, 傳回前端連線物件和位址
print(f'前端位址為:{client_addr}')
while True :
command = client.recv(BUF_SIZE).decode('utf-8') #從客戶端接收資料
if command == 'W':
print('向前')
elif command == 'X':
print('後退')
elif command == 'A':
print('左轉')
elif command == 'D':
print('右轉')
elif command == 'S':
print('停止')
elif command == 'E':
print('結束連線!')
break
server.close()
# 前端(控制端)程式碼
from getkey import getkey, keys
import socket
#簡易口訣:前建連送收
car_addr = ('192.168.0.104', 8088) #遠端自走車的 ip, port
client = socket.socket()
client.connect(car_addr) #連線
while True:
command = getkey()
if command == keys.UP or command == 'W' or command == 'w':
command = 'W'
elif command == keys.DOWN or command == 'X' or command == 'x':
command = 'X'
elif command == keys.LEFT or command == 'A' or command == 'a':
command = 'A'
elif command == keys.RIGHT or command == 'D' or command == 'd':
command = 'D'
elif command == keys.SPACE or command == 'S' or command == 's':
command = 'S'
elif command == 'E' or command == 'e':
command = 'E'
elif command == keys.ESCAPE:
break
client.send(command.encode('utf-8')) #傳送命令到伺服器
client.close()
使用鍵盤按鍵,將車子行動指令透過網路傳到遙控車上。
遙控車使用自製的Motor以及Car類別來處理馬達運作事宜。
# 前端控制程式
from getkey import getkey, keys
import socket
#簡易口訣:前建連送收
car_addr = ('192.168.0.104', 8088) #遠端自走車的 ip, port
client = socket.socket()
client.connect(car_addr) #連線
while True:
command = getkey()
if command == keys.UP or command == 'W' or command == 'w':
command = 'W'
elif command == keys.DOWN or command == 'X' or command == 'x':
command = 'X'
elif command == keys.LEFT or command == 'A' or command == 'a':
command = 'A'
elif command == keys.RIGHT or command == 'D' or command == 'd':
command = 'D'
elif command == keys.SPACE or command == 'S' or command == 's':
command = 'S'
elif command == 'L' or command == 'l':
command = 'L'
elif command == 'H' or command == 'h':
command = 'H'
elif command == 'E' or command == 'e':
command = 'E'
elif command == 'T' or command == 't':
command = 'T'
elif command == 'Y' or command == 'y':
command = 'Y'
elif command == keys.ESCAPE:
break
client.send(command.encode('utf-8')) #傳送命令到伺服器
client.close()
# -*- coding: UTF-8 -*-
# 自走車馬達類別
import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
class Motor:
def __init__(self, en, in1, in2):
self._en = en
self._in1 = in1
self._in2 = in2
GPIO.setup(self._en, GPIO.OUT, initial=GPIO.LOW)
# 利用PWN來改變車子馬達轉速(改變車子速度),初始值為全速100
self._speed = 100
self._pwm_speed = GPIO.PWM(self._en, 600)
self._pwm_speed.start(0)
self._pwm_speed.ChangeDutyCycle(self._speed)
GPIO.setup(self._in1, GPIO.OUT, initial=GPIO.LOW)
GPIO.setup(self._in2, GPIO.OUT, initial=GPIO.LOW)
@property
def motor_speed(self):
return self._speed
@motor_speed.setter
def motor_speed(self, value):
self._speed = value
self._pwm_speed.ChangeDutyCycle(value)
def forward(self):
GPIO.output(self._en, True)
GPIO.output(self._in1, True)
GPIO.output(self._in2, False)
def backward(self):
GPIO.output(self._en, True)
GPIO.output(self._in1, False)
GPIO.output(self._in2, True)
def stop(self):
GPIO.output(self._en, False)
GPIO.output(self._in1, False)
GPIO.output(self._in2, False)
class Car:
def __init__(self, left, right):
if not isinstance(left, tuple) or not isinstance(right, tuple):
print('左和右馬達腳位,都必須使用tuples格式')
quit()
self._left_motor = Motor(left[0], left[1], left[2])
self._right_motor = Motor(right[0], right[1], right[2])
@property
def right_speed(self):
return self._right_motor.motor_speed
@right_speed.setter
def right_speed(self, value):
self._right_motor.motor_speed = value
@property
def left_speed(self):
return self._left_motor.motor_speed
@left_speed.setter
def left_speed(self, value):
self._left_motor.motor_speed = value
@property
def speed(self):
return (self.left_speed, self.right_speed)
@speed.setter
def speed(self, value):
if not isinstance(value, tuple):
print('使用(left, right)tuples格式')
quit()
self.right_speed = value[0]
self.left_speed = value[1]
def forward(self, speed=None):
if speed:
self.speed = speed
self._right_motor.forward()
self._left_motor.forward()
def backward(self, speed=None):
if speed:
self.speed = speed
self._right_motor.backward()
self._left_motor.backward()
def turn_left(self):
self._left_motor.backward()
self._right_motor.forward()
def turn_right(self):
self._left_motor.forward()
self._right_motor.backward()
def stop(self):
self._left_motor.stop()
self._right_motor.stop()
# 遙控車主程式
from car import Car
import socket
mycar = Car(left=(13, 19, 16), right=(20, 21, 26))
speed = 100
BUF_SIZE = 1024 #設定緩衝區大小
car_addr = ('192.168.0.104', 8088) #自走車IP位址
car_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #建立socket物件
car_server.bind(car_addr) #繫結
car_server.listen(5) #設定最大監聽數為5
print('等待行動中...')
client, client_addr = car_server.accept() #接收連線,傳回前端位址
print(f'前端位址為:{client_addr}')
while True :
command = client.recv(BUF_SIZE).decode('utf-8') #從前端接收行動命令
if command == 'W':
print('向前')
mycar.forward()
elif command == 'X':
print('後退')
mycar.backward()
elif command == 'A':
print('左轉')
mycar.turn_left()
elif command == 'D':
print('右轉')
mycar.turn_right()
elif command == 'S':
print('停止')
mycar.stop()
elif command == 'L':
speed -= 10
if speed <= 30:
speed = 30
mycar.speed = (speed, speed)
# print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
elif command == 'H':
speed += 10
if speed >= 100:
speed = 100
mycar.speed = (speed, speed)
# print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
elif command == 'T':
print('左行')
mycar.speed = (speed-20, speed)
elif command == 'Y':
print('右行')
mycar.speed = (speed, speed-20)
elif command == 'E':
mycar.stop()
break
print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
可以先用evtest了解基本的搖桿事件訊號,為了程式設計方便,採用pygame套件模組的joystick模組來完成搖桿的訊號接收與判斷。同時為了使樹莓派開機就可以使用,本次採用systemd的方式來完成工作。
參考網站:樹莓派讓程式開機自動執行的五種方法:https://www.dexterindustries.com/howto/run-a-program-on-your-raspberry-pi-at-startup/
# 前端搖桿(遊戲手把)的程式碼,可嘗試把註解拿掉,以便更了解搖桿的各種訊號
import pygame as pg
import socket
import pickle
from time import sleep
#簡易口訣:前建連送收
# car_addr = ('192.168.0.104', 8099) #遠端自走車的 ip, port
sleep(20)
client = socket.socket()
client.settimeout(10.0)
client.connect(('192.168.0.104', 8099)) #連線
# 左上方的上下左右按鈕回傳值為tuple
# hats = {'hatup':(0, 1), 'hatdown':(0, -1), 'hatleft':(-1, 0), 'hatright':(1, 0)}
hats = {'F':(0, 1), 'B':(0, -1), 'L':(-1, 0), 'R':(1, 0)}
# LeftSpeed(左速ls) , RightSpeed(右速rs) 預設為100全速
bs = 50 # 開始速度
sp = 30 # 初始最低速
cmd = list()
pg.init()
pg.joystick.init()
# 底下程式碼可以檢查所有的搖桿名稱
# joysticks = [pg.joystick.Joystick(x) for x in range(pg.joystick.get_count())]
# for js in joysticks:
# print(js.get_name())
js = pg.joystick.Joystick(0)
# print(f'joystick init = {js.get_init()}')
# print(f'joystick name = {js.get_name()}')
# print(f'number axer = {js.get_numaxes()}')
# print(f'number buttons = {js.get_numbuttons()}')
# print(f'number hats = {js.get_numhats()}')
def send_cmd(cmd):
print(f'command = {cmd}')
command = pickle.dumps(cmd)
client.send(command)
try:
while True:
events = pg.event.get()
for event in events:
if event.type == pg.JOYAXISMOTION:
# print(event.dict, event.instance_id, event.axis, event.value, js.get_axis(event.axis))
fsp = (lambda s : s if s >= 30 else 30)(int(-event.value * 100))
sp = fsp
if event.axis == 4 : # 右邊的搖捍軸,左右輪速度
cmd = ('lrs', fsp)
elif event.axis == 2 : # 左下方按鍵式搖捍軸,左輪速度
cmd = ('ls', fsp)
elif event.axis == 5 : # 右下方按鍵式搖捍軸,右輪速度
cmd = ('rs', fsp)
send_cmd(cmd)
elif event.type == pg.JOYHATMOTION and event.value != (0, 0):
#print(event.dict, event.instance_id, event.hat, event.value)
for key, value in hats.items():
if value == event.value:
cmd = (key, max(bs,sp))
send_cmd(cmd)
# elif event.type == pg.JOYBALLMOTION:
# print(event.dict, event.instance_id, event.ball, event.rel)
# elif event.type == pg.JOYBUTTONDOWN:
# print(event.dict, event.instance_id, event.button, 'pressed')
elif event.type == pg.JOYBUTTONUP:
#print(event.dict, event.instance_id, event.button, 'released')
if event.button == 0: # 按鈕A被放開
send_cmd(('S', 0))
except KeyboardInterrupt:
print("EXITING NOW")
js.quit()
pg.joystick.quit()
client.close()
#! /usr/bin/env python3
# 後端自走車伺服器的程式碼
from car import Car
import RPi.GPIO as gpio
import socket
import pickle
from time import sleep
#自走車上的LED燈
leds = (10, 9, 25)
gpio.setmode(gpio.BCM)
gpio.setwarnings(False)
#先把LED全部關掉
for led in leds:
gpio.setup(led, gpio.OUT, initial=gpio.HIGH)
gpio.output(leds[0],False)
mycar = Car(left=(20, 21, 26), right=(13, 19, 16))
BUF_SIZE = 1024 #設定緩衝區大小
# car_addr = ('192.168.0.104', 8099) #自走車IP位址
# car_host = socket.gethostname()
car_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #建立socket物件
car_server.bind(('', 8099)) #繫結
gpio.output(leds[1],False)
car_server.listen(5) #設定最大監聽數為5
print('等待行動中...')
client, client_addr = car_server.accept() #接收連線,傳回前端位址
gpio.output(leds[2],False)
print(f'前端位址為:{client_addr}')
try:
while True :
cmd = pickle.loads(client.recv(BUF_SIZE)) #從前端接收行動命令
print(f'read command = {cmd}')
command = cmd[0]
sp = cmd[1]
if command == 'F':
print('向前')
mycar.forward((sp, sp))
elif command == 'B':
print('後退')
mycar.backward((sp, sp))
elif command == 'L':
print('左轉')
mycar.turn_left()
elif command == 'R':
print('右轉')
mycar.turn_right()
elif command == 'lrs':
mycar.speed = (sp, sp)
print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
elif command == 'rs':
mycar.right_speed = sp
print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
elif command == 'ls':
mycar.left_speed = sp
print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
elif command == 'S':
mycar.stop()
except:
client.close()
gpio.cleanup()
前端 /lib/systemd/system/client_car.service
[Unit]
Description=Client Car Service
Wants=network-online.target
After=network-online.target
[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi/raspi_gpio/JOYSTICK
ExecStart=/usr/bin/python3 /home/pi/raspi_gpio/JOYSTICK/js_test.py > /dev/null 2>&1
[Install]
WantedBy=multi-user.target
後端 /lib/systemd/system/jscar.service
[Unit]
Description=My JoystickCar Service
Wants=network-online.target
After=network-online.target
[Service]
Type=simple
User=pi
WorkingDirectory=/home/pi/socket_car
ExecStart=/usr/bin/python3 /home/pi/socket_car/joystickcar.py > /dev/null 2>&1
[Install]
WantedBy=multi-user.target
指令參考:
sudo systemctl daemon-reload
sudo systemctl enable XXXXX.service
sudo systemctl status XXXXX.service
透過搖捍按鈕來進行連接動作,並且嘗試五次,可避免一定要先開好自走車伺服器的問題。
import pygame as pg
import socket
import pickle
from time import sleep
import sys
#簡易口訣:前建連送收
# car_addr = ('192.168.0.104', 8099) #遠端自走車的 ip, port
# sleep(20)
client = None
con_times = 1
check_connect = True
# 左上方的上下左右按鈕回傳值為tuple
# hats = {'hatup':(0, 1), 'hatdown':(0, -1), 'hatleft':(-1, 0), 'hatright':(1, 0)}
hats = {'F':(0, 1), 'B':(0, -1), 'L':(-1, 0), 'R':(1, 0)}
# LeftSpeed(左速ls) , RightSpeed(右速rs) 預設為100全速
bs = 50 # 開始速度
sp = 30 # 初始最低速
cmd = list()
pg.init()
pg.joystick.init()
# 底下程式碼可以檢查所有的搖桿名稱
# joysticks = [pg.joystick.Joystick(x) for x in range(pg.joystick.get_count())]
# for js in joysticks:
# print(js.get_name())
js = pg.joystick.Joystick(0)
# print(f'joystick init = {js.get_init()}')
# print(f'joystick name = {js.get_name()}')
# print(f'number axer = {js.get_numaxes()}')
# print(f'number buttons = {js.get_numbuttons()}')
# print(f'number hats = {js.get_numhats()}')
def send_cmd(cmd):
if client:
print(f'command = {cmd}')
command = pickle.dumps(cmd)
client.send(command)
try:
while True:
events = pg.event.get()
for event in events:
if event.type == pg.JOYAXISMOTION:
# print(event.dict, event.instance_id, event.axis, event.value, js.get_axis(event.axis))
fsp = (lambda s : s if s >= 30 else 30)(int(-event.value * 100))
sp = fsp
if event.axis == 4 : # 右邊的搖捍軸,左右輪速度
cmd = ('lrs', fsp)
elif event.axis == 2 : # 左下方按鍵式搖捍軸,左輪速度
cmd = ('ls', fsp)
elif event.axis == 5 : # 右下方按鍵式搖捍軸,右輪速度
cmd = ('rs', fsp)
send_cmd(cmd)
elif event.type == pg.JOYHATMOTION and event.value != (0, 0):
#print(event.dict, event.instance_id, event.hat, event.value)
for key, value in hats.items():
if value == event.value:
cmd = (key, max(bs,sp))
send_cmd(cmd)
# elif event.type == pg.JOYBALLMOTION:
# print(event.dict, event.instance_id, event.ball, event.rel)
# elif event.type == pg.JOYBUTTONDOWN:
# print(event.dict, event.instance_id, event.button, 'pressed')
elif event.type == pg.JOYBUTTONUP:
print(event.dict, event.instance_id, event.button, 'released')
if event.button == 0: # 按鈕A被放開
send_cmd(('S', 0))
elif event.button == 1: # 按鈕B被放開
print('command is button B')
# 也可以將connect程式區塊放在這裡,也就是按下B鍵放開後,才開始連接自走車
while check_connect:
try:
client = socket.socket()
client.settimeout(5.0)
client.connect(('192.168.0.104', 8099)) #連線
check_connect = False
client.settimeout(None)
except OSError as msg:
sleep(3)
print(f'error={con_times} message={msg}')
con_times += 1
client.close()
if con_times > 5: # 嘗試連接次數
sys.exit(1)
except KeyboardInterrupt:
print("EXITING NOW")
js.quit()
pg.joystick.quit()
client.close()
在沒有網路的環境之下(如野外運動場),無法連接到自走車進行遙控,此時,將樹莓派打造成無線基地台,提供前端另一塊樹莓派彧是手機等行動裝置連接,就可以在「封閉式網路」遙控自走車了。
官方設定方式網站:https://www.raspberrypi.org/documentation/configuration/wireless/access-point-routed.md
利用app inventor(AI2)來快速開發手機應用程式。
ClientSocket extension 下載位置:https://community.appinventor.mit.edu/t/tcp-ip-extension/7142
#! /usr/bin/env python3
# 請注意,如果使用的檔名並非第三步驟的檔名,請配合一起修正system daemon service
from car import Car
import RPi.GPIO as gpio
import socket
import pickle
from time import sleep
#自走車上的LED燈
leds = (10, 9, 25)
gpio.setmode(gpio.BCM)
gpio.setwarnings(False)
#先把LED全部關掉
for led in leds:
gpio.setup(led, gpio.OUT, initial=gpio.HIGH)
gpio.output(leds[0],False)
mycar = Car(left=(20, 21, 26), right=(13, 19, 16))
BUF_SIZE = 1024 #設定緩衝區大小
# car_addr = ('192.168.0.104', 8099) #自走車IP位址
# car_host = socket.gethostname()
car_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #建立socket物件
car_server.bind(('', 8099)) #繫結
gpio.output(leds[1],False)
car_server.listen(5) #設定最大監聽數為5
print('等待行動中...')
client, client_addr = car_server.accept() #接收連線,傳回前端位址
gpio.output(leds[2],False)
print(f'前端位址為:{client_addr}')
# 初始速度
lsp = rsp = 100
try:
while True :
# cmd = pickle.loads(client.recv(BUF_SIZE)) #從前端接收行動命令
read_data = client.recv(BUF_SIZE).decode('utf-8')
print(f'read data = {read_data}')
command = read_data.split('-')[0]
print(f'command = {command}')
# 先決定車速
if command == 'LS':
lsp = int(float(read_data.split('-')[1]))
print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
print(f'lsp={lsp}')
elif command == 'RS':
rsp = int(float(read_data.split('-')[1]))
print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
print(f'rsp={rsp}')
mycar.speed=(lsp, rsp)
print(f'car speed = {mycar.speed}')
if command == 'F':
print('向前')
mycar.forward()
elif command == 'B':
print('後退')
mycar.backward()
elif command == 'L':
print('左轉')
mycar.turn_left()
elif command == 'R':
print('右轉')
mycar.turn_right()
elif command == 'S':
mycar.stop()
except:
client.close()
gpio.cleanup()
利用android手機的藍芽和樹莓派的藍芽連接,從而控制自走車的方向,也是重要課題。
AI2 手機程式下載:https://drive.google.com/file/d/1QNxFC-2z-FGC7GlmxEfjArjxbx4sd4Jt/view?usp=sharing
sudo nano /lib/systemd/system/bluetooth.service
[Unit]
Description=Bluetooth service
Documentation=man:bluetoothd(8)
ConditionPathIsDirectory=/sys/class/bluetooth
[Service]
Type=dbus
BusName=org.bluez
ExecStart=/usr/lib/bluetooth/bluetoothd -C
ExecStartPost=/usr/bin/sdptool add SP
NotifyAccess=main
#WatchdogSec=10
#Restart=on-failure
CapabilityBoundingSet=CAP_NET_ADMIN CAP_NET_BIND_SERVICE
LimitNPROC=1
ProtectHome=true
ProtectSystem=full
[Install]
WantedBy=bluetooth.target
Alias=dbus-org.bluez.service
sudo chmod 777 /var/run/sdp
sdptool browse local
sudo apt install libbluetooth-dev
pip3 install pybluez2
import RPi.GPIO as gpio
from car import Car
import bluetooth as bt
#自走車上的LED燈
leds = (10, 9, 25)
gpio.setmode(gpio.BCM)
gpio.setwarnings(False)
#先把LED全部關掉
for led in leds:
gpio.setup(led, gpio.OUT, initial=gpio.HIGH)
gpio.output(leds[0],False)
mycar = Car(left=(20, 21, 26), right=(13, 19, 16))
server_socket=bt.BluetoothSocket(bt.RFCOMM)
server_socket.bind(("", bt.PORT_ANY))
gpio.output(leds[1],False)
server_socket.listen(1)
print('等待行動中...')
# port = server_socket.getsockname()[1]
# print(f'port = {port}')
client_sock, address = server_socket.accept()
gpio.output(leds[2],False)
print(f'前端連接藍芽位址:{address}')
lsp = rsp = 100
try:
while True :
read_data = client_sock.recv(1024).decode('utf-8') #從前端接收行動命令
print(f'read data = {read_data}')
command = read_data.split('-')[0]
print(f'command = {command}')
# 先決定車速
if command == 'LS':
lsp = int(float(read_data.split('-')[1]))
print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
print(f'lsp={lsp}')
elif command == 'RS':
rsp = int(float(read_data.split('-')[1]))
print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
print(f'rsp={rsp}')
mycar.speed=(lsp, rsp)
print(f'car speed = {mycar.speed}')
if command == 'F':
print('向前')
mycar.forward()
elif command == 'B':
print('後退')
mycar.backward()
elif command == 'L':
print('左轉')
mycar.turn_left()
elif command == 'R':
print('右轉')
mycar.turn_right()
elif command == 'S':
print('停止')
mycar.stop()
except:
client_sock.close()
server_socket.close()
gpio.cleanup()