Skip to content
Snippets Groups Projects
Verified Commit 8b332c04 authored by Alexander Wellbrock's avatar Alexander Wellbrock
Browse files

rewrite interrupt handler as thread class

This approach moves the interrupt wait logic into it's own thread.

This does not yet work as the previous solution did not too. The
previous solution didn't actually register the interrupt. The new
one does try to register the interrupt but fails, since the line is
already bound as an input type from the pin init method before the
interrupt is called. Tried approaches to free the pin before trying to
bind it again or to find out if the input should be an interrupt did
not work yet.
parent 9159ead9
No related branches found
No related tags found
No related merge requests found
from pi_mqtt_gpio.modules import GenericGPIO, PinDirection, PinPullup, \
InterruptEdge
from threading import Thread
import threading
import queue
from datetime import datetime, timedelta
# Requires libgpiod-devel, libgpiod
REQUIREMENTS = ("gpiod",)
......@@ -19,7 +21,6 @@ PULLUPS = None
INTERRUPT = None
GPIO_INTERRUPT_CALLBACK_LOOKUP = {}
class GPIO(GenericGPIO):
"""
Implementation of GPIO class for libgpiod (linux kernel >= 4.8).
......@@ -74,20 +75,17 @@ class GPIO(GenericGPIO):
callback: the callback function to be called, when interrupt occurs
bouncetime: minimum time between two interrupts
"""
edge = INTERRUPT[edge]
offset = pin
pin = self.chip.get_line(offset)
config = self.io.line_request()
config.consumer = 'pi-mqtt-gpio'
config.request_type = edge
config.request_type = INTERRUPT[edge]
t = Thread(target=self._event_detect, args=(pin,self.interrupt_callback,))
t = GpioThread(chip=self.chip, offset=pin, config=config,
callback=callback, bouncetime=bouncetime)
t.start()
self.watchers[offset] = t
self.GPIO_INTERRUPT_CALLBACK_LOOKUP[offset] = {"handle": handle,
self.GPIO_INTERRUPT_CALLBACK_LOOKUP[offset] = {"handle": t.handle,
"callback": callback}
def set_pin(self, pin, value):
......@@ -101,7 +99,37 @@ class GPIO(GenericGPIO):
def cleanup(self):
pass
def _event_detect(self, pin, callback):
class GpioThread(threading.Thread):
def __init__(self, chip, offset, config, callback, bouncetime):
super().__init__()
self.daemon = True
self._queue = queue.Queue()
self.pin = chip.get_line(offset)
self.pin.request(config)
self.callback = callback
self.bouncetime = timedelta(microseconds=bouncetime)
def run(self):
previous_event_time = datetime.now()
while True:
if pin.event_wait():
callback()
if self.pin.event_wait():
event = self.pin.event_read()
if event.timestamp - previous_event_time > self.bouncetime:
previous_event_time = event.timestamp
ret = self.callback()
self._queue.put(
{
"type": event.event_type,
"time": event.timestamp,
"result": ret,
}
)
@property
def handle(self):
if self._queue.empty():
return None
return self._queue.get()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment