rawsocketpy

This library lets you implement custom layer 2 communication using raw sockets in Python 2 and Python 3, synchronously or asynchronously, with or without callbacks.


License
MIT
Install
pip install rawsocketpy==0.4.0

Documentation

RawSocketPy

API Documentation

RawSocketPy is a layer 2 Python library for communication using MAC addresses only.

It lets you build a custom Ethernet/WiFi communication system that uses neither IP nor TCP/UDP, or debug custom frames such as SERCOS III, Profibus, ARP, PTP, ...
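To make "layer 2" concrete, here is a minimal sketch of the Ethernet II frame layout that such communication relies on: 6 bytes of destination MAC, 6 bytes of source MAC, a 2-byte EtherType, then the payload. The helper name `build_frame` is hypothetical and is not part of rawsocketpy; the library builds frames for you.

```python
import struct

BROADCAST = b"\xff" * 6  # standard Ethernet broadcast address

def build_frame(dest, src, ethertype, payload):
    """Return an Ethernet II frame (without preamble or FCS) as bytes.

    Hypothetical helper for illustration only, not a rawsocketpy function.
    """
    if len(dest) != 6 or len(src) != 6:
        raise ValueError("MAC addresses must be exactly 6 bytes")
    # "!H" packs the EtherType as a big-endian (network order) 16-bit integer.
    header = dest + src + struct.pack("!H", ethertype)
    return header + payload

frame = build_frame(BROADCAST, b"\x12\x34\x56\x78\x9a\xbc", 0xEEFA, b"Boo")
print(len(frame))  # 17: 14 bytes of header plus 3 bytes of payload
```

There is no IP address or port anywhere in the frame: the only addressing is the pair of MAC addresses, and the EtherType is what lets a receiver pick out its own traffic.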

Python versions tested:

  • 2.7.x
  • 3.5.x
  • 3.8.x

OSes:

  • Linux 14.04
  • Linux 16.04
  • Linux 18.04
  • Linux 20.04

Not working on:

  • Windows 10 - Due to raw socket limitations
  • macOS - Due to the implementation (can likely be made to work)

Pros:

  • Low level
  • Not using TCP-UDP/IP
  • Dead simple
  • Can broadcast
  • MTU of 1500
  • Asynchronous server
  • Stateful capability

Cons:

  • Low level
  • Not using TCP-UDP/IP
  • No encryption
  • No fragmentation
  • Requires root
  • MTU of 1500 (Ethernet frames)
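Because there is no fragmentation, anything larger than the 1500-byte MTU has to be split by the application. The following is a minimal sketch of one way to do that; `split_payload` and `MAX_PAYLOAD` are hypothetical names, not part of rawsocketpy, and reassembly on the receiving side (ordering, loss detection) is left out.

```python
MAX_PAYLOAD = 1500  # bytes of payload per Ethernet frame, as listed above

def split_payload(data, size=MAX_PAYLOAD):
    """Split data into consecutive chunks of at most `size` bytes.

    Hypothetical helper for illustration only, not a rawsocketpy function.
    """
    if size <= 0:
        raise ValueError("size must be positive")
    return [data[i:i + size] for i in range(0, len(data), size)]

chunks = split_payload(b"x" * 3200)
print([len(c) for c in chunks])  # [1500, 1500, 200]
```

Each chunk could then be sent as its own frame. Since frames carry no sequence number, a real protocol would probably reserve a few payload bytes for an index so the receiver can put the pieces back in order.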

Installation

# From PyPI:
sudo -H pip install rawsocketpy

# Or from source:
git clone https://github.com/AlexisTM/rawsocket_python
cd rawsocket_python
sudo python setup.py install

Fast testing

On one computer:

sudo python -c "from rawsocketpy import RawSocket
sock = RawSocket('wlp2s0', 0xEEFA)
while True: print(sock.recv())"

# 12:34:56:78:9A:BC == 0xEEFA => FF:FF:FF:FF:FF:FF - OK:
# Boo

On the second computer, on the same network (behind the same router, or on the same ZeroTier network):

sudo python -c "from rawsocketpy import RawSocket; import time
sock = RawSocket('wlp2s0', 0xEEFA)
while True:
  sock.send('Boo')
  print('Boo has been sent')
  time.sleep(0.5)"

In-depth

from rawsocketpy import RawSocket

# 0xEEFA is the ethertype
# The most common are available here: https://en.wikipedia.org/wiki/EtherType
# The full official list is available here: https://regauth.standards.ieee.org/standards-ra-web/pub/view.html#registries
# Direct link: https://standards.ieee.org/develop/regauth/ethertype/eth.csv
# You can use whatever value you want, but reusing an already assigned type can lead to unexpected behaviour.
sock = RawSocket("wlp2s0", 0xEEFA)
sock.send("some data") # Broadcast "some data" with an ethertype of 0xEEFA
sock.send("personal data", dest="\xAA\xBB\xCC\xDD\xEE\xFF") # Send "personal data" to \xAA\xBB\xCC\xDD\xEE\xFF with an ethertype of 0xEEFA
sock.send("other data", ethertype="\xEE\xFF") # Broadcast "other data" with an ether type of 0xEEFF
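Destination addresses are usually known in their human-readable form rather than as escaped bytes. As a minimal sketch, a small converter can turn one into the other; `mac_to_bytes` is a hypothetical helper, not part of rawsocketpy, and whether `send` accepts a `bytes` value for `dest` on Python 3 is an assumption to verify against the API documentation.

```python
import binascii

def mac_to_bytes(mac):
    """Convert 'AA:BB:CC:DD:EE:FF' (or 'AA-BB-...') to 6 raw bytes.

    Hypothetical helper for illustration only, not a rawsocketpy function.
    """
    raw = binascii.unhexlify(mac.replace(":", "").replace("-", ""))
    if len(raw) != 6:
        raise ValueError("expected 6 bytes, got %d" % len(raw))
    return raw

print(mac_to_bytes("AA:BB:CC:DD:EE:FF") == b"\xaa\xbb\xcc\xdd\xee\xff")  # True
```

The result could then be passed as the `dest` argument in place of a hand-written escape sequence.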

Receiving

On another machine, you can run the following:

from rawsocketpy import RawSocket, to_str

sock = RawSocket("wlp2s0", 0xEEFA)
packet = sock.recv()
# packet is a RawPacket, which supports pretty printing and unmarshals the raw data into fields.

# In Python 2, all data is returned as strings ("\x01.."), while Python 3 uses bytearray.

print(packet) # Pretty print
packet.dest   # string "\xFF\xFF\xFF\xFF\xFF\xFF" or bytearray(b"\xFF\xFF\xFF\xFF\xFF\xFF")
packet.src    # string "\x12\x12\x12\x12\x12\x13" or bytearray(b"\x12\x12\x12\x12\x12\x13")
packet.type   # string "\xEE\xFA" or bytearray(b"\xEE\xFA")
packet.data   # string "some data" or bytearray(b"some data")

print(to_str(packet.dest))     # Human readable MAC:  FF:FF:FF:FF:FF:FF
print(to_str(packet.type, "")) # Human readable type: EEFA
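To show what unmarshalling and human-readable formatting involve, here is a standalone sketch that splits a raw Ethernet II frame into the same four fields and renders bytes as hex. `split_frame` and `hex_join` are hypothetical names written for this example; they are not the library's implementation of `RawPacket` or `to_str`.

```python
import binascii

def split_frame(frame):
    """Split a raw Ethernet II frame into (dest, src, ethertype, data).

    Hypothetical helper for illustration only, not a rawsocketpy function.
    """
    if len(frame) < 14:
        raise ValueError("frame shorter than the 14-byte Ethernet header")
    return frame[0:6], frame[6:12], frame[12:14], frame[14:]

def hex_join(raw, separator=":"):
    """Render bytes as upper-case hex pairs joined by `separator`."""
    text = binascii.hexlify(bytes(raw)).decode("ascii").upper()
    return separator.join(text[i:i + 2] for i in range(0, len(text), 2))

frame = b"\xff" * 6 + b"\x12\x12\x12\x12\x12\x13" + b"\xee\xfa" + b"some data"
dest, src, ethertype, data = split_frame(frame)
print(hex_join(dest))           # FF:FF:FF:FF:FF:FF
print(hex_join(ethertype, ""))  # EEFA
```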

Stateless blocking Server

from rawsocketpy import RawServer, RawRequestHandler
import time

class LongTaskTest(RawRequestHandler):
    def handle(self):
        time.sleep(1)
        print(self.packet)

    def finish(self):
        print("End")

    def setup(self):
        print("Begin")

def main():
    rs = RawServer("wlp2s0", 0xEEFA, LongTaskTest)
    rs.spin()

if __name__ == '__main__':
    main()

Stateful Asynchronous server

Available only if gevent is installed.

from rawsocketpy import RawRequestHandler, RawAsyncServerCallback
import time

def callback(handler, server):
    print("callback")
    handler.setup()
    handler.handle()
    handler.finish()


class LongTaskTest(RawRequestHandler):
    def handle(self):
        time.sleep(1)
        print(self.packet)

    def finish(self):
        print("End")

    def setup(self):
        print("Begin")

def main():
    rs = RawAsyncServerCallback("wlp2s0", 0xEEFA, LongTaskTest, callback)
    rs.spin()

if __name__ == '__main__':
    main()
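The callback receives both the handler and the server, which is what makes state possible: anything stored on the server object outlives a single packet. The sketch below illustrates that pattern without touching the network. `FakePacket`, `CountingHandler` and `FakeServer` are hypothetical stand-ins, not rawsocketpy classes, and the assumption that the callback runs once per received packet is inferred from the callback signature used above.

```python
class FakePacket(object):
    """Hypothetical stand-in for a received packet."""
    def __init__(self, data):
        self.data = data

class CountingHandler(object):
    """Hypothetical stand-in for a RawRequestHandler subclass."""
    def __init__(self, packet, server):
        self.packet = packet
        self.server = server

    def setup(self):
        pass

    def handle(self):
        # State lives on the shared server object, so it survives between packets.
        self.server.seen += 1

    def finish(self):
        pass

class FakeServer(object):
    """Hypothetical stand-in: one new handler per packet, passed to the callback."""
    def __init__(self, handler_class, callback):
        self.handler_class = handler_class
        self.callback = callback
        self.seen = 0

    def feed(self, packet):
        self.callback(self.handler_class(packet, self), self)

def callback(handler, server):
    handler.setup()
    handler.handle()
    handler.finish()

server = FakeServer(CountingHandler, callback)
for payload in (b"a", b"b", b"c"):
    server.feed(FakePacket(payload))
print(server.seen)  # 3
```

Handlers stay short-lived and disposable in this design, while the server is the single place where counters, peer tables or session data would be kept.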

I want to contribute!

You are free to contribute. The following capabilities are welcome:

  • Windows compatibility (even possible?)
  • MacOS compatibility
  • More Python versions and OS tests
  • Bring a major refactor ;)

Credits