Skip to content

Move calculation of size data for string, int, and list types into resolve_types LUA script #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rma/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def run(self):
is_all = self.behaviour == 'all'
with Scanner(redis=self.redis, match=self.match, accepted_types=self.types) as scanner:
keys = defaultdict(list)
for v in scanner.scan(limit=self.limit):
for v in scanner.scan(limit=self.limit, calculate_sizes=(is_all or self.behaviour == 'ram')):
keys[v["type"]].append(v)

if self.isTextFormat:
Expand Down
23 changes: 11 additions & 12 deletions rma/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,16 @@ def ziplist_overhead(size):
return Jemalloc.align(12 + 21 * size)


def size_of_ziplist_aligned_string(value):
# Looks like we need something more complex here. We use calculation as 21 bytes per entry + len of string
# or len of pointer. Redis use more RAM saving policy but after aligning it has infelicity ~3-5%
try:
num_value = int(value)
return Jemalloc.align(size_of_pointer_fn())
except ValueError:
pass
def size_of_ziplist_aligned_string(value_length):
# # Looks like we need something more complex here. We use calculation as 21 bytes per entry + len of string
# # or len of pointer. Redis use more RAM saving policy but after aligning it has infelicity ~3-5%
# try:
# num_value = int(value)
# return Jemalloc.align(size_of_pointer_fn())
# except ValueError:
# pass

return Jemalloc.align(len(value))
return Jemalloc.align(value_length)


def linkedlist_overhead():
Expand All @@ -121,9 +121,8 @@ def linkedlist_entry_overhead():
return 3*size_of_pointer_fn()


def size_of_linkedlist_aligned_string(value):
return Jemalloc.align(linkedlist_entry_overhead() + len(value))

def size_of_linkedlist_aligned_string(value_length):
return Jemalloc.align(linkedlist_entry_overhead() + value_length)

def intset_overhead(size):
# typedef struct intset {
Expand Down
10 changes: 5 additions & 5 deletions rma/rule/List.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ def __init__(self, info, redis):
key_name = info["name"]
self.encoding = info['encoding']
self.ttl = info['ttl']
self.entry_lengths = info['len']

self.values = redis.lrange(key_name, 0, -1)
self.count = len(self.values)
self.count = len(self.entry_lengths)
import time
time.sleep(0.001)
used_bytes_iter, min_iter, max_iter = tee((len(x) for x in self.values), 3)
used_bytes_iter, min_iter, max_iter = tee(self.entry_lengths, 3)

if self.encoding == REDIS_ENCODING_ID_LINKEDLIST:
self.system = dict_overhead(self.count)
self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.values))
self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.entry_lengths))
elif self.encoding == REDIS_ENCODING_ID_ZIPLIST or self.encoding == REDIS_ENCODING_ID_QUICKLIST:
# Undone `quicklist`
self.system = ziplist_overhead(self.count)
self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.values))
self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.entry_lengths))
else:
raise Exception('Panic', 'Unknown encoding %s in %s' % (self.encoding, key_name))

Expand Down
4 changes: 2 additions & 2 deletions rma/rule/ValueString.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, redis, info, use_debug=True):
self.logger = logging.getLogger(__name__)

if self.encoding == REDIS_ENCODING_ID_INT:
self.useful_bytes = self.get_int_encoded_bytes(redis, key_name)
self.useful_bytes = info["len"]
self.free_bytes = 0
self.aligned = size_of_aligned_string_by_size(self.useful_bytes, encoding=self.encoding)
elif self.encoding == REDIS_ENCODING_ID_EMBSTR or self.encoding == REDIS_ENCODING_ID_RAW:
Expand All @@ -45,7 +45,7 @@ def __init__(self, redis, info, use_debug=True):
self.useful_bytes = sdslen_response['val_sds_len']
self.free_bytes = sdslen_response['val_sds_avail']
else:
self.useful_bytes = size_of_aligned_string_by_size(redis.strlen(key_name), self.encoding)
self.useful_bytes = info["len"]
self.free_bytes = 0
# INFO Rewrite this to support Redis >= 3.2 sds dynamic header
sds_len = 8 + self.useful_bytes + self.free_bytes + 1
Expand Down
53 changes: 42 additions & 11 deletions rma/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,63 @@ def __init__(self, redis, match="*", accepted_types=None):
self.pipeline_mode = False
self.resolve_types_script = self.redis.register_script("""
local ret = {}

for i = 1, #KEYS do
local type = redis.call("TYPE", KEYS[i])
local encoding = redis.call("OBJECT", "ENCODING",KEYS[i])
local ttl = redis.call("TTL", KEYS[i])
ret[i] = {type["ok"], encoding, ttl}

if type["ok"] == 'none' then
ret[i] = {}
else
local encoding = redis.call("OBJECT", "ENCODING",KEYS[i])
local ttl = redis.call("TTL", KEYS[i])
local len = 0

-- "calculate_sizes" option
if ARGV[1] == '1' then
if encoding == 'embstr' or encoding == 'raw' then
len = redis.call("STRLEN", KEYS[i])
elseif encoding == 'int' then
local val = redis.call("GET", KEYS[i])
if tonumber(val) < %(REDIS_SHARED_INTEGERS)d then
len = 0
else
len = %(SIZE_OF_POINTER_FN)d
end
elseif encoding == 'linkedlist' or encoding == 'quicklist' or encoding == 'skiplist' then
local items = redis.call("lrange", KEYS[i], 0, -1)
len = {}
for j = 1, #items do
len[j] = #items[j]
end
end
end

ret[i] = {type["ok"], encoding, ttl, len}
end
end
return cmsgpack.pack(ret)
""")
""" % { "SIZE_OF_POINTER_FN": size_of_pointer_fn(), "REDIS_SHARED_INTEGERS": REDIS_SHARED_INTEGERS })

def __enter__(self):
return self

def __exit__(self, *exc):
return False

def batch_scan(self, count=1000, batch_size=3000):
def batch_scan(self, count=1000, batch_size=3000, calculate_sizes=True):
ret = []
for key in self.redis.scan_iter(self.match, count=count):
ret.append(key)
if len(ret) == batch_size:
yield from self.resolve_types(ret)

if len(ret):
yield from self.resolve_types(ret)
yield from self.resolve_types(ret, calculate_sizes)

def resolve_types(self, ret):
def resolve_types(self, ret, calculate_sizes=True):
if not self.pipeline_mode:
try:
key_with_types = msgpack.unpackb(self.resolve_types_script(ret))
key_with_types = msgpack.unpackb(self.resolve_types_script(ret, [( '1' if calculate_sizes else '0' )] ))
except ResponseError as e:
if "CROSSSLOT" not in repr(e):
raise e
Expand All @@ -79,14 +107,16 @@ def resolve_with_pipe(self, ret):
key_with_types = [{'type': x, 'encoding': y, 'ttl': z} for x, y, z in chunker(pipe.execute(), 3)]
return key_with_types

def scan(self, limit=1000):
def scan(self, limit=1000, calculate_sizes=True):
with tqdm(total=min(limit, self.redis.dbsize()), desc="Match {0}".format(self.match),
miniters=1000) as progress:

total = 0
for key_tuple in self.batch_scan():
key_info, key_name = key_tuple
key_type, key_encoding, key_ttl = key_info
if len(key_info) == 0:
continue # key deleted between scan and check
key_type, key_encoding, key_ttl, key_len = key_info
if not key_name:
self.logger.warning(
'\r\nWarning! Scan iterator return key with empty name `` and type %s', key_type)
Expand All @@ -98,7 +128,8 @@ def scan(self, limit=1000):
'name': key_name.decode("utf-8", "replace"),
'type': to_id,
'encoding': redis_encoding_str_to_id(key_encoding),
'ttl': key_ttl
'ttl': key_ttl,
'len': key_len
}
yield key_info_obj

Expand Down