Skip to content

ble support (fixes #25) #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions example_advertisement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
#!/usr/bin/python

from __future__ import print_function

import argparse
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import time
import threading

try:
from gi.repository import GObject # python3
except ImportError:
import gobject as GObject # python2

mainloop = None

BLUEZ_SERVICE_NAME = 'org.bluez'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'


class InvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class NotSupportedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.NotSupported'


class NotPermittedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.NotPermitted'


class InvalidValueLengthException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.InvalidValueLength'


class FailedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.Failed'


class Advertisement(dbus.service.Object):
PATH_BASE = '/org/bluez/example/advertisement'

def __init__(self, bus, index, advertising_type):
self.path = self.PATH_BASE + str(index)
self.bus = bus
self.ad_type = advertising_type
self.service_uuids = None
self.manufacturer_data = None
self.solicit_uuids = None
self.service_data = None
self.local_name = None
self.include_tx_power = False
self.data = None
dbus.service.Object.__init__(self, bus, self.path)

def get_properties(self):
properties = dict()
properties['Type'] = self.ad_type
if self.service_uuids is not None:
properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
signature='s')
if self.solicit_uuids is not None:
properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
signature='s')
if self.manufacturer_data is not None:
properties['ManufacturerData'] = dbus.Dictionary(
self.manufacturer_data, signature='qv')
if self.service_data is not None:
properties['ServiceData'] = dbus.Dictionary(self.service_data,
signature='sv')
if self.local_name is not None:
properties['LocalName'] = dbus.String(self.local_name)
if self.include_tx_power:
properties['Includes'] = dbus.Array(["tx-power"], signature='s')

if self.data is not None:
properties['Data'] = dbus.Dictionary(
self.data, signature='yv')
return {LE_ADVERTISEMENT_IFACE: properties}

def get_path(self):
return dbus.ObjectPath(self.path)

def add_service_uuid(self, uuid):
if not self.service_uuids:
self.service_uuids = []
self.service_uuids.append(uuid)

def add_solicit_uuid(self, uuid):
if not self.solicit_uuids:
self.solicit_uuids = []
self.solicit_uuids.append(uuid)

def add_manufacturer_data(self, manuf_code, data):
if not self.manufacturer_data:
self.manufacturer_data = dbus.Dictionary({}, signature='qv')
self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')

def add_service_data(self, uuid, data):
if not self.service_data:
self.service_data = dbus.Dictionary({}, signature='sv')
self.service_data[uuid] = dbus.Array(data, signature='y')

def add_local_name(self, name):
if not self.local_name:
self.local_name = ""
self.local_name = dbus.String(name)

def add_data(self, ad_type, data):
if not self.data:
self.data = dbus.Dictionary({}, signature='yv')
self.data[ad_type] = dbus.Array(data, signature='y')

@dbus.service.method(DBUS_PROP_IFACE,
in_signature='s',
out_signature='a{sv}')
def GetAll(self, interface):
print('GetAll')
if interface != LE_ADVERTISEMENT_IFACE:
raise InvalidArgsException()
print('returning props')
return self.get_properties()[LE_ADVERTISEMENT_IFACE]

@dbus.service.method(LE_ADVERTISEMENT_IFACE,
in_signature='',
out_signature='')
def Release(self):
print('%s: Released!' % self.path)


class TestAdvertisement(Advertisement):

def __init__(self, bus, index):
Advertisement.__init__(self, bus, index, 'peripheral')
self.add_service_uuid('180D')
self.add_service_uuid('180F')
self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03])
self.add_service_data('9999', [0x00, 0x01, 0x02, 0x03, 0x04])
self.add_local_name('TestAdvertisement')
self.include_tx_power = True
self.add_data(0x26, [0x01, 0x01, 0x00])


def register_ad_cb():
print('Advertisement registered')


def register_ad_error_cb(error):
print('Failed to register advertisement: ' + str(error))
mainloop.quit()


def find_adapter(bus):
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
DBUS_OM_IFACE)
objects = remote_om.GetManagedObjects()

for o, props in objects.items():
if LE_ADVERTISING_MANAGER_IFACE in props:
return o

return None


def shutdown(timeout):
print('Advertising for {} seconds...'.format(timeout))
time.sleep(timeout)
mainloop.quit()


def main(timeout=0):
global mainloop

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

bus = dbus.SystemBus()

adapter = find_adapter(bus)
if not adapter:
print('LEAdvertisingManager1 interface not found')
return

adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
"org.freedesktop.DBus.Properties")

adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
LE_ADVERTISING_MANAGER_IFACE)

test_advertisement = TestAdvertisement(bus, 0)

mainloop = GObject.MainLoop()

ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
reply_handler=register_ad_cb,
error_handler=register_ad_error_cb)

if timeout > 0:
threading.Thread(target=shutdown, args=(timeout,)).start()
else:
print('Advertising forever...')

mainloop.run() # blocks until mainloop.quit() is called

ad_manager.UnregisterAdvertisement(test_advertisement)
print('Advertisement unregistered')
dbus.service.Object.remove_from_connection(test_advertisement)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--timeout', default=0, type=int, help="advertise " +
"for this many seconds then stop, 0=run forever " +
"(default: 0)")
args = parser.parse_args()

main(args.timeout)

Loading