Skip to content

Commit

Permalink
Merge pull request #6 from jobe3774/Publish_Data_Thread
Browse files Browse the repository at this point in the history
Publish data thread
  • Loading branch information
jobe3774 committed Oct 20, 2019
2 parents 582cc50 + 7ce9e91 commit c78af28
Show file tree
Hide file tree
Showing 11 changed files with 442 additions and 6 deletions.
117 changes: 117 additions & 0 deletions Examples/example4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# raspend - Example
#
# License: MIT
#
# Copyright (c) 2019 Joerg Beckers

import time
import random
import requests
from requests.exceptions import HTTPError
import argparse
import getpass
import json

from raspend.application import RaspendApplication
from raspend.utils import dataacquisition as DataAcquisition
from raspend.utils import publishing as Publishing

class DoorBell():
def __init__(self, *args, **kwargs):
self.doorBellState = "on"

def switchDoorBell(self, onoff):
if type(onoff) == str:
self.doorBellState = "on" if onoff == "on" else "off"
elif type(onoff) == int:
self.doorBellState = "on" if onoff >= 1 else "off"
else:
raise TypeError("State must be 'int' or 'string'.")
return self.doorBellState

def getCurrentState(self):
return self.doorBellState

class ReadOneWireTemperature(DataAcquisition.DataAcquisitionHandler):
def __init__(self, groupId, sensorId, oneWireSensorPath = ""):
# A groupId for grouping the temperature sensors
self.groupId = groupId
# The name or Id of your sensor under which you would read it's JSON value
self.sensorId = sensorId
# The path of your sensor within your system
self.oneWireSensorPath = oneWireSensorPath

def prepare(self):
if self.groupId not in self.sharedDict:
self.sharedDict[self.groupId] = {}
self.sharedDict[self.groupId][self.sensorId] = 0
return

def acquireData(self):
# If you use 1-Wire sensors like a DS18B20 you normally would read its w1_slave file like:
# /sys/bus/w1/devices/<the-sensor's system id>/w1_slave
temp = random.randint(18, 24)
self.sharedDict[self.groupId][self.sensorId] = temp
return

class PublishOneWireTemperatures(Publishing.PublishDataHandler):
def __init__(self, endPointURL, userName, password):
self.endPoint = endPointURL
self.userName = userName
self.password = password

def prepare(self):
# Nothing to prepare so far.
pass

def publishData(self):
data = json.dumps(self.sharedDict)
try:
response = requests.post(self.endPoint, data, auth=(self.userName, self.password))
response.raise_for_status()
except HTTPError as http_err:
print("HTTP error occurred: {}".format(http_err))
except Exception as err:
print("Unexpected error occurred: {}".format(err))
else:
print(response.text)

def main():

cmdLineParser = argparse.ArgumentParser(prog="example4", usage="%(prog)s [options]")
cmdLineParser.add_argument("--port", help="The port the server should listen on", type=int, required=True)

try:
args = cmdLineParser.parse_args()
except SystemExit:
return

username = input("Enter username: ")
password = getpass.getpass("Enter password: ")

myApp = RaspendApplication(args.port)

theDoorBell = DoorBell()

myApp.addCommand(theDoorBell.switchDoorBell)
myApp.addCommand(theDoorBell.getCurrentState)

myApp.updateSharedDict({"Starting Time" : time.asctime()})

myApp.createDataAcquisitionThread(ReadOneWireTemperature("basement", "party_room", "/sys/bus/w1/devices/23-000000000001/w1_slave"), 60)
myApp.createDataAcquisitionThread(ReadOneWireTemperature("basement", "heating_room", "/sys/bus/w1/devices/23-000000000002/w1_slave"), 60)
myApp.createDataAcquisitionThread(ReadOneWireTemperature("basement", "fitness_room", "/sys/bus/w1/devices/23-000000000003/w1_slave"), 60)
myApp.createDataAcquisitionThread(ReadOneWireTemperature("ground_floor", "kitchen", "/sys/bus/w1/devices/23-000000000004/w1_slave"), 60)
myApp.createDataAcquisitionThread(ReadOneWireTemperature("ground_floor", "living_room", "/sys/bus/w1/devices/23-000000000005/w1_slave"), 60)

myApp.createPublishDataThread(PublishOneWireTemperatures("http://localhost/raspend_demo/api/post_data.php", username, password), 60)

myApp.run()

print ("Exit")

if __name__ == "__main__":
main()
37 changes: 35 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,39 @@ myApp.run()

```

Though your acquired data is available via raspend's HTTP interface, you probably want to push this data somewhere, a database for instance. Therefore version 1.3.0 introduces the **Publishing** module. You just need to create a handler derived from the **Publishing.PublishDataHandler** class, similar to the data acquisition part, and override it's **publishData** method. Here you can publish all or parts of the data contained in the shared dictionary to wherever you want. You then pass this handler to the **createPublishDataThread** method of **RaspendApplication**. The example below posts the whole shared dictionary as a JSON string to a PHP backend, which in turn writes the data into a MySQL database (see *raspend_demo* for details).

``` python
from raspend.application import RaspendApplication
from raspend.utils import publishing as Publishing

class PublishOneWireTemperatures(Publishing.PublishDataHandler):
def __init__(self, endPointURL, userName, password):
self.endPoint = endPointURL
self.userName = userName
self.password = password

def prepare(self):
# Nothing to prepare so far.
pass

def publishData(self):
data = json.dumps(self.sharedDict)
try:
response = requests.post(self.endPoint, data, auth=(self.userName, self.password))
response.raise_for_status()
except HTTPError as http_err:
print("HTTP error occurred: {}".format(http_err))
except Exception as err:
print("Unexpected error occurred: {}".format(err))
else:
print(response.text)


myApp.createPublishDataThread(PublishOneWireTemperatures("http://localhost/raspend_demo/api/post_data.php", username, password), 60)

```

The other idea is to expose different functionalities, such as switching on/off your door bell via GPIO, as a command you can send to your RPi via HTTP POST request. All you have to do is to encapsulate the functionality you want to make available to the outside world into a method of a Python class. Then instantiate your class and call the **addCommand** method of **RaspendApplication** providing the method you want to expose. Now you can execute your method using a simple HTTP POST request.

``` python
Expand Down Expand Up @@ -87,9 +120,9 @@ myApp.run()

```

When all initialization stuff is done (adding commands, creating data acquisition threads), then you start your application by calling the **run** method of **RaspendApplication**. The **RaspendApplication** class installs signal handlers for SIGTERM and SIGINT, so you can quit your application by pressing CTRL+C or sending one of the signals via the **kill** command of your shell.
When all initialization stuff is done (adding commands, creating threads), then you start your application by calling the **run** method of **RaspendApplication**. The **RaspendApplication** class installs signal handlers for SIGTERM and SIGINT, so you can quit your application by pressing CTRL+C or sending one of the signals via the **kill** command of your shell.

Please have a look at the examples included in this project to get a better understanding. *example1.py* and *example2.py* show how to do most of the work yourself, while *example3.py* shows you the most convenient way of using this framework.
Please have a look at the examples included in this project to get a better understanding. *example1.py* and *example2.py* show how to do most of the work yourself, while *example3.py* shows you the most convenient way of using this framework. *example4.py* is identical to *example3.py* but extended by a **PublishDataHandler**.

## How to use the HTTP interface?

Expand Down
4 changes: 3 additions & 1 deletion raspend.pyproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{d0fd4413-717c-4da2-b189-4cc8bdecbc68}</ProjectGuid>
<ProjectHome />
<StartupFile>Examples\example3.py</StartupFile>
<StartupFile>Examples\example4.py</StartupFile>
<SearchPath />
<WorkingDirectory>.</WorkingDirectory>
<OutputPath>.</OutputPath>
Expand All @@ -23,13 +23,15 @@
<ItemGroup>
<Compile Include="Examples\example1.py" />
<Compile Include="Examples\example2.py" />
<Compile Include="Examples\example4.py" />
<Compile Include="Examples\example3.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="raspend\application.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="raspend\http.py" />
<Compile Include="raspend\utils\publishing.py" />
<Compile Include="setup.py">
<SubType>Code</SubType>
</Compile>
Expand Down
33 changes: 31 additions & 2 deletions raspend/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .utils import serviceshutdownhandling as ServiceShutdownHandling
from .utils import dataacquisition as DataAcquisition
from .utils import commandmapping as CommandMapping
from .utils import publishing as Publishing

class RaspendApplication():
""" This class handles the main loop for a raspend based application.
Expand All @@ -26,6 +27,9 @@ def __init__(self, port, sharedDict=None):
# A list holding instances of DataAcquisitionThread
self.__daqThreads = list()

# A list holding instances of PublishDataThread
self.__pubThreads = list()

# The dictionary holding user's commands he wants to expose.
self.__cmdMap = CommandMapping.CommandMap()

Expand Down Expand Up @@ -58,6 +62,23 @@ def createDataAcquisitionThread(self, dataAcquisitionHandler, threadSleep=1):

return len(self.__daqThreads)

def createPublishDataThread(self, publishDataHandler, threadSleep=1):
""" This method creates a new instance of 'Publishing.PublishDataThread'.
Make sure that the handler you provide is derived from 'Publishing.PublishDataHandler'!
"""
if not isinstance(publishDataHandler, Publishing.PublishDataHandler):
raise TypeError("Your 'PublishDataHandler' must be derived from 'Publishing.PublishDataHandler'!")

publishDataHandler.setSharedDict(self.__sharedDict)

publishDataThread = Publishing.PublishDataThread(threadSleep,
self.__shutdownFlag,
self.__dataLock,
publishDataHandler)
self.__pubThreads.append(publishDataThread)

return len(self.__pubThreads)

def addCommand(self, callbackMethod):
""" Adds a new command to the command map of your application.
"""
Expand All @@ -67,7 +88,7 @@ def addCommand(self, callbackMethod):

def updateSharedDict(self, other):
""" Updates the shared dictionary with 'other'.
Note: existing keys will be overwritten.
Note: existing keys will be overwritten!
"""
self.__sharedDict.update(other)
return len(self.__sharedDict)
Expand All @@ -88,16 +109,24 @@ def run(self):
for daqThread in self.__daqThreads:
daqThread.start()

for pubThread in self.__pubThreads:
pubThread.start()

# Keep primary thread or main loop alive.
while True:
time.sleep(0.5)

except ServiceShutdownHandling.ServiceShutdownException:
# Signal the shutdown flag, so the threads can quit their work.
self.__shutdownFlag.set()
# Wait for all thread to end.

# Wait for all threads to end.
for pubThread in self.__pubThreads:
pubThread.join()

for daqThread in self.__daqThreads:
daqThread.join()

httpd.join()

except Exception as e:
Expand Down
77 changes: 77 additions & 0 deletions raspend/utils/publishing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Simple classes that handle threaded data publishing.
#
# License: MIT
#
# Copyright (c) 2019 Joerg Beckers

import threading
import time

class PublishDataHandler():
""" Base class for a handler which published data or parts of data store in the shared dictionary.
Derive this class and override the 'publishData' - methods to publish the 'sharedDict'.
"""

def __init__(self, sharedDict=None):
""" The contructor gets a dictionary containing any acquired data of your application.
"""
self.setSharedDict(sharedDict)

def setSharedDict(self, sharedDict):
""" Set the shared dictionary
"""
self.sharedDict = sharedDict

def prepare(self):
""" This method is called before the publish thread is started with this handler.
So if you need to initialize parts of the shared dictionary you should override this method.
"""
pass

def publishData(self):
""" This method is called by a 'PublishDataThread'. Override this method publish your data in 'sharedDict'.
"""
pass

class PublishDataThread(threading.Thread):
""" A thread class which handles cyclic data publishing.
An instance of this class needs a lock - object for controlling access to its 'publishDataHandler', an event - object for
notifying the thread to exit and an object of a class derived from 'PublishDataHandler'.
"""
def __init__(self, threadSleep=0, shutdownFlag=None, dataLock=None, publishDataHandler=None):
""" Contructs a new instance of 'PublishDataThread'.
Parameters:
threadSleep - milliseconds sleep time for the thread loop.
shutdownFlag - a threading.event() object for notifying the thread to exit.
dataLock - a threading.Lock() object for controlling access to the 'dataAcquisitionHandler'.
publishDataHandler - an object of a class derived from 'PublishDataHandler'.
"""
threading.Thread.__init__(self)

self.threadSleep = threadSleep
self.shutdownFlag = shutdownFlag
self.dataLock = dataLock
self.publishDataHandler = publishDataHandler

def run(self):
""" The thread loop runs until 'shutdownFlag' has been signaled. Sleep for 'threadSleep' milliseconds.
"""
# Let the handler prepare itself if necessary.
self.dataLock.acquire()
self.publishDataHandler.prepare()
self.dataLock.release()

while not self.shutdownFlag.is_set():
# acquire lock
self.dataLock.acquire()
# call publish data handler
self.publishDataHandler.publishData()
# release lock
self.dataLock.release()
self.shutdownFlag.wait(self.threadSleep)
8 changes: 8 additions & 0 deletions raspend_demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Purpose

**raspend_demo** is used for testing **raspend**'s publishing module.

# Usage

The easiest way to use **raspend_demo** is using Visual Studio Code with the PHP Debug extension installed (that's what I did). Then you need a web server with PHP enabled (I use [XAMPP](https://www.apachefriends.org/index.html)). To use the PHP Debug extension you further need to enable XDebug. See [PHP Debug extension](https://marketplace.visualstudio.com/items?itemName=felixfbecker.php-debug) for more information and follow the installations steps.

19 changes: 19 additions & 0 deletions raspend_demo/api/.vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Listen for XDebug",
"type": "php",
"request": "launch",
"port": 9000
},
{
"name": "Launch currently open script",
"type": "php",
"request": "launch",
"program": "${file}",
"cwd": "${fileDirname}",
"port": 9000
}
]
}

0 comments on commit c78af28

Please sign in to comment.