summaryrefslogtreecommitdiff
path: root/network_manager.py
blob: c0f7f17c2b4056e859f2bc71de57274e838760f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import rp2
import network
import machine
import uasyncio


class NetworkManager:
    _ifname = ("Client", "Access Point")

    def __init__(self, country="GB", client_timeout=30, access_point_timeout=5, status_handler=None, error_handler=None):
        rp2.country(country)
        self._ap_if = network.WLAN(network.AP_IF)
        self._sta_if = network.WLAN(network.STA_IF)

        self._mode = network.STA_IF
        self._client_timeout = client_timeout
        self._access_point_timeout = access_point_timeout
        self._status_handler = status_handler
        self._error_handler = error_handler
        self.UID = ("{:02X}" * 8).format(*machine.unique_id())

    def isconnected(self):
        return self._sta_if.isconnected() or self._ap_if.isconnected()

    def config(self, var):
        if self._sta_if.active():
            return self._sta_if.config(var)
        else:
            if var == "password":
                return self.UID
            return self._ap_if.config(var)

    def mode(self):
        if self._sta_if.isconnected():
            return self._ifname[0]
        if self._ap_if.isconnected():
            return self._ifname[1]
        return None

    def ifaddress(self):
        if self._sta_if.isconnected():
            return self._sta_if.ifconfig()[0]
        if self._ap_if.isconnected():
            return self._ap_if.ifconfig()[0]
        return '0.0.0.0'

    def disconnect(self):
        if self._sta_if.isconnected():
            self._sta_if.disconnect()
        if self._ap_if.isconnected():
            self._ap_if.disconnect()

    async def wait(self, mode):
        while not self.isconnected():
            self._handle_status(mode, None)
            await uasyncio.sleep_ms(1000)

    def _handle_status(self, mode, status):
        if callable(self._status_handler):
            self._status_handler(self._ifname[mode], status, self.ifaddress())

    def _handle_error(self, mode, msg):
        if callable(self._error_handler):
            if self._error_handler(self._ifname[mode], msg):
                return
        raise RuntimeError(msg)

    async def client(self, ssid, psk):
        if self._sta_if.isconnected():
            self._handle_status(network.STA_IF, True)
            return

        self._ap_if.disconnect()
        self._ap_if.active(False)

        self._sta_if.active(True)
        self._sta_if.connect(ssid, psk)
        self._sta_if.config(pm=0xa11140)

        try:
            await uasyncio.wait_for(self.wait(network.STA_IF), self._client_timeout)
            self._handle_status(network.STA_IF, True)

        except uasyncio.TimeoutError:
            self._sta_if.active(False)
            self._handle_status(network.STA_IF, False)
            self._handle_error(network.STA_IF, "WIFI Client Failed")

    async def access_point(self):
        if self._ap_if.isconnected():
            self._handle_status(network.AP_IF, True)
            return

        self._sta_if.disconnect()
        self._sta_if.active(False)

        self._ap_if.ifconfig(("10.10.1.1", "255.255.255.0", "10.10.1.1", "10.10.1.1"))
        self._ap_if.config(password=self.UID)
        self._ap_if.active(True)

        try:
            await uasyncio.wait_for(self.wait(network.AP_IF), self._access_point_timeout)
            self._handle_status(network.AP_IF, True)

        except uasyncio.TimeoutError:
            self._sta_if.active(False)
            self._handle_status(network.AP_IF, False)
            self._handle_error(network.AP_IF, "WIFI Client Failed")