Transmit broadcast packet

import socket

socket_udp1 = None


#******************************************
#******************************************
#********** GET LOCAL IP ADDRESS **********
#******************************************
#******************************************
def get_local_ip():
    """Return the local IPv4 address used for outbound traffic.

    A throwaway UDP socket is "connected" to an arbitrary address and the
    local end of that socket is read back.

    Returns:
        str: The local IP address, or '127.0.0.1' if it cannot be determined.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            #The target does not need to be a reachable IP address
            probe.connect(('10.255.255.255', 1))
            return probe.getsockname()[0]
        except Exception:
            #Any failure falls back to the loopback address
            return '127.0.0.1'


#*********************************
#*********************************
#********** OPEN SOCKET **********
#*********************************
#*********************************
def udp_open_socket(local_port, local_ip):
    """Create the shared UDP socket, bind it and store it in ``socket_udp1``.

    The socket is only published to the module-level ``socket_udp1`` once it
    has been bound and configured. If binding fails, the new socket is closed
    and ``socket_udp1`` is left exactly as it was before the call.

    Args:
        local_port: Local UDP port to bind to.
        local_ip: Local IP address to bind to.

    Raises:
        OSError: If the socket cannot be created or bound.
        OverflowError: If ``local_port`` is outside 0-65535.
    """
    global socket_udp1

    new_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)     #Create a UDP socket
    try:
        new_socket.bind((local_ip, local_port))                #Bind the socket to the local IP and port

        #Set the timeout for calls to recvfrom() so they cannot block indefinitely.
        #A value of 0 can't be used: it switches the socket to non-blocking mode, which
        #raises BlockingIOError instead of socket.timeout. 0.001 = 1mS keeps stalls short.
        new_socket.settimeout(0.001)
    except Exception:
        new_socket.close()                                     #Don't leak the descriptor on failure
        raise

    socket_udp1 = new_socket


#**********************************
#**********************************
#********** CLOSE SOCKET **********
#**********************************
#**********************************
def udp_close_socket():
    """Close the shared UDP socket, if one is open.

    Does nothing when ``socket_udp1`` is ``None`` (never opened, or already
    closed by this function). After closing, ``socket_udp1`` is reset to
    ``None`` so the module does not keep a reference to a closed socket.
    """
    global socket_udp1

    if socket_udp1 is None:
        return

    try:
        socket_udp1.close()
    finally:
        socket_udp1 = None      #Forget the socket even if close() raised


#***************************************************
#***************************************************
#********** TRANSMIT BROADCAST UDP PACKET **********
#***************************************************
#***************************************************
def udp_tx_broadcast(packet, remote_port, broadcast_ip):
    """Send ``packet`` from the shared UDP socket to a broadcast address.

    Args:
        packet: Bytes-like payload to transmit.
        remote_port: Destination UDP port.
        broadcast_ip: Destination (broadcast) IP address.
    """
    sock = socket_udp1
    destination = (broadcast_ip, remote_port)

    #Broadcasting has to be enabled on the socket before sending to a broadcast address
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    sock.sendto(packet, destination)


#************************************************
#************************************************
#********** CHECK FOR RECEIVED PACKETS **********
#************************************************
#************************************************
def udp_check_for_rx():
    """Drain the datagrams currently waiting on the shared UDP socket.

    Reads from ``socket_udp1`` until ``recvfrom()`` times out, which is how
    the socket signals that no more packets are waiting.

    Returns:
        list[str]: The payload of each received datagram, decoded as UTF-8,
        in the order received. Empty if nothing was waiting. Bytes that are
        not valid UTF-8 are replaced with U+FFFD rather than raising, so one
        malformed packet cannot abort the drain and lose the others.
    """
    received = []

    try:
        while True:     #Loop until the socket timeout fires, meaning no more packets are waiting
            data, _addr = socket_udp1.recvfrom(1824)

            #Debug print the response:
            #print("New response:")
            #print(data.decode('utf-8'))

            received.append(data.decode('utf-8', errors='replace'))

    except socket.timeout:
        #No rx waiting
        pass

    return received


#########################
##### EXAMPLE USAGE #####
#########################
if __name__ == "__main__":
    import time     #Local import: only this example needs it

    packet = bytes([0x01, 0x02, 0x04, 0x04])
    broadcast_ip = "192.168.1.255"
    port = 12345
    local_ip = get_local_ip()

    udp_open_socket(port, local_ip)     #The socket must be open before transmitting or receiving
    try:
        udp_tx_broadcast(packet, port, broadcast_ip)
        time.sleep(3)                   #Give remote devices time to respond
        udp_check_for_rx()              #Non blocking, will exit if nothing received
    finally:
        udp_close_socket()

Feel free to comment if you can add help to this page or point out issues and solutions you have found. I do not provide support on this site; if you need help with a problem, head over to Stack Overflow.

Comments

Your email address will not be published. Required fields are marked *