diff --git a/.gitignore b/.gitignore index 488838e..04712b9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ *.pyc .*.swp build/ +.project +.pydevproject +.settings diff --git a/contract.txt b/contract.txt index d1a133a..9b885cf 100644 --- a/contract.txt +++ b/contract.txt @@ -12,15 +12,19 @@ Delete Feed: PUBLISH delfeed [feed]\x00[instance uuid] EXEC // if nil: go back to WATCH -Set Config: - SET feed.config:[feed] //json(config) +Set Config Value: + HSET feed.config:[feed] name value PUBLISH conffeed [feed]\x00[instance uuid] +Get Config Value: + HGET feed.config:[feed] name + Feed: Publish: //id may be provided or generated. An id that has already been published will update that id + max = feed.config:[feed] max_length WATCH feed.ids:[feed] delete_ids = ZRANGE feed.ids:[feed] 0 [-max] // eg ZRANGE feed.ids:test 0 -5 if the max is 4 MULTI @@ -188,7 +192,7 @@ Job: LPUSH feed.ids:[feed] [id] INCR feed.publishes:[feed] HSET feed.items:[feed] [id] [item] - ZADD feed.published:[feed] [utc epoch] [id] + ZADD feed.published:[feed] [utc epoch milliseconds] [id] EXEC High Priority Put: @@ -196,14 +200,14 @@ Job: MULTI RPUSH feed.ids:[feed] [id] HSET feed.items:[feed] [id] [item] - ZADD feed.published:[feed] [utc epoch] [id] + ZADD feed.published:[feed] [utc epoch milliseconds] [id] EXEC Get: id = BRPOP feed.ids:[feed] [timeout] //if error/timeout, abort MULTI - ZADD feed.claimed:[feed] [utc epoch] [id] + ZADD feed.claimed:[feed] [utc epoch milliseconds] [id] item = HGET feed:items[feed] [id] EXEC //if the id fails to get from feed.ids to feed.claimed, the maintenance will notice eventually @@ -214,15 +218,12 @@ Job: MULTI ZREM feed.claimed:[feed] [id] HDEL feed.cancelled:[feed] [id] //just to make sure + INCR feed.finishes:[feed] //optionally if publishing a result: - LPUSH feed.jobfinished:[feed]\x00[id] [result] - EXPIRE feed.jobfinished:[feed]\x00[id] [timeout] + PUBLISH job.finish:[feed] [id]\x00[result] HDEL feed.items:[feed] [id] EXEC // if nil: go back to WATCH and try again - Get Result: - BRPOP feed:jobfinished:[feed]\x00[id] [timeout] - Get Ids: HKEYS feed.items:[feed] @@ -252,7 +253,7 @@ Job: SREM feed.stalled:[feed] [id] //if error, abort LPUSH feed.ids:[feed] [id] - ZADD feed.published:[feed] [utc epoch] [id] + ZADD feed.published:[feed] [utc epoch milliseconds] [id] EXEC // if nil retry Retract: @@ -267,6 +268,9 @@ Job: LREM feed.ids:[feed] 1 [id] EXEC // if fail, retry + getNumOfFailures: + HGET feed.cancelled:[feed] [id] + Maintenance: //maintain job queue -- only ran by one process per jobqueue on occassion -- still a bit hand-wavey MULTI keys = HKEYS feed.items:[feed] @@ -280,4 +284,4 @@ Job: LPUSH feed.ids:[feed] [key] check claimed jobs to see if any have been claimed too long and "Cancel" or "Stall" them - publish stats to a feed + publish stats to a feed \ No newline at end of file diff --git a/scripts/config.lua b/scripts/config.lua new file mode 100644 index 0000000..cd664c9 --- /dev/null +++ b/scripts/config.lua @@ -0,0 +1,11 @@ +-- ARGV: name, config(json), instance +if redis.call('sismember', 'feeds', name) then + return false +end +config = cjson.decode(ARGV[2]) +feed = 'feed.config:'..ARGV[1] +table.foreach(config, function(k, v) + redis.call('hset', feed, k, v) +end) +redis.call('publish', 'conffeed', ARGV[1]..'\0'..ARGV[3]) +return true diff --git a/scripts/create.lua b/scripts/create.lua new file mode 100644 index 0000000..038f835 --- /dev/null +++ b/scripts/create.lua @@ -0,0 +1,13 @@ +-- ARGV: name, config, instance +if redis.call('sadd', 'feeds', ARGV[1]) == 0 then + -- feed already exists + return false +end +feed = 'feed.config:'..ARGV[1] +config = cjson.decode(ARGV[2]) +-- TODO: check if config has a type key +for k, v in pairs(config) do + redis.call('hset', feed, k, v) +end +redis.call('publish', 'newfeed', ARGV[1]..'\0'..ARGV[3]) +return true diff --git a/scripts/delete.lua b/scripts/delete.lua new file mode 100644 index 0000000..c282fce --- /dev/null +++ b/scripts/delete.lua @@ -0,0 +1,40 @@ +-- ARGS: feed, instance +if redis.call('srem', 'feeds', ARGV[1]) == 0 then + return false +end +schema = { + feed = function(name) return { + 'feed.config:'..name, + 'feed.ids:'..name, + 'feed.items:'..name, + 'feed.publishes:'..name + } end, + sortedfeed = function(name) return { + 'feed.config:'..name, + 'feed.ids:'..name, + 'feed.items:'..name, + 'feed.publishes:'..name, + 'feed.idincr:'..name + } end, + queue = function(name) return { + 'feed.config:'..name, + 'feed.ids:'..name, + 'feed.items:'..name, + 'feed.publishes:'..name + } end, + job = function(name) return { + 'feed.config:'..name, + 'feed.ids:'..name, + 'feed.items:'..name, + 'feed.publishes:'..name, + 'feed.published:'..name, + 'feed.claimed:'..name, + 'feed.cancelled:'..name, + 'feed.finishes:'..name, + 'feed.stalled:'..name + } end +} +feedtype = redis.call('hget', 'feed.config:'..ARGV[1], 'type') +redis.call('del', unpack(schema[feedtype](ARGV[1]))) +redis.call('publish', 'delfeed', ARGV[1]..'\0'..ARGV[2]) +return true diff --git a/scripts/feed/publish.lua b/scripts/feed/publish.lua new file mode 100644 index 0000000..a9c18a1 --- /dev/null +++ b/scripts/feed/publish.lua @@ -0,0 +1,19 @@ +-- ARGV: feed, id, item, time +max = redis.call("hget", "feed.config:"..ARGV[1], "max_length") +if max and tonumber(max) > 0 then + ids = redis.call('zrange', 'feed.ids:'..ARGV[1], 0, -tonumber(max)) + table.foreach(ids, function(i, id) + redis.call('zrem', 'feed.ids:'..ARGV[1], id) + redis.call('hdel', 'feed.items:'..ARGV[1], id) + redis.call('publish', 'feed.retract:'..ARGV[1], id) + end) +end + +redis.call('incr', 'feed.publishes:'..ARGV[1]) +redis.call('hset', 'feed.items:'..ARGV[1], ARGV[2], ARGV[3]) +if redis.call('zadd', 'feed.ids:'..ARGV[1], ARGV[4], ARGV[2]) == 1 then + redis.call('publish', 'feed.edit:'..ARGV[1], ARGV[2]..'\0'..ARGV[3]) +else + redis.call('publish', 'feed.publish:'..ARGV[1], ARGV[2]..'\0'..ARGV[3]) +end +return zadd diff --git a/scripts/feed/retract.lua b/scripts/feed/retract.lua new file mode 100644 index 0000000..910cf60 --- /dev/null +++ b/scripts/feed/retract.lua @@ -0,0 +1,8 @@ +-- ARGV: feed, id +if redis.call('zrem', 'feed.ids:'..ARGV[1], ARGV[2]) == 0 then + return false +end +redis.call('hdel', 'feed.items:'..ARGV[1], ARGV[2]) +redis.call('publish', 'feed.retract:'..ARGV[1], ARGV[2]) +return true + diff --git a/scripts/jobs/cancel.lua b/scripts/jobs/cancel.lua new file mode 100644 index 0000000..6424ce1 --- /dev/null +++ b/scripts/jobs/cancel.lua @@ -0,0 +1,7 @@ +-- ARGV: name, id +if redis.call('zrem', 'feed.claimed:'..ARGV[1], ARGV[2]) == 0 then + return false +end +redis.call('hincrby', 'feed.cancelled:'..ARGV[1], ARGV[2], 1) +redis.call('lpush', 'feed.ids:'..ARGV[1], ARGV[2]) +return true diff --git a/scripts/jobs/finish.lua b/scripts/jobs/finish.lua new file mode 100644 index 0000000..2a6c980 --- /dev/null +++ b/scripts/jobs/finish.lua @@ -0,0 +1,12 @@ +-- ARGV: name, id, result(optional) +if redis.call('zrem', 'feed.claimed:'..ARGV[1], ARGV[2]) == 0 then + return false; +end +redis.call('hdel', 'feed.cancelled:'..ARGV[1], ARGV[2]) +redis.call('zrem', 'feed.published:'..ARGV[1], ARGV[2]) +redis.call('incr', 'feed.finishes:'..ARGV[1]) +if table.getn(ARGV) == 3 then + redis.call('publish', 'job.finish:'..ARGV[1], ARGV[2].."\0"..ARGV[3]) +end +redis.call('hdel', 'feed.items:'..ARGV[1], ARGV[2]) +return true diff --git a/scripts/jobs/get.lua b/scripts/jobs/get.lua new file mode 100644 index 0000000..ebc99de --- /dev/null +++ b/scripts/jobs/get.lua @@ -0,0 +1,4 @@ +-- ARGV: name, id, time +r1 = redis.call('zadd', 'feed.claimed:'..ARGV[1], ARGV[3], ARGV[2]); +r2 = redis.call('hget', 'feed.items:'..ARGV[1], ARGV[2]); +return {r1, r2} diff --git a/scripts/jobs/publish.lua b/scripts/jobs/publish.lua new file mode 100644 index 0000000..ce76723 --- /dev/null +++ b/scripts/jobs/publish.lua @@ -0,0 +1,9 @@ +-- ARGV: name, id, item, time, priority +if ARGV[5] == nil then + redis.call('lpush', 'feed.ids:'..ARGV[1], ARGV[2]); +else + redis.call('rpush', 'feed.ids:'..ARGV[1], ARGV[2]); +end +redis.call('incr', 'feed.publishes:'..ARGV[1]); +redis.call('hset', 'feed.items:'..ARGV[1], ARGV[2], ARGV[3]); +return redis.call('zadd', 'feed.published:'..ARGV[1], ARGV[4], ARGV[2]); diff --git a/scripts/jobs/retract.lua b/scripts/jobs/retract.lua new file mode 100644 index 0000000..14f31c6 --- /dev/null +++ b/scripts/jobs/retract.lua @@ -0,0 +1,11 @@ +-- ARGV: name, id +if redis.call('hdel', 'feed.items:'..ARGV[1], ARGV[2]) == 0 then + return false +end +redis.call('hdel', 'feed.cancelled:'..ARGV[1], ARGV[2]) +redis.call('zrem', 'feed.published:'..ARGV[1], ARGV[2]) +redis.call('srem', 'feed.stalled:'..ARGV[1], ARGV[2]) +redis.call('zrem', 'feed.claimed:'..ARGV[1], ARGV[2]) +redis.call('lrem', 'feed.ids:'..ARGV[1], 1, ARGV[2]) +return true + diff --git a/scripts/jobs/retry.lua b/scripts/jobs/retry.lua new file mode 100644 index 0000000..10bbd53 --- /dev/null +++ b/scripts/jobs/retry.lua @@ -0,0 +1,7 @@ +-- ARGV: name, id, time +if redis.call('srem', 'feed.stalled:'..ARGV[1], ARGV[2]) == 0 then + return false; +end +redis.call('lpush', 'feed.ids:'..ARGV[1], ARGV[2]); +redis.call('zadd', 'feed.published:'..ARGV[1], ARGV[3], ARGV[2]); +return true diff --git a/scripts/jobs/stall.lua b/scripts/jobs/stall.lua new file mode 100644 index 0000000..679f277 --- /dev/null +++ b/scripts/jobs/stall.lua @@ -0,0 +1,8 @@ +-- ARGV: name, id +if redis.call('zrem', 'feed.claimed:'..ARGV[1], ARGV[2]) == 0 then + return false; +end +redis.call('hdel', 'feed.cancelled:'..ARGV[1], ARGV[2]); +redis.call('sadd', 'feed.stalled:'..ARGV[1], ARGV[2]); +redis.call('zrem', 'feed.published'..ARGV[1], ARGV[2]); +return true diff --git a/test.cfg b/test.cfg index ab55402..82c2ef0 100644 --- a/test.cfg +++ b/test.cfg @@ -4,7 +4,7 @@ # You were warned. # ===================================================================== -#[Test] -#host=localhost -#port=6379 -#db=10 +[Test] +host=localhost +port=6379 +db=10 diff --git a/tests/test_feed.py b/tests/test_feed.py index e83bcbc..cf13856 100644 --- a/tests/test_feed.py +++ b/tests/test_feed.py @@ -1,27 +1,30 @@ import thoonk +from thoonk.feeds import Feed import unittest from ConfigParser import ConfigParser - class TestLeaf(unittest.TestCase): - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) - + def setUp(self, *args, **kwargs): conf = ConfigParser() conf.read('test.cfg') if conf.sections() == ['Test']: self.ps = thoonk.Thoonk(host=conf.get('Test', 'host'), port=conf.getint('Test', 'port'), - db=conf.getint('Test', 'db')) + db=conf.getint('Test', 'db'), + listen=False) self.ps.redis.flushdb() else: print 'No test configuration found in test.cfg' exit() - + + def tearDown(self): + self.ps.close() + def test_05_basic_retract(self): """Test adding and retracting an item.""" l = self.ps.feed("testfeed") + self.assertEqual(type(l), Feed) l.publish('foo', id='1') r = l.get_ids() v = l.get_all() @@ -46,6 +49,10 @@ def test_10_basic_feed(self): def test_20_basic_feed_items(self): """Test items match completely.""" l = self.ps.feed("testfeed") + l.publish("hi", id='1') + l.publish("bye", id='2') + l.publish("thanks", id='3') + l.publish("you're welcome", id='4') r = l.get_ids() self.assertEqual(r, ['1', '2', '3', '4'], "Queue results did not match publish: %s" % r) c = {} @@ -56,6 +63,10 @@ def test_20_basic_feed_items(self): def test_30_basic_feed_retract(self): """Testing item retract items match.""" l = self.ps.feed("testfeed") + l.publish("hi", id='1') + l.publish("bye", id='2') + l.publish("thanks", id='3') + l.publish("you're welcome", id='4') l.retract('3') r = l.get_ids() self.assertEqual(r, ['1', '2','4'], "Queue results did not match publish: %s" % r) @@ -68,6 +79,7 @@ def test_40_create_delete(self): """Testing feed delete""" l = self.ps.feed("test2") l.delete_feed() + def test_50_max_length(self): """Test feeds with a max length""" diff --git a/tests/test_job.py b/tests/test_job.py index fa15a8f..b298b7c 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,14 +1,12 @@ import thoonk import unittest -import math from ConfigParser import ConfigParser +import threading class TestJob(unittest.TestCase): - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) - + def setUp(self, *args, **kwargs): conf = ConfigParser() conf.read('test.cfg') if conf.sections() == ['Test']: @@ -20,48 +18,169 @@ def __init__(self, *args, **kwargs): print 'No test configuration found in test.cfg' exit() - - def setUp(self): - self.ps = thoonk.Pubsub(db=10, listen=True) - self.ps.redis.flushdb() - def tearDown(self): self.ps.close() def test_10_basic_job(self): - """JOB publish, retrieve, finish, get result""" + """Test job publish, retrieve, finish flow""" #publisher testjob = self.ps.job("testjob") - id = testjob.put(9.0) - + self.assertEqual(testjob.get_ids(), []) + + id = testjob.put('9.0') + #worker - testjobworker = self.ps.job("testjob") - id_worker, query_worker = testjobworker.get(timeout=3) - result_worker = math.sqrt(float(query_worker)) - testjobworker.finish(id_worker, result_worker, True) - - #publisher gets result - query_publisher, result_publisher = testjob.get_result(id, 1) - self.assertEqual(float(result_worker), float(result_publisher), "Job results did not match publish.") + id_worker, job_content = testjob.get(timeout=3) + self.assertEqual(job_content, '9.0') + self.assertEqual(testjob.get_failure_count(id), 0) + self.assertEqual(id_worker, id) + testjob.finish(id_worker) + self.assertEqual(testjob.get_ids(), []) - + def test_20_cancel_job(self): """Test cancelling a job""" j = self.ps.job("testjob") #publisher - id = j.put(9.0) + id = j.put('9.0') #worker claims - id, query = j.get() + id, job_content = j.get() + self.assertEqual(job_content, '9.0') + self.assertEqual(j.get_failure_count(id), 0) #publisher or worker cancels j.cancel(id) - id2, query2 = j.get() + id2, job_content2 = j.get() + self.assertEqual(j.get_failure_count(id), 1) + self.assertEqual(job_content2, '9.0') self.assertEqual(id, id2) #cancel the work again j.cancel(id) + # check the cancelled increment again + id3, job_content3 = j.get() + self.assertEqual(j.get_failure_count(id), 2) + self.assertEqual(job_content3, '9.0') + self.assertEqual(id, id3) #cleanup -- remove the job from the queue j.retract(id) self.assertEqual(j.get_ids(), []) + def test_25_stall_job(self): + """Test stalling a job""" + testjob = self.ps.job("testjob") + self.assertEqual(testjob.get_ids(), []) + + # put + id = testjob.put('9.0') + self.assertEqual(testjob.get_ids(), [id]) + + # invalid stall + self.assertRaises(thoonk.exceptions.JobNotClaimed, testjob.stall, id) + + # get + id_worker, job_content = testjob.get(timeout=3) + self.assertEqual(id_worker, id) + self.assertEqual(job_content, '9.0') + self.assertEqual(testjob.get_failure_count(id), 0) + + # invalid retry + self.assertRaises(thoonk.exceptions.JobNotStalled, testjob.retry, id) + + # stall + testjob.stall(id) + self.assertEqual(testjob.get_ids(), [id]) + self.assertRaises(thoonk.exceptions.Empty, testjob.get, timeout=1) + + # retry + testjob.retry(id) + self.assertEqual(testjob.get_ids(), [id]) + + # get + id_worker, job_content = testjob.get(timeout=3) + self.assertEqual(id_worker, id) + self.assertEqual(job_content, '9.0') + self.assertEqual(testjob.get_failure_count(id), 0) + + # finish + testjob.finish(id_worker) + self.assertEqual(testjob.get_ids(), []) + + def test_27_retract_job(self): + """Test retracting a job""" + testjob = self.ps.job("testjob") + self.assertEqual(testjob.get_ids(), []) + + # put + id = testjob.put('9.0') + self.assertEqual(testjob.get_ids(), [id]) + + # retract + testjob.retract(id) + self.assertEqual(testjob.get_ids(), []) + + # invalid retract + self.assertRaises(thoonk.exceptions.ItemDoesNotExist, testjob.retract, id) + + def test_30_no_job(self): + """Test exception raise when job.get times out""" + j = self.ps.job("testjob") + self.assertEqual(j.get_ids(), []) + self.assertRaises(thoonk.exceptions.Empty, j.get, timeout=1) + +class TestJobResult(unittest.TestCase): -suite = unittest.TestLoader().loadTestsFromTestCase(TestJob) + def setUp(self, *args, **kwargs): + conf = ConfigParser() + conf.read('test.cfg') + if conf.sections() == ['Test']: + self.ps = thoonk.Thoonk(host=conf.get('Test', 'host'), + port=conf.getint('Test', 'port'), + db=conf.getint('Test', 'db'), + listen=True) + self.ps.redis.flushdb() + else: + print 'No test configuration found in test.cfg' + exit() + + def tearDown(self): + self.ps.close() + + def test_10_job_result(self): + """Test job result published""" + + create_event = threading.Event() + def create_handler(name): + self.assertEqual(name, "testjobresult") + create_event.set() + self.ps.register_handler("create", create_handler) + + #publisher + testjob = self.ps.job("testjobresult") + self.assertEqual(testjob.get_ids(), []) + + # Wait until the create event has been received by the ThoonkListener + create_event.wait() + + id = testjob.put('9.0') + + #worker + id_worker, job_content = testjob.get(timeout=3) + self.assertEqual(job_content, '9.0') + self.assertEqual(testjob.get_failure_count(id), 0) + self.assertEqual(id_worker, id) + + result_event = threading.Event() + def result_handler(name, id, result): + self.assertEqual(name, "testjobresult") + self.assertEqual(id, id_worker) + self.assertEqual(result, "myresult") + result_event.set() + + self.ps.register_handler("finish", result_handler) + testjob.finish(id_worker, "myresult") + result_event.wait(1) + self.assertTrue(result_event.isSet(), "No result received!") + self.assertEqual(testjob.get_ids(), []) + self.ps.remove_handler("result", result_handler) + +#suite = unittest.TestLoader().loadTestsFromTestCase(TestJob) diff --git a/tests/test_notice.py b/tests/test_notice.py new file mode 100644 index 0000000..d7c715f --- /dev/null +++ b/tests/test_notice.py @@ -0,0 +1,227 @@ +import thoonk +from thoonk.feeds import Feed, Job +import unittest +import time +import redis +from ConfigParser import ConfigParser +import threading + +class TestNotice(unittest.TestCase): + + def setUp(self): + conf = ConfigParser() + conf.read('test.cfg') + if conf.sections() == ['Test']: + redis.Redis(host=conf.get('Test', 'host'), + port=conf.getint('Test', 'port'), + db=conf.getint('Test', 'db')).flushdb() + self.ps = thoonk.Thoonk(host=conf.get('Test', 'host'), + port=conf.getint('Test', 'port'), + db=conf.getint('Test', 'db'), + listen=True) + else: + print 'No test configuration found in test.cfg' + exit() + + def tearDown(self): + self.ps.close() + + "claimed, cancelled, stalled, finished" + def test_01_feed_notices(self): + """Test for create, publish, edit, retract and delete notices from feeds""" + + """Feed Create Event""" + create_event = threading.Event() + def create_handler(feed): + self.assertEqual(feed, "test_notices") + create_event.set() + + self.ps.register_handler("create", create_handler) + l = self.ps.feed("test_notices") + create_event.wait(1) + self.assertTrue(create_event.isSet(), "Create notice not received") + self.ps.remove_handler("create", create_handler) + + """Feed Publish Event""" + publish_event = threading.Event() + ids = [None, None] + + def received_handler(feed, item, id): + self.assertEqual(feed, "test_notices") + ids[1] = id + publish_event.set() + + self.ps.register_handler('publish', received_handler) + ids[0] = l.publish('a') + publish_event.wait(1) + + self.assertTrue(publish_event.isSet(), "Publish notice not received") + self.assertEqual(ids[1], ids[0]) + self.ps.remove_handler('publish', received_handler) + + """Feed Edit Event """ + edit_event = threading.Event() + def edit_handler(feed, item, id): + self.assertEqual(feed, "test_notices") + ids[1] = id + edit_event.set() + + self.ps.register_handler('edit', edit_handler) + l.publish('b', id=ids[0]) + edit_event.wait(1) + + self.assertTrue(edit_event.isSet(), "Edit notice not received") + self.assertEqual(ids[1], ids[0]) + self.ps.remove_handler('edit', edit_handler) + + """Feed Retract Event""" + retract_event = threading.Event() + def retract_handler(feed, id): + self.assertEqual(feed, "test_notices") + ids[1] = id + retract_event.set() + + self.ps.register_handler('retract', retract_handler) + l.retract(ids[0]) + retract_event.wait(1) + + self.assertTrue(retract_event.isSet(), "Retract notice not received") + self.assertEqual(ids[1], ids[0]) + self.ps.remove_handler('retract', retract_handler) + + """Feed Delete Event""" + delete_event = threading.Event() + def delete_handler(feed): + self.assertEqual(feed, "test_notices") + delete_event.set() + + self.ps.register_handler("delete", delete_handler) + l.delete_feed() + delete_event.wait(1) + self.assertTrue(delete_event.isSet(), "Delete notice not received") + self.ps.remove_handler("delete", delete_handler) + + + def skiptest_10_job_notices(self): + notices_received = [False] + ids = [None, None] + + def publish_handler(feed, item, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "publish" + + def claimed_handler(feed, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "claimed" + + def cancelled_handler(feed, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "cancelled" + + def stalled_handler(feed, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "stalled" + + def retried_handler(feed, id): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "retried" + + def finished_handler(feed, id, result): + self.assertEqual(feed, "testjob") + ids[-1] = id + notices_received[-1] = "finished" + + def do_wait(): + i = 0 + while not notices_received[-1] and i < 2: + i += 1 + time.sleep(0.2) + + self.ps.register_handler('publish_notice', publish_handler) + self.ps.register_handler('claimed_notice', claimed_handler) + self.ps.register_handler('cancelled_notice', cancelled_handler) + self.ps.register_handler('stalled_notice', stalled_handler) + self.ps.register_handler('retried_notice', retried_handler) + self.ps.register_handler('finished_notice', finished_handler) + + j = self.ps.job("testjob") + self.assertEqual(j.__class__, Job) + self.assertFalse(notices_received[0]) + + # create the job + ids[0] = j.put('b') + do_wait() + self.assertEqual(notices_received[0], "publish", "Notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # claim the job + id, job, cancelled = j.get() + self.assertEqual(job, 'b') + self.assertEqual(cancelled, 0) + self.assertEqual(ids[0], id) + do_wait() + self.assertEqual(notices_received[-1], "claimed", "Claimed notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # cancel the job + j.cancel(id) + do_wait() + self.assertEqual(notices_received[-1], "cancelled", "Cancelled notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # get the job again + id, job, cancelled = j.get() + self.assertEqual(job, 'b') + self.assertEqual(cancelled, 1) + self.assertEqual(ids[0], id) + do_wait() + self.assertEqual(notices_received[-1], "claimed", "Claimed notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # stall the job + j.stall(id) + do_wait() + self.assertEqual(notices_received[-1], "stalled", "Stalled notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # retry the job + j.retry(id) + do_wait() + self.assertEqual(notices_received[-1], "retried", "Retried notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # get the job again + id, job, cancelled = j.get() + self.assertEqual(job, 'b') + self.assertEqual(cancelled, 0) + self.assertEqual(ids[0], id) + do_wait() + self.assertEqual(notices_received[-1], "claimed", "Claimed notice not received") + self.assertEqual(ids[0], ids[-1]) + + notices_received.append(False); ids.append(None); + # finish the job + j.finish(id) + do_wait() + self.assertEqual(notices_received[-1], "finished", "Finished notice not received") + self.assertEqual(ids[0], ids[-1]) + + self.ps.remove_handler('publish_notice', publish_handler) + self.ps.remove_handler('claimed_notice', claimed_handler) + self.ps.remove_handler('cancelled_notice', cancelled_handler) + self.ps.remove_handler('stalled_notice', stalled_handler) + self.ps.remove_handler('retried_notice', retried_handler) + self.ps.remove_handler('finished_notice', finished_handler) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestNotice) diff --git a/tests/test_queue.py b/tests/test_queue.py index 417a767..49ee583 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,13 +1,12 @@ import thoonk +from thoonk.feeds import Queue import unittest from ConfigParser import ConfigParser class TestQueue(unittest.TestCase): - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) - + def setUp(self): conf = ConfigParser() conf.read('test.cfg') if conf.sections() == ['Test']: @@ -22,6 +21,7 @@ def __init__(self, *args, **kwargs): def test_basic_queue(self): """Test basic QUEUE publish and retrieve.""" q = self.ps.queue("testqueue") + self.assertEqual(q.__class__, Queue) q.put("10") q.put("20") q.put("30") diff --git a/tests/test_sorted_feed.py b/tests/test_sorted_feed.py index 40e3e1d..20b8115 100644 --- a/tests/test_sorted_feed.py +++ b/tests/test_sorted_feed.py @@ -1,13 +1,12 @@ import thoonk +from thoonk.feeds import SortedFeed import unittest from ConfigParser import ConfigParser class TestLeaf(unittest.TestCase): - - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) - + + def setUp(self): conf = ConfigParser() conf.read('test.cfg') if conf.sections() == ['Test']: @@ -18,10 +17,11 @@ def __init__(self, *args, **kwargs): else: print 'No test configuration found in test.cfg' exit() - + def test_10_basic_sorted_feed(self): """Test basic sorted feed publish and retrieve.""" - l = self.ps.sorted_feed("testfeed") + l = self.ps.sorted_feed("sortedfeed") + self.assertEqual(l.__class__, SortedFeed) l.publish("hi") l.publish("bye") l.publish("thanks") @@ -37,75 +37,107 @@ def test_10_basic_sorted_feed(self): def test_20_sorted_feed_before(self): """Test addding an item before another item""" - l = self.ps.sorted_feed("testfeed") - l.publish_before('3', 'foo') + l = self.ps.sorted_feed("sortedfeed") + l.publish("hi") + l.publish("bye") + l.publish_before('2', 'foo') r = l.get_ids() - self.assertEqual(r, ['1', '2', '5', '3', '4'], "Sorted feed results did not match: %s." % r) + self.assertEqual(r, ['1', '3', '2'], "Sorted feed results did not match: %s." % r) def test_30_sorted_feed_after(self): """Test adding an item after another item""" - l = self.ps.sorted_feed("testfeed") - l.publish_after('3', 'foo') + l = self.ps.sorted_feed("sortedfeed") + l.publish("hi") + l.publish("bye") + l.publish_after('1', 'foo') r = l.get_ids() - self.assertEqual(r, ['1', '2', '5', '3', '6', '4'], "Sorted feed results did not match: %s." % r) + self.assertEqual(r, ['1', '3', '2'], "Sorted feed results did not match: %s." % r) def test_40_sorted_feed_prepend(self): """Test addding an item to the front of the sorted feed""" - l = self.ps.sorted_feed("testfeed") + l = self.ps.sorted_feed("sortedfeed") + l.publish("hi") + l.publish("bye") l.prepend('bar') r = l.get_ids() - self.assertEqual(r, ['7', '1', '2', '5', '3', '6', '4'], + self.assertEqual(r, ['3', '1', '2'], "Sorted feed results don't match: %s" % r) def test_50_sorted_feed_edit(self): """Test editing an item in a sorted feed""" - l = self.ps.sorted_feed("testfeed") - l.edit('6', 'bar') + l = self.ps.sorted_feed("sortedfeed") + l.publish("hi") + l.publish("bye") + l.edit('1', 'bar') r = l.get_ids() - v = l.get_item('6') + v = l.get_item('1') vs = l.get_items() - items = {'1': 'hi', - '2': 'bye', - '3': 'thanks', - '4': "you're welcome", - '5': 'foo', - '6': 'bar', - '7': 'bar'} - self.assertEqual(r, ['7', '1', '2', '5', '3', '6', '4'], + items = {'1': 'bar', + '2': 'bye'} + self.assertEqual(r, ['1', '2'], "Sorted feed results don't match: %s" % r) self.assertEqual(v, 'bar', "Items don't match: %s" % v) self.assertEqual(vs, items, "Sorted feed items don't match: %s" % vs) def test_60_sorted_feed_retract(self): """Test retracting an item from a sorted feed""" - l = self.ps.sorted_feed("testfeed") + l = self.ps.sorted_feed("sortedfeed") + l.publish("hi") + l.publish("bye") + l.publish("thanks") + l.publish("you're welcome") l.retract('3') r = l.get_ids() - self.assertEqual(r, ['7', '1', '2', '5', '6', '4'], + self.assertEqual(r, ['1', '2', '4'], "Sorted feed results don't match: %s" % r) - def test_70_sorted_feed_move(self): + def test_70_sorted_feed_move_first(self): """Test moving items around in the feed.""" - l = self.ps.sorted_feed('testfeed') - l.move_first('6') + l = self.ps.sorted_feed('sortedfeed') + l.publish("hi") + l.publish("bye") + l.publish("thanks") + l.publish("you're welcome") + l.move_first('4') r = l.get_ids() - self.assertEqual(r, ['6', '7', '1', '2', '5', '4'], + self.assertEqual(r, ['4', '1', '2', '3'], "Sorted feed results don't match: %s" % r) - l.move_last('7') + def test_71_sorted_feed_move_last(self): + """Test moving items around in the feed.""" + l = self.ps.sorted_feed('sortedfeed') + l.publish("hi") + l.publish("bye") + l.publish("thanks") + l.publish("you're welcome") + l.move_last('2') r = l.get_ids() - self.assertEqual(r, ['6', '1', '2', '5', '4', '7'], + self.assertEqual(r, ['1', '3', '4', '2'], "Sorted feed results don't match: %s" % r) - l.move_before('2', '5') + def test_72_sorted_feed_move_before(self): + """Test moving items around in the feed.""" + l = self.ps.sorted_feed('sortedfeed') + l.publish("hi") + l.publish("bye") + l.publish("thanks") + l.publish("you're welcome") + l.move_before('1', '2') r = l.get_ids() - self.assertEqual(r, ['6', '1', '5', '2', '4', '7'], + self.assertEqual(r, ['2', '1', '3', '4'], "Sorted feed results don't match: %s" % r) + def test_73_sorted_feed_move_after(self): + """Test moving items around in the feed.""" + l = self.ps.sorted_feed('sortedfeed') + l.publish("hi") + l.publish("bye") + l.publish("thanks") + l.publish("you're welcome") l.move_after('1', '4') r = l.get_ids() - self.assertEqual(r, ['6', '1', '4', '5', '2', '7'], + self.assertEqual(r, ['1', '4', '2', '3'], "Sorted feed results don't match: %s" % r) diff --git a/thoonk/__init__.py b/thoonk/__init__.py index 7a63f2b..c532d1f 100644 --- a/thoonk/__init__.py +++ b/thoonk/__init__.py @@ -6,5 +6,5 @@ from pubsub import Thoonk from pubsub import Thoonk as Pubsub -__version__ = '1.0.0rc1' -__version_info__ = (1, 0, 0, 'rc1', 0) +__version__ = '1.0.0rc1a' +__version_info__ = (1, 0, 0, 'rc1a', 0) diff --git a/thoonk/cache.py b/thoonk/cache.py new file mode 100644 index 0000000..dd8e3f5 --- /dev/null +++ b/thoonk/cache.py @@ -0,0 +1,59 @@ +""" + Written by Nathan Fritz and Lance Stout. Copyright 2011 by &yet, LLC. + Released under the terms of the MIT License +""" + +import threading +import uuid +from thoonk.exceptions import FeedDoesNotExist + +class FeedCache(object): + + """ + The FeedCache class stores an in-memory version of each + feed. As there may be multiple systems using + Thoonk with the same Redis server, and each with its own + FeedCache instance, each FeedCache has a self.instance + field to uniquely identify itself. + + Attributes: + thoonk -- The main Thoonk object. + instance -- A hex string for uniquely identifying this + FeedCache instance. + + Methods: + invalidate -- Force a feed's config to be retrieved from + Redis instead of in-memory. + """ + + def __init__(self, thoonk): + """ + Create a new configuration cache. + + Arguments: + thoonk -- The main Thoonk object. + """ + self._feeds = {} + self.thoonk = thoonk + self.lock = threading.Lock() + + def __getitem__(self, feed): + """ + Return a feed object for a given feed name. + + Arguments: + feed -- The name of the requested feed. + """ + with self.lock: + if feed not in self._feeds: + feed_type = self.thoonk.redis.hget('feed.config:%s' % feed, "type") + if not feed_type: + raise FeedDoesNotExist + self._feeds[feed] = self.thoonk.feedtypes[feed_type](self.thoonk, feed) + return self._feeds[feed] + + def __delitem__(self, feed): + with self.lock: + if feed in self._feeds: + self._feeds[feed].delete() + del self._feeds[feed] diff --git a/thoonk/config.py b/thoonk/config.py deleted file mode 100644 index 7d1e4da..0000000 --- a/thoonk/config.py +++ /dev/null @@ -1,80 +0,0 @@ -""" - Written by Nathan Fritz and Lance Stout. Copyright 2011 by &yet, LLC. - Released under the terms of the MIT License -""" - -import json -import threading -import uuid - - -class ConfigCache(object): - - """ - The ConfigCache class stores an in-memory version of each - feed's configuration. As there may be multiple systems using - Thoonk with the same Redis server, and each with its own - ConfigCache instance, each ConfigCache has a self.instance - field to uniquely identify itself. - - Attributes: - thoonk -- The main Thoonk object. - instance -- A hex string for uniquely identifying this - ConfigCache instance. - - Methods: - invalidate -- Force a feed's config to be retrieved from - Redis instead of in-memory. - """ - - def __init__(self, thoonk): - """ - Create a new configuration cache. - - Arguments: - thoonk -- The main Thoonk object. - """ - self._feeds = {} - self.thoonk = thoonk - self.lock = threading.Lock() - self.instance = uuid.uuid4().hex - - def __getitem__(self, feed): - """ - Return a feed object for a given feed name. - - Arguments: - feed -- The name of the requested feed. - """ - with self.lock: - if feed in self._feeds: - return self._feeds[feed] - else: - if not self.thoonk.feed_exists(feed): - raise FeedDoesNotExist - config = self.thoonk.redis.get('feed.config:%s' % feed) - config = json.loads(config) - feed_type = config.get(u'type', u'feed') - feed_class = self.thoonk.feedtypes[feed_type] - self._feeds[feed] = feed_class(self.thoonk, feed, config) - return self._feeds[feed] - - def invalidate(self, feed, instance, delete=False): - """ - Delete a configuration so that it will be retrieved from Redis - instead of from the cache. - - Arguments: - feed -- The name of the feed to invalidate. - instance -- A UUID identifying the cache which made the - invalidation request. - delete -- Indicates if the entire feed object should be - invalidated, or just its configuration. - """ - if instance != self.instance: - with self.lock: - if feed in self._feeds: - if delete: - del self._feeds[feed] - else: - del self._feeds[feed].config diff --git a/thoonk/exceptions.py b/thoonk/exceptions.py index fe9da70..440c37a 100644 --- a/thoonk/exceptions.py +++ b/thoonk/exceptions.py @@ -7,12 +7,20 @@ class FeedExists(Exception): pass -class NotAllowed(Exception): - pass - class FeedDoesNotExist(Exception): pass class ItemDoesNotExist(Exception): pass +class JobNotClaimed(Exception): + pass + +class JobNotStalled(Exception): + pass + +class Empty(Exception): + pass + +class NotListening(Exception): + pass \ No newline at end of file diff --git a/thoonk/feeds/feed.py b/thoonk/feeds/feed.py index ed89e04..6741a7c 100644 --- a/thoonk/feeds/feed.py +++ b/thoonk/feeds/feed.py @@ -3,17 +3,9 @@ Released under the terms of the MIT License """ -import json -import threading import time import uuid -try: - import queue -except ImportError: - import Queue as queue - -from thoonk.exceptions import * - +from thoonk.exceptions import ItemDoesNotExist class Feed(object): @@ -28,7 +20,6 @@ class Feed(object): thoonk -- The main Thoonk object. redis -- A Redis connection instance from the Thoonk object. feed -- The name of the feed. - config -- A dictionary of configuration values. Redis Keys Used: feed.ids:[feed] -- A sorted set of item IDs. @@ -54,7 +45,7 @@ class Feed(object): retract -- Remove an item from the feed. """ - def __init__(self, thoonk, feed, config=None): + def __init__(self, thoonk, feed): """ Create a new Feed object for a given Thoonk feed. @@ -67,12 +58,9 @@ def __init__(self, thoonk, feed, config=None): feed -- The name of the feed. config -- Optional dictionary of configuration values. """ - self.config_lock = threading.Lock() - self.config_valid = False self.thoonk = thoonk self.redis = thoonk.redis self.feed = feed - self._config = None self.feed_ids = 'feed.ids:%s' % feed self.feed_items = 'feed.items:%s' % feed @@ -114,39 +102,6 @@ def event_retract(self, id): """ pass - @property - def config(self): - """ - Return the feed's configuration. - - If the cached version is marked as invalid, then a new copy of - the config will be retrieved from Redis. - """ - with self.config_lock: - if not self.config_valid: - conf = self.redis.get(self.feed_config) - self._config = json.loads(conf) - self.config_valid = True - return self._config - - @config.setter - def config(self, config): - """ - Set a new configuration for the feed. - - Arguments: - config -- A dictionary of configuration values. - """ - with self.config_lock: - self.thoonk.set_config(self.feed, config) - self.config_valid = False - - @config.deleter - def config(self): - """Mark the current configuration cache as invalid.""" - with self.config_lock: - self.config_valid = False - def delete_feed(self): """Delete the feed and its contents.""" self.thoonk.delete_feed(self.feed) @@ -156,7 +111,7 @@ def get_schemas(self): return set((self.feed_ids, self.feed_items, self.feed_publish, self.feed_publishes, self.feed_retract, self.feed_config, self.feed_edit)) - + # Thoonk Standard API # ================================================================= @@ -195,38 +150,15 @@ def publish(self, item, id=None): item -- The content of the item to add to the feed. id -- Optional ID to use for the item, if the ID already exists, the existing item will be replaced. + + Returns a tuple containing the ID of the item and a flag indicating if + the item was created or edited """ - publish_id = id - if publish_id is None: - publish_id = uuid.uuid4().hex - while True: - self.redis.watch(self.feed_ids) - - max = int(self.config.get('max_length', 0)) - pipe = self.redis.pipeline() - if max > 0: - delete_ids = self.redis.zrange(self.feed_ids, 0, -max) - for id in delete_ids: - if id != publish_id: - pipe.zrem(self.feed_ids, id) - pipe.hdel(self.feed_items, id) - pipe.publish(self.feed_retract, id) - pipe.zadd(self.feed_ids, publish_id, time.time()) - pipe.incr(self.feed_publishes) - pipe.hset(self.feed_items, publish_id, item) - try: - results = pipe.execute() - break - except redis.exceptions.WatchError: - pass - - if results[-3]: - # If zadd was successful - self.thoonk._publish(self.feed_publish, (publish_id, item)) - else: - self.thoonk._publish(self.feed_edit, (publish_id, item)) - - return publish_id + if id is None: + id = uuid.uuid4().hex + created = self.redis.evalsha(self.thoonk.scripts["feed/publish"], 0, + self.feed, id, item, int(time.time()*1000)) + return id, created def retract(self, id): """ @@ -235,18 +167,7 @@ def retract(self, id): Arguments: id -- The ID value of the item to remove. """ - while True: - self.redis.watch(self.feed_ids) - if self.redis.zrank(self.feed_ids, id) is not None: - pipe = self.redis.pipeline() - pipe.zrem(self.feed_ids, id) - pipe.hdel(self.feed_items, id) - pipe.publish(self.feed_retract, id) - try: - pipe.execute() - return - except redis.exceptions.WatchError: - pass - else: - self.redis.unwatch() - break + success = self.redis.evalsha(self.thoonk.scripts["feed/retract"], 0, + self.feed, id) + if not success: + raise ItemDoesNotExist \ No newline at end of file diff --git a/thoonk/feeds/job.py b/thoonk/feeds/job.py index 43d06c6..499f458 100644 --- a/thoonk/feeds/job.py +++ b/thoonk/feeds/job.py @@ -6,17 +6,9 @@ import time import uuid -from thoonk.exceptions import * from thoonk.feeds import Queue - - -class JobDoesNotExist(Exception): - pass - - -class JobNotPending(Exception): - pass - +from thoonk.exceptions import Empty, JobNotClaimed, JobNotStalled,\ + ItemDoesNotExist class Job(Queue): @@ -55,9 +47,10 @@ class Job(Queue): feed.cancelled:[feed] -- A hash table of cancelled jobs. feed.claimed:[feed] -- A hash table of claimed jobs. feed.stalled:[feed] -- A hash table of stalled jobs. - feeed.funning:[feed] -- A hash table of running jobs. - feed.finished:[feed]\x00[id] -- Temporary queue for receiving job - result data. + feed.running:[feed] -- A hash table of running jobs. + feed.publishes:[feed] -- A count of the number of jobs published + feed.finishes:[feed] -- A count of the number of jobs finished + job.finish:[feed] -- A pubsub channel for job results Thoonk.py Implementation API: get_schemas -- Return the set of Redis keys used by this feed. @@ -75,7 +68,7 @@ class Job(Queue): stall -- Pause execution of a queued job. """ - def __init__(self, thoonk, feed, config=None): + def __init__(self, thoonk, feed): """ Create a new Job queue object for a given Thoonk feed. @@ -88,26 +81,30 @@ def __init__(self, thoonk, feed, config=None): feed -- The name of the feed. config -- Optional dictionary of configuration values. """ - Queue.__init__(self, thoonk, feed, config=None) - + Queue.__init__(self, thoonk, feed) + + self.feed_publishes = 'feed.publishes:%s' % feed self.feed_published = 'feed.published:%s' % feed self.feed_cancelled = 'feed.cancelled:%s' % feed - self.feed_job_claimed = 'feed.claimed:%s' % feed - self.feed_job_stalled = 'feed.stalled:%s' % feed - self.feed_job_finished = 'feed.finished:%s\x00%s' % (feed, '%s') - self.feed_job_running = 'feed.running:%s' % feed + self.feed_retried = 'feed.retried:%s' % feed + self.feed_finishes = 'feed.finishes:%s' % feed + self.feed_claimed = 'feed.claimed:%s' % feed + self.feed_stalled = 'feed.stalled:%s' % feed + self.feed_running = 'feed.running:%s' % feed + + self.job_finish = 'job.finish:%s' % feed + + def get_channels(self): + return (self.feed_publishes, self.feed_claimed, self.feed_stalled, + self.feed_finishes, self.feed_cancelled, self.feed_retried) def get_schemas(self): """Return the set of Redis keys used exclusively by this feed.""" - schema = set((self.feed_job_claimed, - self.feed_job_stalled, - self.feed_job_running, - self.feed_published, + schema = set((self.feed_claimed, + self.feed_stalled, + self.feed_running, + self.feed_publishes, self.feed_cancelled)) - - for id in self.get_ids(): - schema.add(self.feed_job_finished % id) - return schema.union(Queue.get_schemas(self)) def get_ids(self): @@ -121,25 +118,9 @@ def retract(self, id): Arguments: id -- The ID of the job to remove. """ - while True: - self.redis.watch(self.feed_items) - if self.redis.hexists(self.feed_items, id): - pipe = self.redis.pipeline() - pipe.hdel(self.feed_items, id) - pipe.hdel(self.feed_cancelled, id) - pipe.zrem(self.feed_published, id) - pipe.srem(self.feed_job_stalled, id) - pipe.zrem(self.feed_job_claimed, id) - pipe.lrem(self.feed_ids, 1, id) - pipe.delete(self.feed_job_finished % id) - try: - pipe.execute() - return - except redis.exceptions.WatchError: - pass - else: - self.redis.unwatch() - break + success = self.redis.evalsha(self.thoonk.scripts["jobs/retract"], 0, self.feed, id) + if not success: + raise ItemDoesNotExist def put(self, item, priority=False): """ @@ -154,19 +135,13 @@ def put(self, item, priority=False): queue instead of the end. """ id = uuid.uuid4().hex - pipe = self.redis.pipeline() - - if priority: - pipe.rpush(self.feed_ids, id) - pipe.hset(self.feed_items, id, item) - pipe.zadd(self.feed_publishes, id, time.time()) + added = self.redis.evalsha(self.thoonk.scripts["jobs/publish"], 0, + self.feed, id, item, int(time.time()*1000), 1 if priority else None) + if added: + # If zadd was successful + self.thoonk._publish(self.feed_publishes, (id, item)) else: - pipe.lpush(self.feed_ids, id) - pipe.incr(self.feed_publishes) - pipe.hset(self.feed_items, id, item) - pipe.zadd(self.feed_published, id, time.time()) - - results = pipe.execute() + self.thoonk._publish(self.feed_edit, (id, item)) return id def get(self, timeout=0): @@ -178,64 +153,35 @@ def get(self, timeout=0): Arguments: timeout -- Optional time in seconds to wait before raising an exception. + + Returns: + id -- The id of the job + job -- The job content """ id = self.redis.brpop(self.feed_ids, timeout) if id is None: - return # raise exception? + raise Empty id = id[1] - - pipe = self.redis.pipeline() - pipe.zadd(self.feed_job_claimed, id, time.time()) - pipe.hget(self.feed_items, id) - result = pipe.execute() + result = self.redis.evalsha(self.thoonk.scripts["jobs/get"], 0, self.feed, id, + int(time.time()*1000)) return id, result[1] - def finish(self, id, item=None, result=False, timeout=None): + def get_failure_count(self, id): + return int(self.redis.hget(self.feed_cancelled, id) or 0) + + NO_RESULT = [] + def finish(self, id, result=NO_RESULT): """ Mark a job as completed, and store any results. Arguments: id -- The ID of the completed job. - item -- The result data from the job. - result -- Flag indicating that result data should be stored. - Defaults to False. - timeout -- Time in seconds to keep the result data. The default - is to store data indefinitely until retrieved. - """ - while True: - self.redis.watch(self.feed_job_claimed) - if self.redis.zrank(self.feed_job_claimed, id) is None: - self.redis.unwatch() - return # raise exception? - - query = self.redis.hget(self.feed_items, id) - - pipe = self.redis.pipeline() - pipe.zrem(self.feed_job_claimed, id) - pipe.hdel(self.feed_cancelled, id) - if result: - pipe.lpush(self.feed_job_finished % id, item) - if timeout is not None: - pipe.expire(self.feed_job_finished % id, timeout) - pipe.hdel(self.feed_items, id) - try: - result = pipe.execute() - break - except redis.exceptions.WatchError: - pass - - def get_result(self, id, timeout=0): - """ - Retrieve the result of a given job. - - Arguments: - id -- The ID of the job to check for results. - timeout -- Time in seconds to wait for results to arrive. - Default is to block indefinitely. + result -- The result data from the job. (should be a string!) """ - result = self.redis.brpop(self.feed_job_finished % id, timeout) - if result is not None: - return result + success = self.redis.evalsha(self.thoonk.scripts["jobs/finish"], 0, self.feed, id, + *([result] if result is not self.NO_RESULT else [])) + if not success: + raise JobNotClaimed def cancel(self, id): """ @@ -244,21 +190,9 @@ def cancel(self, id): Arguments: id -- The ID of the job to cancel. """ - while True: - self.redis.watch(self.feed_job_claimed) - if self.redis.zrank(self.feed_job_claimed, id) is None: - self.redis.unwatch() - return # raise exception? - - pipe = self.redis.pipeline() - pipe.hincrby(self.feed_cancelled, id, 1) - pipe.lpush(self.feed_ids, id) - pipe.zrem(self.feed_job_claimed, id) - try: - pipe.execute() - break - except redis.exceptions.WatchError: - pass + success = self.redis.evalsha(self.thoonk.scripts["jobs/cancel"], 0, self.feed, id) + if not success: + raise JobNotClaimed def stall(self, id): """ @@ -269,22 +203,9 @@ def stall(self, id): Arguments: id -- The ID of the job to pause. """ - while True: - self.redis.watch(self.feed_job_claimed) - if self.redis.zrank(self.feed_job_claimed, id) is None: - self.redis.unwatch() - return # raise exception? - - pipe = self.redis.pipeline() - pipe.zrem(self.feed_job_claimed, id) - pipe.hdel(self.feed_cancelled, id) - pipe.sadd(self.feed_job_stalled, id) - pipe.zrem(self.feed_published, id) - try: - pipe.execute() - break - except redis.exceptions.WatchError: - pass + success = self.redis.evalsha(self.thoonk.scripts["jobs/stall"], 0, self.feed, id) + if not success: + raise JobNotClaimed def retry(self, id): """ @@ -293,23 +214,10 @@ def retry(self, id): Arguments: id -- The ID of the job to resume. """ - while True: - self.redis.watch(self.feed_job_stalled) - if self.redis.sismember(self.feed_job_stalled, id) is None: - self.redis.unwatch() - return # raise exception? - - pipe = self.redis.pipeline() - pipe.srem(self.feed_job_stalled, id) - pipe.lpush(self.feed_ids, id) - pipe.zadd(self.feed_published, time.time(), id) - try: - results = pipe.execute() - if not results[0]: - return # raise exception? - break - except redis.exceptions.WatchError: - pass + success = self.redis.evalsha(self.thoonk.scripts["jobs/retry"], 0, self.feed, id, + int(time.time()*1000)) + if not success: + raise JobNotStalled def maintenance(self): """ @@ -324,8 +232,8 @@ def maintenance(self): pipe = self.redis.pipeline() pipe.hkeys(self.feed_items) pipe.lrange(self.feed_ids) - pipe.zrange(self.feed_job_claimed, 0, -1) - pipe.stall = pipe.smembers(self.feed_job_stalled) + pipe.zrange(self.feed_claimed, 0, -1) + pipe.stall = pipe.smembers(self.feed_stalled) keys, avail, claim, stall = pipe.execute() diff --git a/thoonk/feeds/queue.py b/thoonk/feeds/queue.py index 1c11791..d3716e4 100644 --- a/thoonk/feeds/queue.py +++ b/thoonk/feeds/queue.py @@ -5,14 +5,9 @@ import uuid -from thoonk.exceptions import * +from thoonk.exceptions import Empty from thoonk.feeds import Feed - -class Empty(Exception): - pass - - class Queue(Feed): """ diff --git a/thoonk/feeds/sorted_feed.py b/thoonk/feeds/sorted_feed.py index 9eb6eee..58f5d30 100644 --- a/thoonk/feeds/sorted_feed.py +++ b/thoonk/feeds/sorted_feed.py @@ -3,9 +3,7 @@ Released under the terms of the MIT License """ -from thoonk.exceptions import * -from thoonk.feeds import * - +from thoonk.feeds import Feed class SortedFeed(Feed): @@ -34,7 +32,7 @@ class SortedFeed(Feed): publish_before -- Add an item immediately after an existing item. """ - def __init__(self, thoonk, feed, config=None): + def __init__(self, thoonk, feed): """ Create a new SortedFeed object for a given Thoonk feed. @@ -48,7 +46,7 @@ def __init__(self, thoonk, feed, config=None): config -- Optional dictionary of configuration values. """ - Feed.__init__(self, thoonk, feed, config) + Feed.__init__(self, thoonk, feed) self.feed_id_incr = 'feed.idincr:%s' % feed self.feed_position = 'feed.position:%s' % feed @@ -87,8 +85,8 @@ def prepend(self, item): pipe.lpush(self.feed_ids, id) pipe.incr(self.feed_publishes) pipe.hset(self.feed_items, id, item) - pipe.publish(self.feed_publish, '%s\x00%s' % (id, item)) - pipe.publish(self.feed_position, '%s\x00%s' % (id, 'begin:')) + self.thoonk._publish(self.feed_publish, (str(id), item), pipe) + self.thoonk._publish(self.feed_position, (str(id), 'begin:'), pipe) pipe.execute() return id @@ -105,27 +103,21 @@ def __insert(self, item, rel_id, method): to rel_id. """ id = self.redis.incr(self.feed_id_incr) - while True: - self.redis.watch(self.feed_items) - if not self.redis.hexists(self.feed_items, rel_id): - self.redis.unwatch() + if method == 'BEFORE': + pos_rel_id = ':%s' % rel_id + else: + pos_rel_id = '%s:' % rel_id + + def _insert(pipe): + if not pipe.hexists(self.feed_items, rel_id): return # raise exception? - - pipe = self.redis.pipeline() + pipe.multi() pipe.linsert(self.feed_ids, method, rel_id, id) pipe.hset(self.feed_items, id, item) - pipe.publish(self.feed_publish, '%s\x00%s' % (id, item)) - if method == 'BEFORE': - rel_id = ':%s' % rel_id - else: - rel_id = '%s:' % rel_id - pipe.publish(self.feed_position, '%s\x00%s' % (id, rel_id)) - - try: - pipe.execute() - break - except redis.exceptions.WatchError: - pass + self.thoonk._publish(self.feed_publish, (str(id), item), pipe) + self.thoonk._publish(self.feed_position, (str(id), pos_rel_id), pipe) + + self.redis.transaction(_insert, self.feed_items) return id def publish(self, item): @@ -142,8 +134,8 @@ def publish(self, item): pipe.rpush(self.feed_ids, id) pipe.incr(self.feed_publishes) pipe.hset(self.feed_items, id, item) - pipe.publish(self.feed_publish, '%s\x00%s' % (id, item)) - pipe.publish(self.feed_position, '%s\x00%s' % (id, ':end')) + self.thoonk._publish(self.feed_publish, (str(id), item), pipe) + self.thoonk._publish(self.feed_position, (str(id), ':end'), pipe) pipe.execute() return id @@ -155,22 +147,15 @@ def edit(self, id, item): id -- The ID value of the item to edit. item -- The new contents of the item. """ - while True: - self.redis.watch(self.feed_items) - if not self.redis.hexists(self.feed_items, id): - self.redis.unwatch() + def _edit(pipe): + if not pipe.hexists(self.feed_items, id): return # raise exception? - - pipe = self.redis.pipeline() + pipe.multi() pipe.hset(self.feed_items, id, item) pipe.incr(self.feed_publishes) pipe.publish(self.feed_publish, '%s\x00%s' % (id, item)) - - try: - pipe.execute() - break - except redis.exceptions.WatchError: - pass + + self.redis.transaction(_edit, self.feed_items) def publish_before(self, before_id, item): """ @@ -215,19 +200,15 @@ def move(self, rel_position, id): rel_id = rel_position[:-1] else: raise ValueError('Relative ID formatted incorrectly') - - while True: - self.redis.watch(self.feed_items) - if not self.redis.hexists(self.feed_items, id): - self.redis.unwatch() - break + + def _move(pipe): + if not pipe.hexists(self.feed_items, id): + return if rel_id not in ['begin', 'end'] and \ - not self.redis.hexists(self.feed_items, rel_id): - self.redis.unwatch() - break - - pipe = self.redis.pipeline() - pipe.lrem(self.feed_ids, id, 1) + not pipe.hexists(self.feed_items, rel_id): + return + pipe.multi() + pipe.lrem(self.feed_ids, 1, id) if rel_id == 'begin': pipe.lpush(self.feed_ids, id) elif rel_id == 'end': @@ -237,12 +218,8 @@ def move(self, rel_position, id): pipe.publish(self.feed_position, '%s\x00%s' % (id, rel_position)) - - try: - pipe.execute() - break - except redis.exceptions.WatchError: - pass + + self.redis.transaction(_move, self.feed_items) def move_before(self, rel_id, id): """ @@ -289,21 +266,14 @@ def retract(self, id): Arguments: id -- The ID value of the item to remove. """ - while True: - self.redis.watch(self.feed_items) - if self.redis.hexists(self.feed_items, id): - pipe = self.redis.pipeline() - pipe.lrem(self.feed_ids, id, 1) + def _retract(pipe): + if pipe.hexists(self.feed_items, id): + pipe.multi() + pipe.lrem(self.feed_ids, 1, id) pipe.hdel(self.feed_items, id) pipe.publish(self.feed_retract, id) - try: - pipe.execute() - break - except redis.exceptions.WatchError: - pass - else: - self.redis.unwatch() - return + + self.redis.transaction(_retract, self.feed_items) def get_ids(self): """Return the set of IDs used by items in the feed.""" diff --git a/thoonk/pubsub.py b/thoonk/pubsub.py index a060798..600b002 100644 --- a/thoonk/pubsub.py +++ b/thoonk/pubsub.py @@ -3,15 +3,14 @@ Released under the terms of the MIT License """ -import json import redis import threading import uuid -from thoonk import feeds -from thoonk.exceptions import * -from thoonk.config import ConfigCache - +from thoonk import feeds, cache +from thoonk.exceptions import FeedExists, FeedDoesNotExist, NotListening +import os +import json class Thoonk(object): @@ -40,7 +39,6 @@ class Thoonk(object): Attributes: db -- The Redis database number. feeds -- A set of known feed names. - _feed_config -- A cache of feed configurations. feedtypes -- A dictionary mapping feed type names to their implementation classes. handlers -- A dictionary mapping event names to event handlers. @@ -84,28 +82,25 @@ def __init__(self, host='localhost', port=6379, db=0, listen=False): self.host = host self.port = port self.db = db - self.redis = redis.Redis(host=self.host, port=self.port, db=self.db) - self.lredis = None + self.redis = redis.StrictRedis(host=self.host, port=self.port, db=self.db) + self._feeds = cache.FeedCache(self) + self.instance = uuid.uuid4().hex self.feedtypes = {} - self.feeds = set() - self._feed_config = ConfigCache(self) - self.handlers = { - 'create_notice': [], - 'delete_notice': [], - 'publish_notice': [], - 'retract_notice': [], - 'position_notice': []} - - self.listen_ready = threading.Event() + + self.scripts = {} + for dirpath, _, filenames in os.walk("scripts/"): + for filename in filenames: + if filename.endswith(".lua"): + f = open(os.path.join(dirpath, filename), "r") + if len(dirpath) > 8: + filename = dirpath[8:]+"/"+filename + self.scripts[filename[:-4]] = self.redis.script("LOAD", f.read()) self.listening = listen self.feed_publish = 'feed.publish:%s' self.feed_retract = 'feed.retract:%s' self.feed_config = 'feed.config:%s' - self.conf_feed = 'conffeed' - self.new_feed = 'newfeed' - self.del_feed = 'delfeed' self.register_feedtype(u'feed', feeds.Feed) self.register_feedtype(u'queue', feeds.Queue) @@ -114,42 +109,25 @@ def __init__(self, host='localhost', port=6379, db=0, listen=False): self.register_feedtype(u'sorted_feed', feeds.SortedFeed) if listen: - #start listener thread - self.lthread = threading.Thread(target=self.listen) - self.lthread.daemon = True - self.lthread.start() - self.listen_ready.wait() + self.listener = ThoonkListener(self) + self.listener.start() + self.listener.ready.wait() - def _publish(self, schema, items): + def _publish(self, schema, items=[], pipe=None): """ A shortcut method to publish items separated by \x00. Arguments: schema -- The key to publish the items to. items -- A tuple or list of items to publish. + pipe -- A redis pipeline to use to publish the item using. + Note: it is up to the caller to execute the pipe after + publishing """ - self.redis.publish(schema, "\x00".join(items)) - - def __getitem__(self, feed): - """ - Return the configuration for a feed. - - Arguments: - feed -- The name of the feed. - - Returns: Dict - """ - return self._feed_config[feed] - - def __setitem__(self, feed, config): - """ - Set the configuration for a feed. - - Arguments: - feed -- The name of the feed. - config -- A dict of config values. - """ - self.set_config(feed, config) + if pipe: + pipe.publish(schema, "\x00".join(items)) + else: + self.redis.publish(schema, "\x00".join(items)) def register_feedtype(self, feedtype, klass): """ @@ -178,15 +156,16 @@ def startclass(feed, config=None): """ if config is None: config = {} - if self.feed_exists(feed): - return self[feed] - else: - if not config.get('type', False): - config['type'] = feedtype - return self.create_feed(feed, config) + config['type'] = feedtype + try: + self.create_feed(feed, config) + except FeedExists: + pass + return self._feeds[feed] + setattr(self, feedtype, startclass) - + def register_handler(self, name, handler): """ Register a function to respond to feed events. @@ -202,10 +181,24 @@ def register_handler(self, name, handler): name -- The name of the feed event. handler -- The function for handling the event. """ - if name not in self.handlers: - self.handlers[name] = [] - self.handlers[name].append(handler) + if self.listener: + self.listener.register_handler(name, handler) + else: + raise NotListening + + def remove_handler(self, name, handler): + """ + Unregister a function that was registered via register_handler + Arguments: + name -- The name of the feed event. + handler -- The function for handling the event. + """ + if self.listener: + self.listener.remove_handler(name, handler) + else: + raise NotListening + def create_feed(self, feed, config): """ Create a new feed with a given configuration. @@ -217,14 +210,12 @@ def create_feed(self, feed, config): feed -- The name of the new feed. config -- A dictionary of configuration values. """ - if config is None: - config = {} - if not self.redis.sadd("feeds", feed): + if 'type' not in config: + config['type'] = 'feed' + success = self.redis.evalsha(self.scripts["create"], 0, feed, + json.dumps(config), self.instance) + if not success: raise FeedExists - self.feeds.add(feed) - self.set_config(feed, config) - self._publish(self.new_feed, (feed, self._feed_config.instance)) - return self[feed] def delete_feed(self, feed): """ @@ -233,24 +224,11 @@ def delete_feed(self, feed): Arguments: feed -- The name of the feed. """ - feed_instance = self._feed_config[feed] - deleted = False - while not deleted: - self.redis.watch('feeds') - if not self.feed_exists(feed): - return FeedDoesNotExist - pipe = self.redis.pipeline() - pipe.srem("feeds", feed) - for key in feed_instance.get_schemas(): - pipe.delete(key) - self._publish(self.del_feed, (feed, self._feed_config.instance)) - try: - pipe.execute() - deleted = True - except redis.exceptions.WatchError: - deleted = False + success = self.redis.evalsha(self.scripts["delete"], 0, feed, self.instance) + if not success: + raise FeedDoesNotExist - def set_config(self, feed, config): + def set_config(self, feed, config, new_feed=False): """ Set the configuration for a given feed. @@ -258,28 +236,18 @@ def set_config(self, feed, config): feed -- The name of the feed. config -- A dictionary of configuration values. """ - if not self.feed_exists(feed): + success = self.redis.evalsha(self.scripts["config"], 0, feed, + self.instance, json.dumps(config)) + if not success: raise FeedDoesNotExist - if type(config) == dict: - if u'type' not in config: - config[u'type'] = u'feed' - jconfig = json.dumps(config) - dconfig = config - else: - dconfig = json.loads(config) - if u'type' not in dconfig: - dconfig[u'type'] = u'feed' - jconfig = json.dumps(dconfig) - self.redis.set(self.feed_config % feed, jconfig) - self._publish(self.conf_feed, (feed, self._feed_config.instance)) - - def get_feeds(self): + + def get_feed_names(self): """ Return the set of known feeds. Returns: set """ - return self.feeds + return self.redis.smembers('feeds') or set() def feed_exists(self, feed): """ @@ -288,23 +256,40 @@ def feed_exists(self, feed): Arguments: feed -- The name of the feed. """ - if not self.listening: - if not feed in self.feeds: - if self.redis.sismember('feeds', feed): - self.feeds.add(feed) - return True - return False - else: - return True - return feed in self.feeds + return self.redis.sismember('feeds', feed) def close(self): """Terminate the listening Redis connection.""" - self.redis.connection.disconnect() if self.listening: - self.lredis.connection.disconnect() - - def listen(self): + self.redis.publish(self.listener._finish_channel, "") + self.listener.finished.wait() + self.redis.connection_pool.disconnect() + + +class ThoonkListener(threading.Thread): + + def __init__(self, thoonk, *args, **kwargs): + threading.Thread.__init__(self, *args, **kwargs) + self.lock = threading.Lock() + self.handlers = {} + self.thoonk = thoonk + self.ready = threading.Event() + self.redis = redis.StrictRedis(host=thoonk.host, port=thoonk.port, + db=thoonk.db) + self.finished = threading.Event() + self.instance = thoonk.instance + self._finish_channel = "listenerclose_%s" % self.instance + self._pubsub = None + self.daemon = True + + def finish(self): + self.redis.publish(self._finish_channel, "") + + def _channels_for_feed(self, name): + return ("feed.publish:"+name, "feed.edit:"+name, "feed.retract:"+name, + "feed.position:"+name, "job.finish:"+name) + + def run(self): """ Listen for feed creation and manipulation events and execute relevant event handlers. Specifically, listen for: @@ -315,112 +300,105 @@ def listen(self): - Item retractions. """ # listener redis object - self.lredis = redis.Redis(host=self.host, port=self.port, db=self.db) - + self._pubsub = self.redis.pubsub() # subscribe to feed activities channel - self.lredis.subscribe((self.new_feed, self.del_feed, self.conf_feed)) - - # get set of feeds - self.feeds.update(self.redis.smembers('feeds')) - if self.feeds: - # subscribe to exist feeds retract and publish - for feed in self.feeds: - self.lredis.subscribe(self[feed].get_channels()) - - self.listen_ready.set() - for event in self.lredis.listen(): - if event['type'] == 'message': - if event['channel'].startswith('feed.publish'): - #feed publish event - id, item = event['data'].split('\x00', 1) - self.publish_notice(event['channel'].split(':', 1)[-1], - item, id) - elif event['channel'].startswith('feed.retract'): - self.retract_notice(event['channel'].split(':', 1)[-1], - event['data']) - elif event['channel'].startswith('feed.position'): - self.position_notice(event['channel'].split(':', 1)[-1], - event['data']) - elif event['channel'] == self.new_feed: - #feed created event - name, instance = event['data'].split('\x00') - self.feeds.add(name) - self.lredis.subscribe((self.feed_publish % name, - self.feed_retract % name)) - self.create_notice(name) - elif event['channel'] == self.del_feed: - #feed destroyed event - name, instance = event['data'].split('\x00') - try: - self.feeds.remove(name) - except KeyError: - #already removed -- probably locally - pass - self._feed_config.invalidate(name, instance, delete=True) - self.delete_notice(name) - elif event['channel'] == self.conf_feed: - feed, instance = event['data'].split('\x00', 1) - self._feed_config.invalidate(feed, instance) - - def create_notice(self, feed): - """ - Generate a notice that a new feed has been created and - execute any relevant event handlers. - - Arguments: - feed -- The name of the created feed. - """ - for handler in self.handlers['create_notice']: - handler(feed) - - def delete_notice(self, feed): - """ - Generate a notice that a feed has been deleted, and - execute any relevant event handlers. - - Arguments: - feed -- The name of the deleted feed. - """ - for handler in self.handlers['delete_notice']: - handler(feed) - - def publish_notice(self, feed, item, id): - """ - Generate a notice that an item has been published to a feed, and - execute any relevant event handlers. + self._pubsub.subscribe((self._finish_channel, 'newfeed', 'delfeed', + 'conffeed')) + + + # subscribe to exist feeds retract and publish + for feed in self.redis.smembers("feeds"): + self._pubsub.subscribe(self._channels_for_feed(feed)) + + self.ready.set() + for event in self._pubsub.listen(): + type = event.pop("type") + if event["channel"] == self._finish_channel: + if self._pubsub.subscription_count: + self._pubsub.unsubscribe() + elif type == 'message': + self._handle_message(**event) + elif type == 'pmessage': + self._handle_pmessage(**event) + + self.finished.set() + + def _handle_message(self, channel, data, pattern=None): + if channel == 'newfeed': + #feed created event + name, _ = data.split('\x00') + self._pubsub.subscribe(self._channels_for_feed(name)) + self.emit("create", name) + + elif channel == 'delfeed': + #feed destroyed event + name, _ = data.split('\x00') + try: + del self._feeds[name] + except: + pass + self.emit("delete", name) + + elif channel == 'conffeed': + feed, _ = data.split('\x00', 1) + self.emit("config:"+feed, None) + + elif channel.startswith('feed.publish'): + #feed publish event + id, item = data.split('\x00', 1) + self.emit("publish", channel.split(':', 1)[-1], item, id) + + elif channel.startswith('feed.edit'): + #feed publish event + id, item = data.split('\x00', 1) + self.emit("edit", channel.split(':', 1)[-1], item, id) + + elif channel.startswith('feed.retract'): + self.emit("retract", channel.split(':', 1)[-1], data) + + elif channel.startswith('feed.position'): + id, rel_id = data.split('\x00', 1) + self.emit("position", channel.split(':', 1)[-1], id, rel_id) + + elif channel.startswith('job.finish'): + id, result = data.split('\x00', 1) + self.emit("finish", channel.split(':', 1)[-1], id, result) + + def emit(self, event, *args): + with self.lock: + for handler in self.handlers.get(event, []): + handler(*args) - Arguments: - feed -- The name of the feed. - item -- The content of the published item. - id -- The ID of the published item. + def register_handler(self, name, handler): """ - self[feed].event_publish(id, item) - for handler in self.handlers['publish_notice']: - handler(feed, item, id) + Register a function to respond to feed events. - def retract_notice(self, feed, id): - """ - Generate a notice that an item has been retracted from a feed, and - execute any relevant event handlers. + Event types: + - create_notice + - delete_notice + - publish_notice + - retract_notice + - position_notice Arguments: - feed -- The name of the feed. - id -- The ID of the retracted item. + name -- The name of the feed event. + handler -- The function for handling the event. """ - self[feed].event_retract(id) - for handler in self.handlers['retract_notice']: - handler(feed, id) + with self.lock: + if name not in self.handlers: + self.handlers[name] = [] + self.handlers[name].append(handler) - def position_notice(self, feed, id, rel_id): + def remove_handler(self, name, handler): """ - Generate a notice that an item has been moved, and - execute any relevant event handlers. + Unregister a function that was registered via register_handler Arguments: - feed -- The name of the feed. - id -- The ID of the moved item. - rel_id -- Where the item was moved, in relation to - existing items. + name -- The name of the feed event. + handler -- The function for handling the event. """ - for handler in self.handlers['position_notice']: - handler(feed, id, rel_id) + with self.lock: + try: + self.handlers[name].remove(handler) + except (KeyError, ValueError): + pass