使用網路與搖桿、手機控制

一、利用socket傳送命令

利用Python的socket,建立自走車伺服器,接收來自前端的命令。

# 自走車伺服器端程式碼

import socket

#簡易口訣:伺建結聽收
BUF_SIZE = 1024  #設定緩衝區大小
server_addr = ('192.168.0.104', 8088)  #自走車ip, port
#建立socket物件時,可不傳入參數,以預設值(socket.AF_INET, socket.SOCK_STREAM)
#表示使用ipv4 TCP協定
server = socket.socket()  #建立socket物件
server.bind(server_addr)  #繫結
print('socket與位址繫結完成..')
server.listen(5)  #監聽, 最大監聽數為5
print('開始等待前端連線中..')
client, client_addr = server.accept()  #接收TCP連線, 傳回前端連線物件和位址
print(f'前端位址為:{client_addr}')

while True :
    command = client.recv(BUF_SIZE).decode('utf-8')  #從客戶端接收資料
    
    if command == 'W':
        print('向前')
    elif command == 'X':
        print('後退')
    elif command == 'A':
        print('左轉')
    elif command == 'D':
        print('右轉')
    elif command == 'S':
        print('停止')
    elif command == 'E':
        print('結束連線!')
        break

server.close()
# 前端(控制端)程式碼

from getkey import getkey, keys
import socket

#簡易口訣:前建連送收
car_addr = ('192.168.0.104', 8088)   #遠端自走車的 ip, port
client = socket.socket()
client.connect(car_addr)  #連線

while True:
    command = getkey()
    if command == keys.UP or command == 'W' or command == 'w':
        command = 'W'
    elif command == keys.DOWN or command == 'X' or command == 'x':
        command = 'X'
    elif command == keys.LEFT or command == 'A' or command == 'a':
        command = 'A'
    elif command == keys.RIGHT or command == 'D' or command == 'd':
        command = 'D'
    elif command == keys.SPACE or command == 'S' or command == 's':
        command = 'S'
    elif command == 'E' or command == 'e':
        command = 'E'
    elif command == keys.ESCAPE:
        break
    client.send(command.encode('utf-8'))  #傳送命令到伺服器

client.close()

二、使用鍵盤按鍵

使用鍵盤按鍵,將車子行動指令透過網路傳到遙控車上。

遙控車使用自製的Motor以及Car類別來處理馬達運作事宜。

# 前端控制程式
from getkey import getkey, keys
import socket

#簡易口訣:前建連送收
car_addr = ('192.168.0.104', 8088)   #遠端自走車的 ip, port
client = socket.socket()
client.connect(car_addr)  #連線

while True:
    command = getkey()
    if command == keys.UP or command == 'W' or command == 'w':
        command = 'W'
    elif command == keys.DOWN or command == 'X' or command == 'x':
        command = 'X'
    elif command == keys.LEFT or command == 'A' or command == 'a':
        command = 'A'
    elif command == keys.RIGHT or command == 'D' or command == 'd':
        command = 'D'
    elif command == keys.SPACE or command == 'S' or command == 's':
        command = 'S'
    elif command == 'L' or command == 'l':
        command = 'L'
    elif command == 'H' or command == 'h':
        command = 'H'
    elif command == 'E' or command == 'e':
        command = 'E'
    elif command == 'T' or command == 't':
        command = 'T'
    elif command == 'Y' or command == 'y':
        command = 'Y'        
    elif command == keys.ESCAPE:
        break
    client.send(command.encode('utf-8'))  #傳送命令到伺服器

client.close()
# -*- coding: UTF-8 -*-
# 自走車馬達類別
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

class Motor:
    def __init__(self, en, in1, in2):
        self._en = en
        self._in1 = in1
        self._in2 = in2
        GPIO.setup(self._en, GPIO.OUT, initial=GPIO.LOW)
        # 利用PWN來改變車子馬達轉速(改變車子速度),初始值為全速100
        self._speed = 100
        self._pwm_speed = GPIO.PWM(self._en, 600)
        self._pwm_speed.start(0)
        self._pwm_speed.ChangeDutyCycle(self._speed)
        GPIO.setup(self._in1, GPIO.OUT, initial=GPIO.LOW)
        GPIO.setup(self._in2, GPIO.OUT, initial=GPIO.LOW)
    
    @property
    def motor_speed(self):
        return self._speed
    
    @motor_speed.setter
    def motor_speed(self, value):
        self._speed = value
        self._pwm_speed.ChangeDutyCycle(value)
    
    def forward(self):
        GPIO.output(self._en, True)
        GPIO.output(self._in1, True)
        GPIO.output(self._in2, False)
    
    def backward(self):
        GPIO.output(self._en, True)
        GPIO.output(self._in1, False)
        GPIO.output(self._in2, True)
    
    def stop(self):
        GPIO.output(self._en, False)
        GPIO.output(self._in1, False)
        GPIO.output(self._in2, False)


class Car:
    def __init__(self, left, right):
        if not isinstance(left, tuple) or not isinstance(right, tuple):
            print('左和右馬達腳位,都必須使用tuples格式')
            quit()
        self._left_motor = Motor(left[0], left[1], left[2])
        self._right_motor = Motor(right[0], right[1], right[2])
    
    @property
    def right_speed(self):
        return self._right_motor.motor_speed

    @right_speed.setter
    def right_speed(self, value):
        self._right_motor.motor_speed = value

    @property
    def left_speed(self):
        return self._left_motor.motor_speed

    @left_speed.setter
    def left_speed(self, value):
        self._left_motor.motor_speed = value

    @property
    def speed(self):
        return (self.left_speed, self.right_speed)        

    @speed.setter
    def speed(self, value):
        if not isinstance(value, tuple):
            print('使用(left, right)tuples格式')
            quit()
        self.right_speed = value[0]
        self.left_speed = value[1]

    def forward(self, speed=None):
        if speed:
            self.speed = speed
        self._right_motor.forward()
        self._left_motor.forward()
    
    def backward(self, speed=None):
        if speed:
            self.speed = speed
        self._right_motor.backward()
        self._left_motor.backward()
    
    def turn_left(self):
        self._left_motor.backward()
        self._right_motor.forward()
    
    def turn_right(self):
        self._left_motor.forward()
        self._right_motor.backward()
    
    def stop(self):
        self._left_motor.stop()
        self._right_motor.stop()
# 遙控車主程式
from car import Car
import socket
mycar = Car(left=(13, 19, 16), right=(20, 21, 26))
speed = 100

BUF_SIZE = 1024  #設定緩衝區大小
car_addr = ('192.168.0.104', 8088)  #自走車IP位址
car_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  #建立socket物件
car_server.bind(car_addr)  #繫結
car_server.listen(5)  #設定最大監聽數為5
print('等待行動中...')
client, client_addr = car_server.accept()  #接收連線,傳回前端位址
print(f'前端位址為:{client_addr}')

while True :
    command = client.recv(BUF_SIZE).decode('utf-8')  #從前端接收行動命令
    if command == 'W':
        print('向前')
        mycar.forward()
    elif command == 'X':
        print('後退')
        mycar.backward()
    elif command == 'A':
        print('左轉')
        mycar.turn_left()
    elif command == 'D':
        print('右轉')
        mycar.turn_right()
    elif command == 'S':
        print('停止')
        mycar.stop()
    elif command == 'L':
        speed -= 10
        if speed <= 30:
            speed = 30
        mycar.speed = (speed, speed)
        # print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
    elif command == 'H':
        speed += 10
        if speed >= 100:
            speed = 100
        mycar.speed = (speed, speed)
        # print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
    elif command == 'T':
        print('左行')
        mycar.speed = (speed-20, speed)
    elif command == 'Y':
        print('右行')
        mycar.speed = (speed, speed-20)    
    elif command == 'E':
        mycar.stop()
        break
    print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')

三、使用搖桿(遊戲手把)

可以先用evtest了解基本的搖桿事件訊號,為了程式設計方便,採用pygame套件模組的joystick模組來完成搖桿的訊號接收與判斷。同時為了使樹莓派開機就可以使用,本次採用systemd的方式來完成工作。

參考網站:樹莓派讓程式開機自動執行的五種方法:https://www.dexterindustries.com/howto/run-a-program-on-your-raspberry-pi-at-startup/

# 前端搖桿(遊戲手把)的程式碼,可嘗試把註解拿掉,以便更了解搖桿的各種訊號
import pygame as pg
import socket
import pickle
from time import sleep

#簡易口訣:前建連送收
# car_addr = ('192.168.0.104', 8099)   #遠端自走車的 ip, port
sleep(20)
client = socket.socket()
client.settimeout(10.0)
client.connect(('192.168.0.104', 8099))  #連線

# 左上方的上下左右按鈕回傳值為tuple
# hats = {'hatup':(0, 1), 'hatdown':(0, -1), 'hatleft':(-1, 0), 'hatright':(1, 0)}
hats = {'F':(0, 1), 'B':(0, -1), 'L':(-1, 0), 'R':(1, 0)}
# LeftSpeed(左速ls) , RightSpeed(右速rs) 預設為100全速
bs = 50 # 開始速度
sp = 30 # 初始最低速
cmd = list()

pg.init()
pg.joystick.init()

# 底下程式碼可以檢查所有的搖桿名稱
# joysticks = [pg.joystick.Joystick(x) for x in range(pg.joystick.get_count())]
# for js in joysticks:
#     print(js.get_name())

js = pg.joystick.Joystick(0)

# print(f'joystick init = {js.get_init()}')
# print(f'joystick name = {js.get_name()}')
# print(f'number axer = {js.get_numaxes()}')
# print(f'number buttons = {js.get_numbuttons()}')
# print(f'number hats = {js.get_numhats()}')
def send_cmd(cmd):
    print(f'command = {cmd}')
    command = pickle.dumps(cmd)
    client.send(command)

try:
    while True:
        events = pg.event.get()
        for event in events:
            if event.type == pg.JOYAXISMOTION:
                # print(event.dict, event.instance_id, event.axis, event.value, js.get_axis(event.axis))
                fsp = (lambda s : s if s >= 30 else 30)(int(-event.value * 100))
                sp = fsp
                if event.axis == 4 : # 右邊的搖捍軸,左右輪速度
                    cmd = ('lrs', fsp)
                elif event.axis == 2 : # 左下方按鍵式搖捍軸,左輪速度
                    cmd = ('ls', fsp)
                elif event.axis == 5 : # 右下方按鍵式搖捍軸,右輪速度
                    cmd = ('rs', fsp)
                send_cmd(cmd)
            elif event.type == pg.JOYHATMOTION and event.value != (0, 0):
                #print(event.dict, event.instance_id, event.hat, event.value)
                for key, value in hats.items():
                    if value == event.value:
                        cmd = (key, max(bs,sp))
                send_cmd(cmd)
            # elif event.type == pg.JOYBALLMOTION:
            #     print(event.dict, event.instance_id, event.ball, event.rel)
            # elif event.type == pg.JOYBUTTONDOWN:
            #     print(event.dict, event.instance_id, event.button, 'pressed')
            elif event.type == pg.JOYBUTTONUP:
                #print(event.dict, event.instance_id, event.button, 'released')
                if event.button == 0: # 按鈕A被放開
                    send_cmd(('S', 0))
except KeyboardInterrupt:
    print("EXITING NOW")
    js.quit()
    pg.joystick.quit()
    client.close()
#! /usr/bin/env python3
# 後端自走車伺服器的程式碼
from car import Car
import RPi.GPIO as gpio
import socket
import pickle
from time import sleep

#自走車上的LED燈
leds = (10, 9, 25)
gpio.setmode(gpio.BCM)	
gpio.setwarnings(False)
#先把LED全部關掉
for led in leds:
    gpio.setup(led, gpio.OUT, initial=gpio.HIGH)
gpio.output(leds[0],False)

mycar = Car(left=(20, 21, 26), right=(13, 19, 16))

BUF_SIZE = 1024  #設定緩衝區大小
# car_addr = ('192.168.0.104', 8099)  #自走車IP位址
# car_host = socket.gethostname()
car_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  #建立socket物件
car_server.bind(('', 8099))  #繫結
gpio.output(leds[1],False)

car_server.listen(5)  #設定最大監聽數為5
print('等待行動中...')
client, client_addr = car_server.accept()  #接收連線,傳回前端位址
gpio.output(leds[2],False)
print(f'前端位址為:{client_addr}')

try:
    while True :
        cmd = pickle.loads(client.recv(BUF_SIZE))  #從前端接收行動命令
        print(f'read command = {cmd}')
        command = cmd[0]
        sp = cmd[1]
        if command == 'F':
            print('向前')
            mycar.forward((sp, sp))
        elif command == 'B':
            print('後退')
            mycar.backward((sp, sp))
        elif command == 'L':
            print('左轉')
            mycar.turn_left()
        elif command == 'R':
            print('右轉')
            mycar.turn_right()
        elif command == 'lrs':
            mycar.speed = (sp, sp)
            print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
        elif command == 'rs':
            mycar.right_speed = sp
            print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
        elif command == 'ls':
            mycar.left_speed = sp
            print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
        elif command == 'S':
            mycar.stop()
except:
    client.close()
    gpio.cleanup()

前端 /lib/systemd/system/client_car.service

 [Unit]
 Description=Client Car Service
 Wants=network-online.target
 After=network-online.target
 
 [Service]
 Type=simple
 User=pi
 WorkingDirectory=/home/pi/raspi_gpio/JOYSTICK
 ExecStart=/usr/bin/python3 /home/pi/raspi_gpio/JOYSTICK/js_test.py > /dev/null 2>&1

 [Install]
 WantedBy=multi-user.target

後端 /lib/systemd/system/jscar.service

[Unit]
 Description=My JoystickCar Service
 Wants=network-online.target
 After=network-online.target 

[Service]
 Type=simple
 User=pi
 WorkingDirectory=/home/pi/socket_car
 ExecStart=/usr/bin/python3 /home/pi/socket_car/joystickcar.py > /dev/null 2>&1

[Install]
 WantedBy=multi-user.target

指令參考:

sudo systemctl daemon-reload

sudo systemctl enable XXXXX.service

sudo systemctl status XXXXX.service

四、前端connect例外處理

透過搖捍按鈕來進行連接動作,並且嘗試五次,可避免一定要先開好自走車伺服器的問題。

import pygame as pg
import socket
import pickle
from time import sleep
import sys

#簡易口訣:前建連送收
# car_addr = ('192.168.0.104', 8099)   #遠端自走車的 ip, port
# sleep(20)
client = None
con_times = 1
check_connect = True

# 左上方的上下左右按鈕回傳值為tuple
# hats = {'hatup':(0, 1), 'hatdown':(0, -1), 'hatleft':(-1, 0), 'hatright':(1, 0)}
hats = {'F':(0, 1), 'B':(0, -1), 'L':(-1, 0), 'R':(1, 0)}
# LeftSpeed(左速ls) , RightSpeed(右速rs) 預設為100全速
bs = 50 # 開始速度
sp = 30 # 初始最低速
cmd = list()

pg.init()
pg.joystick.init()

# 底下程式碼可以檢查所有的搖桿名稱
# joysticks = [pg.joystick.Joystick(x) for x in range(pg.joystick.get_count())]
# for js in joysticks:
#     print(js.get_name())

js = pg.joystick.Joystick(0)

# print(f'joystick init = {js.get_init()}')
# print(f'joystick name = {js.get_name()}')
# print(f'number axer = {js.get_numaxes()}')
# print(f'number buttons = {js.get_numbuttons()}')
# print(f'number hats = {js.get_numhats()}')
def send_cmd(cmd):
    if client:
        print(f'command = {cmd}')
        command = pickle.dumps(cmd)
        client.send(command)

try:
    while True:
        events = pg.event.get()
        for event in events:
            if event.type == pg.JOYAXISMOTION:
                # print(event.dict, event.instance_id, event.axis, event.value, js.get_axis(event.axis))
                fsp = (lambda s : s if s >= 30 else 30)(int(-event.value * 100))
                sp = fsp
                if event.axis == 4 : # 右邊的搖捍軸,左右輪速度
                    cmd = ('lrs', fsp)
                elif event.axis == 2 : # 左下方按鍵式搖捍軸,左輪速度
                    cmd = ('ls', fsp)
                elif event.axis == 5 : # 右下方按鍵式搖捍軸,右輪速度
                    cmd = ('rs', fsp)
                send_cmd(cmd)
            elif event.type == pg.JOYHATMOTION and event.value != (0, 0):
                #print(event.dict, event.instance_id, event.hat, event.value)
                for key, value in hats.items():
                    if value == event.value:
                        cmd = (key, max(bs,sp))
                send_cmd(cmd)
            # elif event.type == pg.JOYBALLMOTION:
            #     print(event.dict, event.instance_id, event.ball, event.rel)
            # elif event.type == pg.JOYBUTTONDOWN:
            #     print(event.dict, event.instance_id, event.button, 'pressed')
            elif event.type == pg.JOYBUTTONUP:
                print(event.dict, event.instance_id, event.button, 'released')
                if event.button == 0: # 按鈕A被放開
                    send_cmd(('S', 0))
                elif event.button == 1: # 按鈕B被放開
                    print('command is button B')
                    # 也可以將connect程式區塊放在這裡,也就是按下B鍵放開後,才開始連接自走車
                    while check_connect:
                        try:
                            client = socket.socket()
                            client.settimeout(5.0)
                            client.connect(('192.168.0.104', 8099))  #連線
                            check_connect = False
                            client.settimeout(None)
                        except OSError as msg:
                            sleep(3)
                            print(f'error={con_times} message={msg}')
                            con_times += 1
                            client.close()
                            if con_times > 5:    # 嘗試連接次數
                                sys.exit(1)
except KeyboardInterrupt:
    print("EXITING NOW")
    js.quit()
    pg.joystick.quit()
    client.close()

五、打造自走車無線基地台

在沒有網路的環境之下(如野外運動場),無法連接到自走車進行遙控,此時,將樹莓派打造成無線基地台,提供前端另一塊樹莓派彧是手機等行動裝置連接,就可以在「封閉式網路」遙控自走車了。

官方設定方式網站:https://www.raspberrypi.org/documentation/configuration/wireless/access-point-routed.md

六、使用手機控制

利用app inventor(AI2)來快速開發手機應用程式。

ClientSocket extension 下載位置:https://community.appinventor.mit.edu/t/tcp-ip-extension/7142

#! /usr/bin/env python3
# 請注意,如果使用的檔名並非第三步驟的檔名,請配合一起修正system daemon service
from car import Car
import RPi.GPIO as gpio
import socket
import pickle
from time import sleep

#自走車上的LED燈
leds = (10, 9, 25)
gpio.setmode(gpio.BCM)	
gpio.setwarnings(False)
#先把LED全部關掉
for led in leds:
    gpio.setup(led, gpio.OUT, initial=gpio.HIGH)
gpio.output(leds[0],False)

mycar = Car(left=(20, 21, 26), right=(13, 19, 16))

BUF_SIZE = 1024  #設定緩衝區大小
# car_addr = ('192.168.0.104', 8099)  #自走車IP位址
# car_host = socket.gethostname()
car_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  #建立socket物件
car_server.bind(('', 8099))  #繫結
gpio.output(leds[1],False)

car_server.listen(5)  #設定最大監聽數為5
print('等待行動中...')
client, client_addr = car_server.accept()  #接收連線,傳回前端位址
gpio.output(leds[2],False)
print(f'前端位址為:{client_addr}')

# 初始速度
lsp = rsp = 100
try:
    while True :
        # cmd = pickle.loads(client.recv(BUF_SIZE))  #從前端接收行動命令
        read_data = client.recv(BUF_SIZE).decode('utf-8')
        print(f'read data = {read_data}')
        command = read_data.split('-')[0]
        print(f'command = {command}')
        # 先決定車速
        if command == 'LS':
            lsp = int(float(read_data.split('-')[1]))
            print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
            print(f'lsp={lsp}')
        elif command == 'RS':
            rsp = int(float(read_data.split('-')[1]))
            print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
            print(f'rsp={rsp}')
        
        mycar.speed=(lsp, rsp)
        print(f'car speed = {mycar.speed}')
        
        if command == 'F':
            print('向前')
            mycar.forward()
        elif command == 'B':
            print('後退')
            mycar.backward()
        elif command == 'L':
            print('左轉')
            mycar.turn_left()
        elif command == 'R':
            print('右轉')
            mycar.turn_right()
        elif command == 'S':
            mycar.stop()
except:
    client.close()
    gpio.cleanup()

七、番外篇:藍芽

利用android手機的藍芽和樹莓派的藍芽連接,從而控制自走車的方向,也是重要課題。

AI2 手機程式下載:https://drive.google.com/file/d/1QNxFC-2z-FGC7GlmxEfjArjxbx4sd4Jt/view?usp=sharing

sudo nano /lib/systemd/system/bluetooth.service
[Unit]
Description=Bluetooth service
Documentation=man:bluetoothd(8)
ConditionPathIsDirectory=/sys/class/bluetooth

[Service]
Type=dbus
BusName=org.bluez
ExecStart=/usr/lib/bluetooth/bluetoothd -C
ExecStartPost=/usr/bin/sdptool add SP
NotifyAccess=main
#WatchdogSec=10
#Restart=on-failure
CapabilityBoundingSet=CAP_NET_ADMIN CAP_NET_BIND_SERVICE
LimitNPROC=1
ProtectHome=true
ProtectSystem=full

[Install]
WantedBy=bluetooth.target
Alias=dbus-org.bluez.service
sudo chmod 777 /var/run/sdp
sdptool browse local
sudo apt install libbluetooth-dev
pip3 install pybluez2
import RPi.GPIO as gpio
from car import Car
import bluetooth as bt

#自走車上的LED燈
leds = (10, 9, 25)
gpio.setmode(gpio.BCM)	
gpio.setwarnings(False)
#先把LED全部關掉
for led in leds:
    gpio.setup(led, gpio.OUT, initial=gpio.HIGH)
gpio.output(leds[0],False)

mycar = Car(left=(20, 21, 26), right=(13, 19, 16))

server_socket=bt.BluetoothSocket(bt.RFCOMM)
server_socket.bind(("", bt.PORT_ANY))
gpio.output(leds[1],False)

server_socket.listen(1)
print('等待行動中...')
# port = server_socket.getsockname()[1]
# print(f'port = {port}')
client_sock, address = server_socket.accept()
gpio.output(leds[2],False)
print(f'前端連接藍芽位址:{address}')

lsp = rsp = 100
try:
    while True :
        read_data = client_sock.recv(1024).decode('utf-8')  #從前端接收行動命令
        print(f'read data = {read_data}')
        command = read_data.split('-')[0]
        print(f'command = {command}')
        # 先決定車速
        if command == 'LS':
            lsp = int(float(read_data.split('-')[1]))
            print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
            print(f'lsp={lsp}')
        elif command == 'RS':
            rsp = int(float(read_data.split('-')[1]))
            print(f'目前速度為:左={mycar.speed[0]},右={mycar.speed[1]}')
            print(f'rsp={rsp}')
        
        mycar.speed=(lsp, rsp)
        print(f'car speed = {mycar.speed}')
        
        if command == 'F':
            print('向前')
            mycar.forward()
        elif command == 'B':
            print('後退')
            mycar.backward()
        elif command == 'L':
            print('左轉')
            mycar.turn_left()
        elif command == 'R':
            print('右轉')
            mycar.turn_right()
        elif command == 'S':
            print('停止')
            mycar.stop()
except:
    client_sock.close()
    server_socket.close()
    gpio.cleanup()

Last updated