Add "--max-key-depth" argument to help manage large amounts of key nesting #46

Open · wants to merge 2 commits into master
27 changes: 27 additions & 0 deletions README.rst
@@ -61,6 +61,10 @@ After install used it from console:
not specified, all data types will be returned.
Allowed values are string, hash, list, set, zset
-f --format TYPE Output type format: json or text (by default)
--max-key-depth MAX_DEPTH Set the maximum depth to consider when
consolidating key segments (split by ":"). Any
segments beyond this depth will be collapsed
into a single "*".

If you have a large database, try running first with the ``--limit`` option
to scan a limited number of keys. Also run with ``--types`` to limit
@@ -71,6 +75,29 @@ useful. You can choose what kind of data will be aggregated from the Redis
node using the ``-b (--behaviour)`` console argument. Supported
behaviours are 'global', 'scanner', 'ram' and 'all'.

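For instance, to collect only RAM statistics on a capped scan (an
illustrative invocation that uses only the flags documented above)::

    rma --behaviour ram --limit 10000
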
The tool attempts to detect keys structured in a hierarchy (with ":" as the
separating character) and to consolidate them. Segments that appear to be
variable (currently, any segment containing numbers) are consolidated and
represented by a single "*". The ``--max-key-depth`` argument specifies how
many levels to consider before collapsing all remaining segments into a
single "*".

For example, keys are consolidated as follows by default:

PREFIX --> PREFIX
PREFIX:abc123 --> PREFIX:*
PREFIX:def987 --> PREFIX:*
PREFIX:abc123:SUFFIX --> PREFIX:*:SUFFIX
PREFIX:def987:SUFFIX --> PREFIX:*:SUFFIX

However, if you pass ``--max-key-depth 1``, they are consolidated into:

PREFIX --> PREFIX
PREFIX:abc123 --> PREFIX:*
PREFIX:def987 --> PREFIX:*
PREFIX:abc123:SUFFIX --> PREFIX:*
PREFIX:def987:SUFFIX --> PREFIX:*

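For instance, to cap consolidation at two levels on a large database (an
illustrative invocation; assumes the tool is installed as ``rma``)::

    rma --limit 10000 --max-key-depth 2
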
Internals
---------

7 changes: 4 additions & 3 deletions rma/application.py
@@ -79,7 +79,7 @@ class RmaApplication(object):
REDIS_TYPE_ID_ZSET: [],
}

- def __init__(self, host="127.0.0.1", port=6367, password=None, db=0, match="*", limit=0, filters=None, logger=None, format="text"):
+ def __init__(self, host="127.0.0.1", port=6367, password=None, db=0, match="*", limit=0, filters=None, logger=None, format="text", max_key_depth=None):
self.logger = logger or logging.getLogger(__name__)

self.splitter = SimpleSplitter()
@@ -89,6 +89,7 @@ def __init__(self, host="127.0.0.1", port=6367, password=None, db=0, match="*",

self.match = match
self.limit = limit if limit != 0 else sys.maxsize
+ self.max_key_depth = max_key_depth

if 'types' in filters:
self.types = list(map(redis_type_to_id, filters['types']))
@@ -124,7 +125,7 @@ def run(self):
is_all = self.behaviour == 'all'
with Scanner(redis=self.redis, match=self.match, accepted_types=self.types) as scanner:
keys = defaultdict(list)
- for v in scanner.scan(limit=self.limit):
+ for v in scanner.scan(limit=self.limit, calculate_sizes=(is_all or self.behaviour == 'ram')):
keys[v["type"]].append(v)

if self.isTextFormat:
@@ -178,7 +179,7 @@ def do_ram(self, res):
return {"stat": ret}

def get_pattern_aggregated_data(self, data):
- split_patterns = self.splitter.split((ptransform(obj["name"]) for obj in data))
+ split_patterns = self.splitter.split([ptransform(obj["name"]) for obj in data], max_depth=self.max_key_depth)
self.logger.debug(split_patterns)

aggregate_patterns = {item: [] for item in split_patterns}
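To make the effect of the new ``max_depth`` argument concrete, here is a
minimal sketch of the splitter call above (the key names are hypothetical,
and the exact patterns returned depend on ``fold_to_tree``/``unfold_to_list``):

    from rma.splitter import SimpleSplitter

    # Hypothetical key names, as the scanner would report them.
    names = ["user:1001", "user:1002", "user:1001:session"]

    splitter = SimpleSplitter()

    # Unlimited depth: numeric segments become "*", but the trailing
    # "session" level survives, e.g. "user:*" and "user:*:session".
    print(splitter.split(names))

    # max_depth=1: everything past the first segment collapses into "*",
    # e.g. just "user:*".
    print(splitter.split(names, max_depth=1))
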
9 changes: 8 additions & 1 deletion rma/cli/rma_cli.py
@@ -70,6 +70,12 @@ def main():
dest="format",
default="text",
help="Output type format: json or text (by default)")
parser.add_argument("--max-key-depth",
dest="max_key_depth",
default="0",
type=int,
help="""Set the maximum depth to consider when consolidating key segments (split by ":"). Any segments past
this depth will be squished into a single "*".""")

options = parser.parse_args()

@@ -91,7 +97,8 @@ def main():
filters['types'].append(x)

app = RmaApplication(host=options.host, port=options.port, db=options.db, password=options.password,
- match=options.match, limit=options.limit, filters=filters, format=options.format)
+ match=options.match, limit=options.limit, filters=filters, format=options.format,
+ max_key_depth=options.max_key_depth)

start_time = time.clock()
app.run()
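Note that ``argparse`` runs string defaults through ``type``, so
``default="0"`` with ``type=int`` yields the integer ``0``, which the
splitter treats as "unlimited" since it only truncates when ``max_depth``
is truthy. A self-contained sketch of that behaviour:

    import argparse

    parser = argparse.ArgumentParser()
    # Mirrors the new option: the string default is still coerced by type=int.
    parser.add_argument("--max-key-depth", dest="max_key_depth",
                        default="0", type=int)

    assert parser.parse_args([]).max_key_depth == 0      # int, not "0"
    assert parser.parse_args(["--max-key-depth", "2"]).max_key_depth == 2
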
23 changes: 11 additions & 12 deletions rma/redis.py
@@ -98,16 +98,16 @@ def ziplist_overhead(size):
return Jemalloc.align(12 + 21 * size)


- def size_of_ziplist_aligned_string(value):
- # Looks like we need something more complex here. We use calculation as 21 bytes per entry + len of string
- # or len of pointer. Redis use more RAM saving policy but after aligning it has infelicity ~3-5%
- try:
- num_value = int(value)
- return Jemalloc.align(size_of_pointer_fn())
- except ValueError:
- pass
+ def size_of_ziplist_aligned_string(value_length):
+ # # Looks like we need something more complex here. We use calculation as 21 bytes per entry + len of string
+ # # or len of pointer. Redis use more RAM saving policy but after aligning it has infelicity ~3-5%
+ # try:
+ # num_value = int(value)
+ # return Jemalloc.align(size_of_pointer_fn())
+ # except ValueError:
+ # pass

- return Jemalloc.align(len(value))
+ return Jemalloc.align(value_length)


def linkedlist_overhead():
@@ -121,9 +121,8 @@ def linkedlist_entry_overhead():
return 3*size_of_pointer_fn()


- def size_of_linkedlist_aligned_string(value):
- return Jemalloc.align(linkedlist_entry_overhead() + len(value))
+ def size_of_linkedlist_aligned_string(value_length):
+ return Jemalloc.align(linkedlist_entry_overhead() + value_length)

def intset_overhead(size):
# typedef struct intset {
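For readers unfamiliar with ``Jemalloc.align``, the idea is to round a
requested size up to the allocator's size class, so two strings of length
13 and 15 cost the same. A deliberately simplified stand-in (not rma's
actual size-class table):

    def align_simplified(size):
        """Round up to a coarse, jemalloc-like size class (illustration only)."""
        if size <= 128:
            # Small allocations: 8-byte spaced bins.
            return (size + 7) & ~7
        # Larger allocations: 64-byte spaced bins (a simplification).
        return (size + 63) & ~63

    assert align_simplified(13) == 16
    assert align_simplified(15) == 16
    assert align_simplified(130) == 192
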
10 changes: 5 additions & 5 deletions rma/rule/List.py
@@ -15,20 +15,20 @@ def __init__(self, info, redis):
key_name = info["name"]
self.encoding = info['encoding']
self.ttl = info['ttl']
+ self.entry_lengths = info['len']

- self.values = redis.lrange(key_name, 0, -1)
- self.count = len(self.values)
+ self.count = len(self.entry_lengths)
import time
time.sleep(0.001)
- used_bytes_iter, min_iter, max_iter = tee((len(x) for x in self.values), 3)
+ used_bytes_iter, min_iter, max_iter = tee(self.entry_lengths, 3)

if self.encoding == REDIS_ENCODING_ID_LINKEDLIST:
self.system = dict_overhead(self.count)
- self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.values))
+ self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.entry_lengths))
elif self.encoding == REDIS_ENCODING_ID_ZIPLIST or self.encoding == REDIS_ENCODING_ID_QUICKLIST:
# Undone `quicklist`
self.system = ziplist_overhead(self.count)
- self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.values))
+ self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.entry_lengths))
else:
raise Exception('Panic', 'Unknown encoding %s in %s' % (self.encoding, key_name))

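Since the rule now receives element lengths instead of the elements
themselves, all of its statistics can be computed from integers alone. A
small sketch of the ``tee`` pattern used above (with made-up lengths):

    from itertools import tee

    entry_lengths = [3, 5, 8]  # hypothetical element lengths for one list key

    # Three independent iterators over the same sequence of lengths.
    used_bytes_iter, min_iter, max_iter = tee(entry_lengths, 3)

    print(sum(used_bytes_iter))  # 16 -> total payload bytes
    print(min(min_iter))         # 3
    print(max(max_iter))         # 8
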
4 changes: 2 additions & 2 deletions rma/rule/ValueString.py
@@ -36,7 +36,7 @@ def __init__(self, redis, info, use_debug=True):
self.logger = logging.getLogger(__name__)

if self.encoding == REDIS_ENCODING_ID_INT:
- self.useful_bytes = self.get_int_encoded_bytes(redis, key_name)
+ self.useful_bytes = info["len"]
self.free_bytes = 0
self.aligned = size_of_aligned_string_by_size(self.useful_bytes, encoding=self.encoding)
elif self.encoding == REDIS_ENCODING_ID_EMBSTR or self.encoding == REDIS_ENCODING_ID_RAW:
@@ -45,7 +45,7 @@
self.useful_bytes = sdslen_response['val_sds_len']
self.free_bytes = sdslen_response['val_sds_avail']
else:
- self.useful_bytes = size_of_aligned_string_by_size(redis.strlen(key_name), self.encoding)
+ self.useful_bytes = info["len"]
self.free_bytes = 0
# INFO Rewrite this to support Redis >= 3.2 sds dynamic header
sds_len = 8 + self.useful_bytes + self.free_bytes + 1
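The ``sds_len`` arithmetic above models the pre-3.2 sds layout: a header of
two 4-byte fields (``len`` and ``free``), the payload, any free space, and a
terminating NUL byte. A sketch under that assumption:

    def sds_size_pre_3_2(useful_bytes, free_bytes):
        # struct sdshdr { unsigned int len; unsigned int free; char buf[]; }
        # 4 (len) + 4 (free) + payload + spare capacity + trailing '\0'
        return 4 + 4 + useful_bytes + free_bytes + 1

    assert sds_size_pre_3_2(5, 0) == 14   # "hello" as a raw/embstr payload
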
53 changes: 42 additions & 11 deletions rma/scanner.py
@@ -28,35 +28,63 @@ def __init__(self, redis, match="*", accepted_types=None):
self.pipeline_mode = False
self.resolve_types_script = self.redis.register_script("""
local ret = {}

for i = 1, #KEYS do
local type = redis.call("TYPE", KEYS[i])
- local encoding = redis.call("OBJECT", "ENCODING",KEYS[i])
- local ttl = redis.call("TTL", KEYS[i])
- ret[i] = {type["ok"], encoding, ttl}

if type["ok"] == 'none' then
ret[i] = {}
else
local encoding = redis.call("OBJECT", "ENCODING",KEYS[i])
local ttl = redis.call("TTL", KEYS[i])
local len = 0

-- "calculate_sizes" option
if ARGV[1] == '1' then
if encoding == 'embstr' or encoding == 'raw' then
len = redis.call("STRLEN", KEYS[i])
elseif encoding == 'int' then
local val = redis.call("GET", KEYS[i])
if tonumber(val) < %(REDIS_SHARED_INTEGERS)d then
len = 0
else
len = %(SIZE_OF_POINTER_FN)d
end
elseif encoding == 'linkedlist' or encoding == 'quicklist' or encoding == 'skiplist' then
local items = redis.call("lrange", KEYS[i], 0, -1)
len = {}
for j = 1, #items do
len[j] = #items[j]
end
end
end

ret[i] = {type["ok"], encoding, ttl, len}
end
end
return cmsgpack.pack(ret)
""")
""" % { "SIZE_OF_POINTER_FN": size_of_pointer_fn(), "REDIS_SHARED_INTEGERS": REDIS_SHARED_INTEGERS })

def __enter__(self):
return self

def __exit__(self, *exc):
return False

- def batch_scan(self, count=1000, batch_size=3000):
+ def batch_scan(self, count=1000, batch_size=3000, calculate_sizes=True):
ret = []
for key in self.redis.scan_iter(self.match, count=count):
ret.append(key)
if len(ret) == batch_size:
yield from self.resolve_types(ret)

if len(ret):
- yield from self.resolve_types(ret)
+ yield from self.resolve_types(ret, calculate_sizes)

- def resolve_types(self, ret):
+ def resolve_types(self, ret, calculate_sizes=True):
if not self.pipeline_mode:
try:
- key_with_types = msgpack.unpackb(self.resolve_types_script(ret))
+ key_with_types = msgpack.unpackb(self.resolve_types_script(ret, ['1' if calculate_sizes else '0']))
except ResponseError as e:
if "CROSSSLOT" not in repr(e):
raise e
@@ -79,14 +107,16 @@ def resolve_with_pipe(self, ret):
key_with_types = [{'type': x, 'encoding': y, 'ttl': z} for x, y, z in chunker(pipe.execute(), 3)]
return key_with_types

- def scan(self, limit=1000):
+ def scan(self, limit=1000, calculate_sizes=True):
with tqdm(total=min(limit, self.redis.dbsize()), desc="Match {0}".format(self.match),
miniters=1000) as progress:

total = 0
for key_tuple in self.batch_scan():
key_info, key_name = key_tuple
- key_type, key_encoding, key_ttl = key_info
+ if len(key_info) == 0:
+ continue # key deleted between scan and check
+ key_type, key_encoding, key_ttl, key_len = key_info
if not key_name:
self.logger.warning(
'\r\nWarning! Scan iterator return key with empty name `` and type %s', key_type)
Expand All @@ -98,7 +128,8 @@ def scan(self, limit=1000):
'name': key_name.decode("utf-8", "replace"),
'type': to_id,
'encoding': redis_encoding_str_to_id(key_encoding),
- 'ttl': key_ttl
+ 'ttl': key_ttl,
+ 'len': key_len
}
yield key_info_obj

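For context on how the Lua script above is driven: redis-py's
``register_script`` returns a callable taking ``keys`` and ``args``, and the
single ``ARGV[1]`` flag toggles the size pass. A minimal, self-contained
sketch (hypothetical key names; assumes a local Redis):

    import msgpack
    import redis

    r = redis.StrictRedis(host="127.0.0.1", port=6379, db=0)

    # A toy resolver in the same shape as the scanner's script:
    # one msgpack-encoded row per key.
    lua = """
    local ret = {}
    for i = 1, #KEYS do
        local t = redis.call("TYPE", KEYS[i])
        ret[i] = {t["ok"], redis.call("TTL", KEYS[i]), ARGV[1]}
    end
    return cmsgpack.pack(ret)
    """
    script = r.register_script(lua)

    # '1' would enable the size pass, mirroring resolve_types() above.
    packed = script(keys=["some:key", "another:key"], args=["1"])
    print(msgpack.unpackb(packed))
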
14 changes: 12 additions & 2 deletions rma/splitter.py
@@ -24,10 +24,20 @@ def map_part_to_glob(index, part):

return part

+ def pass1_func(separator, max_depth):
+ def f(x):
+ ret = []
+ for i, y in enumerate(x.split(separator)):
+ globbed = map_part_to_glob(i, y)
+ ret.append(globbed)
+ if max_depth and len(ret) > max_depth:
+ ret = ret[0:max_depth] + ["*"]
+ return ret
+ return f

class SimpleSplitter(object):
- def split(self, data, separator=":"):
- pass1 = map(lambda x: list(map_part_to_glob(i, y) for i, y in enumerate(x.split(separator))), data)
+ def split(self, data, separator=":", max_depth=None):
+ pass1 = map(pass1_func(separator, max_depth), data)
pass2 = self.fold_to_tree(pass1)
return self.unfold_to_list(pass2, separator)

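A worked example of the new depth handling, using a stand-in for
``map_part_to_glob`` (the real one is defined above in this module; here any
segment containing a digit counts as variable):

    def map_part_to_glob_stub(index, part):
        # Stand-in: segments containing digits are treated as variable.
        return "*" if any(c.isdigit() for c in part) else part

    def pass1_stub(separator, max_depth):
        def f(x):
            ret = []
            for i, y in enumerate(x.split(separator)):
                ret.append(map_part_to_glob_stub(i, y))
                if max_depth and len(ret) > max_depth:
                    ret = ret[0:max_depth] + ["*"]
            return ret
        return f

    print(pass1_stub(":", None)("PREFIX:abc123:SUFFIX"))  # ['PREFIX', '*', 'SUFFIX']
    print(pass1_stub(":", 1)("PREFIX:abc123:SUFFIX"))     # ['PREFIX', '*']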