Transmit broadcast packet
import socket
socket_udp1 = None
#******************************************
#******************************************
#********** GET LOCAL IP ADDRESS **********
#******************************************
#******************************************
def get_local_ip():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.connect(('10.255.255.255', 1)) #Doesn't neeed to be a reachable IP address
local_ip = sock.getsockname()[0]
except Exception:
local_ip = '127.0.0.1'
finally:
sock.close()
return local_ip
#*********************************
#*********************************
#********** OPEN SOCKET **********
#*********************************
#*********************************
def udp_open_socket(local_port, local_ip):
global socket_udp1
socket_udp1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) #Create a UDP socket
socket_udp1.bind((local_ip, local_port)) #Bind the socket to the local IP and port
socket_udp1.settimeout(0.001) #<<<<SET TIMEOUT FOR calls to recvfrom() (prevents blocking indefinitely, can't use 0, so set to say 0.001=1mS for applications where we don't want to stall)
#**********************************
#**********************************
#********** CLOSE SOCKET **********
#**********************************
#**********************************
def udp_close_socket():
socket_udp1.close()
#***************************************************
#***************************************************
#********** TRANSMIT BROADCAST UDP PACKET **********
#***************************************************
#***************************************************
def udp_tx_broadcast(packet, remote_port, broadcast_ip):
global socket_udp1
socket_udp1.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) #Enable broadcasting mode
socket_udp1.sendto(packet, (broadcast_ip, remote_port)) #Send the byte array to the broadcast address
#************************************************
#************************************************
#********** CHECK FOR RECEIVED PACKETS **********
#************************************************
#************************************************
def udp_check_for_rx():
global socket_udp1
try:
while True: #<Loop forever, our sockets settimeout() will cause "except socket.timeout:" to fire on no more packets received
data, addr = socket_udp1.recvfrom(1824) #Wait for a response, timeout will trigger an exception
#Debug print the response:
#print("New response:")
#print(data.decode('utf-8'))
rx_data = data.decode('utf-8')
except socket.timeout:
#No rx waiting
pass
#########################
##### EXAMPLE USAGE #####
#########################
packet = bytes([0x01, 0x02, 0x04, 0x04])
broadcast_ip = "192.168.1.255"
port = 12345
local_ip = get_local_ip()
udp_tx_broadcast(packet, port, broadcast_ip, local_ip)
time.sleep(3)
udp_check_for_rx() #Non blocking, will exit if nothing received
Feel free to comment if you can add help to this page or point out issues and solutions you have found. I do not provide support on this site, if you need help with a problem head over to stack overflow.