CerveauPy2026/Messagerie.py
2026-04-22 18:11:26 +02:00

187 lines
6.9 KiB
Python

import struct
import serial
import socket
from time import sleep
from time import time
import socket
def send_msg(message : bytes):
message_binaire = struct.pack('BBB{}sB'.format(message.__len__()), 0xFF, 0xFF, message.__len__() + 1, message, 0x00)
print(message_binaire)
ser.write(message_binaire)
def msg_envoi_donnees(carte_id: bytes, registre : int, charge_utile : bytes):
send_msg(struct.pack('ccBB{}s'.format(charge_utile.__len__()), b'r', carte_id, registre, charge_utile.__len__(), charge_utile))
def msg_demande_donnees(carte_id: bytes, registre : int, charge_utile : bytes):
pass
def led_on():
msg_envoi_donnees(b'D', 0x00, b'\x06')
def led_off():
msg_envoi_donnees(b'D', 0x00, b'\x36')
def lecture_pos():
ser.write(b'\xFF\xFF\x05dP\x00\x0c\x00')
nb_read = ser.in_waiting
data = ser.read(nb_read)
print("nb_read={}".format(nb_read))
list_message = data.split(b"\xff\xffP")
print("nb_read={}, list_message={}".format(nb_read, len(list_message)))
if len(list_message)>1:
if len(list_message[1])>12:
int_value = struct.unpack('fff',list_message[1][1:13])
return (int_value[0],int_value[1],int_value[2])
def lecture_abscisse():
ser.write(b'\xFF\xFF\x05dP\x8C\x04\x00')
while ser.in_waiting == 0:
pass
data = ser.read(ser.in_waiting)
list_message = data.split(b"\xff\xffP")
if len(list_message)>1:
if len(list_message[1])>4:
int_value = struct.unpack('f',list_message[1][1:5])
return int_value[0]
def lecture_pos_consigne():
ser.write(b'\xFF\xFF\x05dP\x00\x08\x00')
while ser.in_waiting == 0:
pass
data = ser.read(ser.in_waiting)
list_message = data.split(b"\xff\xffP")
if len(list_message)>1:
if len(list_message[1])>12:
int_value = struct.unpack('ff',list_message[1][1:9])
return (int_value[0],int_value[1])
def lecture_propulsion():
ser.write(b'\xFF\xFF\x05dP\x80\x21\x00')
nb_read = ser.in_waiting
data = ser.read(nb_read)
# print("nb_read={}".format(nb_read))
list_message = data.split(b"\xff\xffP")
# print("nb_read={}, list_message={}".format(nb_read, len(list_message)))
if len(list_message)>1:
if len(list_message[1])>33:
struct_value = struct.unpack('ffffffffB',list_message[1][1:34])
return (struct_value[0],struct_value[1],struct_value[2], struct_value[3], struct_value[4], struct_value[5], struct_value[6], struct_value[7], struct_value[8])
def envoie_trajectoire(traj):
msg_envoi_donnees(b'P', 0x22, traj)
def envoie_cde_PWM(pwn_gauche, pwm_droit):
commande_PWM = struct.pack('=Bhh',1, pwn_gauche, pwm_droit)
msg_envoi_donnees(b'P', 0x0D, commande_PWM)
def envoie_cde_vitesse_moteur(moteur_gauche_mm_s, moteur_droit_mm_s):
commande_PWM = struct.pack('=Bhhff', 2, 0, 0, moteur_gauche_mm_s, moteur_droit_mm_s)
msg_envoi_donnees(b'P', 0x0D, commande_PWM)
def envoie_cde_set_position(x_mm, y_mm, z_rad):
commande_set_pos = struct.pack('fff', x_mm, y_mm, z_rad)
msg_envoi_donnees(b'P', 0x00, commande_set_pos)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def log_variable(nom_variable, valeur):
sock.sendto(nom_variable.encode() + ":".encode() + str(valeur).encode(), ("localhost", 47269))
def auto_test():
envoie_cde_vitesse_moteur(100, 0)
sleep(3)
envoie_cde_vitesse_moteur(0, 100)
sleep(3)
envoie_cde_vitesse_moteur(100, 100)
sleep(3)
envoie_cde_PWM(0, 0)
if __name__ == "__main__":
# sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
# sock.sendto(bytes(">c:1", "utf-8"), ("127.0.0.1", 47269))
ser = serial.Serial('/dev/ttyACM0' , 115200)
traj1 = struct.pack('bffff', 0, 0.0, 0.0, 100. , 0.)
traj2 = struct.pack('bffff', 0, 100.0, 0.0, 200. , 0.)
traj_actuelle = traj1
# traj = struct.pack('bffffffff', 2,
# 0.0, 0.0,
# 50. , 0.,
# 150. , 200.,
# 200. , 200.,)
# msg_envoi_donnees(b'P', 0x22, traj)
envoie_cde_set_position(0, 0, 0)
# envoie_trajectoire(traj_actuelle)
# envoie_cde_set_position(0, 0, 0)
# envoie_cde_PWM(8000, 8000)
# envoie_cde_vitesse_moteur(100, 100)
auto_test()
# exit()
print("carte\ttemps\tx\ty\torientation\tabscisse\tx_consigne\ty_consigne\tvit_roue_gauche\tvit_roue_droite\tetat_trajet")
while True:
pos = None
données_propulsion= None
données_propulsion = lecture_propulsion()
#abscisse = lecture_abscisse()
#pos_consigne = lecture_pos_consigne()
# if pos is not None and abscisse is not None and pos_consigne is not None :
# print(">pos:{:.2f}:{:.2f}:{:.2f}:{:.2f}:{:.2f}".
# format(pos[0],pos[1], abscisse, pos_consigne[0], pos_consigne[1] ))
if données_propulsion != None :
x, y = données_propulsion[0],données_propulsion[1]
orientation = données_propulsion[2]
abscisse = données_propulsion[3]
x_consigne, y_consigne = données_propulsion[4],données_propulsion[5]
vit_roue_gauche, vit_roue_droite = données_propulsion[6], données_propulsion[7]
etat_trajet = données_propulsion[8]
log_variable("x_pos", x)
log_variable("y_pos", y)
log_variable("Orientation", orientation)
log_variable("abscisse", abscisse)
log_variable("x_consigne", x_consigne)
log_variable("y_consigne", y_consigne)
log_variable("vit_roue_gauche", vit_roue_gauche)
log_variable("vit_roue_droite", vit_roue_droite)
log_variable("etat_trajet", etat_trajet)
print("propulsion\t{:.3f}\t{:.2f}\t{:.2f}\t{:.2f}\t{:.2f}\t{:.2f}\t{:.2f}\t{:.2f}\t{:.2f}\t{}\t".format(
time(), x, y, orientation, abscisse, x_consigne, y_consigne, vit_roue_gauche, vit_roue_droite, etat_trajet))
# if etat_trajet == 1:
# if etat_trajet_flag == 0:
# etat_trajet_flag = 1
# if traj_actuelle == traj1:
# traj2 = struct.pack('bffff', 0, x, y, 0. , 0.)
# print(traj2)
# traj_actuelle = traj2
# envoie_trajectoire(traj_actuelle)
# else:
# traj1 = struct.pack('bffff', 0, x, y, 100. , 0.)
# print(traj1)
# traj_actuelle = traj1
# envoie_trajectoire(traj_actuelle)
# else:
# etat_trajet_flag = 0
else:
# print("propulsion=None")
pass
#abscisse = lecture_abscisse()
# if abscisse != None:
# print(">abscisse:{:.2f}".format(abscisse))
sleep(0.01)