import async_mqtt_uart.py
import os
import time
import ubinascii
import machine
import micropython
import network
import esp
esp.osdebug(None)
import gc
gc.collect()
import upip
# Wi-Fi credentials for the access point this board joins.
ssid = 'photon'
password = 'particle'

# Create the station-mode WLAN interface and switch it on.
station = network.WLAN(network.STA_IF)
station.active(True)

# Only start a connection attempt when the interface is not already connected.
if not station.isconnected():
    station.connect(ssid, password)

# Spin until the interface reports a connection. There is no timeout here,
# so this loop never exits if the connection is never established.
while not station.isconnected():
    pass

# Announce success and show what ifconfig() reports for the interface.
print('Connection successful')
print(station.ifconfig())
# (file expected in lib/, upip package that is installed when it is missing),
# listed in the order the installs must be attempted.
_REQUIRED = (
    ('logging.py', 'micropython-logging'),
    ('mqtt_async.py', 'micropython-mqtt'),
)

# With no 'lib' directory nothing has been downloaded yet, so every package
# is installed without looking inside it.
have_lib = 'lib' in os.listdir('./')

for filename, package in _REQUIRED:
    # lib/ is listed afresh for each entry, and only when it exists.
    if not have_lib or filename not in os.listdir('lib/'):
        upip.install(package)
# Imported at this point rather than at the top of the file because the
# logging library is only guaranteed to be present once the install step
# has run.
import logging

# Root configuration at DEBUG level plus a logger named after this module.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Emit a single debug record carrying the station's ifconfig() result.
net_info = station.ifconfig()
logger.debug(net_info)
# Derived from:
# * https://github.com/peterhinch/micropython-async/blob/master/v3/as_demos/auart.py
# * https://github.com/tve/mqboard/blob/master/mqtt_async/hello-world.py
from mqtt_async import MQTTClient, config
import uasyncio as asyncio
import time
from machine import UART
import my_oled
import logging
logging.basicConfig(level=logging.DEBUG)

# Number of bytes of a message handed to uart.write() per call in callback().
MAXTX = 4

# Change the following configs to suit your environment
config.server = 'egr3x4.ddns.net'  # can also be a hostname
config.ssid = 'photon'
config.wifi_pw = 'particle'

# Topic this node publishes to, and topic it subscribes to.
TOPIC_PUB = 'EGR314/Team203/mls'
TOPIC_SUB = 'EGR314/Instructor'

# UART 2 at 9600 baud with TX on pin 17 and RX on pin 16 ...
uart = UART(2, baudrate=9600, tx=17, rx=16)
# ... then init with the given frame parameters (8 bits, no parity, 1 stop
# bit, flow=0).
uart.init(baudrate=9600, bits=8, parity=None, stop=1, flow=0)
async def receiver():
    """Publish carriage-return-terminated UART input to TOPIC_PUB.

    Reads the UART through a StreamReader one byte at a time. Bytes are
    accumulated until a carriage return is read; the accumulated bytes
    (without the carriage return) are then published with qos=1, echoed to
    the console, and the accumulator is cleared. The loop never returns.
    """
    stream = asyncio.StreamReader(uart)
    pending = b''
    while True:
        chunk = await stream.read(1)
        if chunk != b'\r':
            # Not the terminator yet: keep collecting.
            pending += chunk
            continue
        await client.publish(TOPIC_PUB, pending, qos=1)
        print('published', pending)
        pending = b''
def callback(topic, msg, retained, qos):
    """Print a received message and relay its payload out of the UART.

    The payload is written in slices of at most MAXTX bytes with a 10 ms
    pause after each write, followed by a '\\r\\n' terminator and one more
    10 ms pause. ``topic``, ``retained`` and ``qos`` are only printed.
    """
    print('callback',topic, msg, retained, qos)
    # Display hooks, currently disabled:
    # my_oled.print_data(msg)
    # my_oled.plot_data(msg)
    remaining = msg
    while remaining:
        head, remaining = remaining[:MAXTX], remaining[MAXTX:]
        uart.write(head)
        time.sleep(.01)
    # Terminate the relayed message.
    uart.write('\r\n')
    time.sleep(.01)
async def conn_callback(client):
    """Subscribe ``client`` to TOPIC_SUB with qos 1.

    This coroutine is assigned to ``config.connect_coro`` elsewhere in the
    file.
    """
    await client.subscribe(TOPIC_SUB, 1)
async def main(client):
    """Connect ``client``, start the UART receiver task, then idle forever.

    After the connection is made, receiver() is scheduled as a background
    task and this coroutine sleeps in one-second steps without returning.
    """
    await client.connect()
    uart_task = asyncio.create_task(receiver())
    while True:
        await asyncio.sleep(1)
# Register the hooks on the shared config object: conn_callback as the
# connect coroutine and callback as the subscription callback.
config.connect_coro = conn_callback
config.subs_cb = callback

# Build the client from that config and drive main() on the event loop.
# main() loops forever, so run_until_complete() is not expected to return.
client = MQTTClient(config)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(client))
# used for ESP 32 testing
# import modules
from machine import UART
from machine import Pin
import time
# Create UART 2 at 9600 baud with TX on pin 17 and RX on pin 16.
uart = UART(2, baudrate=9600, tx=17, rx=16)
# Run init again with the frame details: 8 data bits, no parity, 1 stop bit.
uart.init(baudrate=9600, bits=8, parity=None, stop=1)
# Pin 2 as an output; the original comments describe it as the onboard LED.
led = Pin(2, Pin.OUT)

# NOTE(review): the original indentation was lost. This layout assumes the
# echo/print/LED toggle happen only when a byte was read and that the short
# sleep runs on every pass of the loop -- confirm against the intended script.
while True:
    # Try to read a single byte; read() gives None when nothing was read.
    received = uart.read(1)
    if received is not None:
        # Echo the byte back out of the UART and show it in the shell.
        uart.write(received)
        print(received)
        # Flip the LED state.
        led.value(led.value() ^ 1)
    # Pause briefly before polling again.
    time.sleep(.01)