From cd0b460f5ccacdb69561791202655284b5752be1 Mon Sep 17 00:00:00 2001 From: Michael van Bracht Date: Thu, 24 May 2012 11:16:21 +0200 Subject: [PATCH 1/2] remove superfluous whitespace --- thoonk/feeds/job.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/thoonk/feeds/job.py b/thoonk/feeds/job.py index c439d78..18d15de 100644 --- a/thoonk/feeds/job.py +++ b/thoonk/feeds/job.py @@ -90,8 +90,8 @@ def __init__(self, thoonk, feed): self.feed_claimed = 'feed.claimed:%s' % feed self.feed_stalled = 'feed.stalled:%s' % feed self.feed_running = 'feed.running:%s' % feed - - self.job_finish = 'job.finish:%s' % feed + + self.job_finish = 'job.finish:%s' % feed def get_channels(self): return (self.feed_publishes, self.feed_claimed, self.feed_stalled, @@ -127,7 +127,7 @@ def _retract(pipe): pipe.srem(self.feed_stalled, id) pipe.zrem(self.feed_claimed, id) pipe.lrem(self.feed_ids, 1, id) - + self.redis.transaction(_retract, self.feed_items) def put(self, item, priority=False): @@ -172,7 +172,7 @@ def get(self, timeout=0): Arguments: timeout -- Optional time in seconds to wait before raising an exception. - + Returns: id -- The id of the job job -- The job content @@ -188,14 +188,14 @@ def get(self, timeout=0): pipe.hget(self.feed_items, id) pipe.hget(self.feed_cancelled, id) result = pipe.execute() - + self.thoonk._publish(self.feed_claimed, (id,)) return id, result[1], 0 if result[2] is None else int(result[2]) def get_failure_count(self, id): return int(self.redis.hget(self.feed_cancelled, id) or 0) - + NO_RESULT = [] def finish(self, id, result=NO_RESULT): """ @@ -216,7 +216,7 @@ def _finish(pipe): if result is not self.NO_RESULT: self.thoonk._publish(self.job_finish, (id, result), pipe) pipe.hdel(self.feed_items, id) - + self.redis.transaction(_finish, self.feed_claimed) def cancel(self, id): @@ -233,7 +233,7 @@ def _cancel(pipe): pipe.hincrby(self.feed_cancelled, id, 1) pipe.lpush(self.feed_ids, id) pipe.zrem(self.feed_claimed, id) - + self.redis.transaction(_cancel, self.feed_claimed) def stall(self, id): @@ -253,7 +253,7 @@ def _stall(pipe): pipe.hdel(self.feed_cancelled, id) pipe.sadd(self.feed_stalled, id) pipe.zrem(self.feed_published, id) - + self.redis.transaction(_stall, self.feed_claimed) def retry(self, id): @@ -270,7 +270,7 @@ def _retry(pipe): pipe.srem(self.feed_stalled, id) pipe.lpush(self.feed_ids, id) pipe.zadd(self.feed_published, **{id: time.time()}) - + results = self.redis.transaction(_retry, self.feed_stalled) if not results[0]: return # raise exception? From 7fcb5200cc907f354cd2b4dbc59c8829bf6f89d7 Mon Sep 17 00:00:00 2001 From: Michael van Bracht Date: Thu, 21 Jun 2012 12:08:03 +0200 Subject: [PATCH 2/2] fix docs - high priority Job.put() also increments feed.publishes:[feed] --- contract.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/contract.txt b/contract.txt index 9b885cf..d8e224a 100644 --- a/contract.txt +++ b/contract.txt @@ -199,6 +199,7 @@ Job: // id = generated uuid MULTI RPUSH feed.ids:[feed] [id] + INCR feed.publishes:[feed] HSET feed.items:[feed] [id] [item] ZADD feed.published:[feed] [utc epoch milliseconds] [id] EXEC @@ -284,4 +285,4 @@ Job: LPUSH feed.ids:[feed] [key] check claimed jobs to see if any have been claimed too long and "Cancel" or "Stall" them - publish stats to a feed \ No newline at end of file + publish stats to a feed