diff --git a/README.rst b/README.rst
index f40d5fc..fee2cd0 100644
--- a/README.rst
+++ b/README.rst
@@ -61,6 +61,10 @@ After install used it from console:
                                 not specified, all data types will be returned.
                                 Allowed values are string, hash, list, set, zset
   -f --format TYPE              Output type format: json or text (by default)
+  --max-key-depth MAX_DEPTH     Set the maximum depth to consider when
+                                consolidating key segments (split by ":"). Any
+                                segments past this depth will be collapsed into
+                                a single "*".
 
 If you have large database try running first with ``--limit`` option to
 run first limited amount of keys. Also run with ``--types`` to limit
@@ -71,6 +75,29 @@ useful.
 You can choose what kind of data would be aggregated from Redis node using
 ``-b (--behaviour)`` option as console argument. Supported behaviours
 are 'global', 'scanner', 'ram' and 'all'.
 
+The tool attempts to detect keys structured in a hierarchy (with ":" as the
+separating character) and consolidate them. Segments that appear to be variable
+(currently any segments containing numbers) are consolidated together and
+represented with a single "*". The ``--max-key-depth`` argument allows you to
+specify how many levels should be considered before consolidating all of the
+remaining segments into a single "*".
+
+For example, if your keys are::
+
+    PREFIX               --> PREFIX
+    PREFIX:abc123        --> PREFIX:*
+    PREFIX:def987        --> PREFIX:*
+    PREFIX:abc123:SUFFIX --> PREFIX:*:SUFFIX
+    PREFIX:def987:SUFFIX --> PREFIX:*:SUFFIX
+
+However, if you pass ``--max-key-depth 1``, these would be consolidated into::
+
+    PREFIX               --> PREFIX
+    PREFIX:abc123        --> PREFIX:*
+    PREFIX:def987        --> PREFIX:*
+    PREFIX:abc123:SUFFIX --> PREFIX:*
+    PREFIX:def987:SUFFIX --> PREFIX:*
+
 Internals
 ---------
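A quick illustration of the consolidation rules described in the README text
above, using SimpleSplitter from rma/splitter.py (changed at the end of this
diff). This is a minimal sketch; the exact shape of the list returned by
split() is assumed here for illustration:

    from rma.splitter import SimpleSplitter

    keys = [
        "PREFIX",
        "PREFIX:abc123",
        "PREFIX:def987",
        "PREFIX:abc123:SUFFIX",
        "PREFIX:def987:SUFFIX",
    ]

    splitter = SimpleSplitter()
    # Default: variable-looking segments become "*", the full depth is kept.
    print(splitter.split(keys))                # e.g. PREFIX, PREFIX:*, PREFIX:*:SUFFIX
    # With max_depth=1, everything past the first segment collapses into one "*".
    print(splitter.split(keys, max_depth=1))   # e.g. PREFIX, PREFIX:*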
help="Output type format: json or text (by default)") + parser.add_argument("--max-key-depth", + dest="max_key_depth", + default="0", + type=int, + help="""Set the maximum depth to consider when consolidating key segments (split by ":"). Any segments past + this depth will be squished into a single "*".""") options = parser.parse_args() @@ -91,7 +97,8 @@ def main(): filters['types'].append(x) app = RmaApplication(host=options.host, port=options.port, db=options.db, password=options.password, - match=options.match, limit=options.limit, filters=filters, format=options.format) + match=options.match, limit=options.limit, filters=filters, format=options.format, + max_key_depth=options.max_key_depth) start_time = time.clock() app.run() diff --git a/rma/redis.py b/rma/redis.py index 331b134..a317437 100644 --- a/rma/redis.py +++ b/rma/redis.py @@ -98,16 +98,16 @@ def ziplist_overhead(size): return Jemalloc.align(12 + 21 * size) -def size_of_ziplist_aligned_string(value): - # Looks like we need something more complex here. We use calculation as 21 bytes per entry + len of string - # or len of pointer. Redis use more RAM saving policy but after aligning it has infelicity ~3-5% - try: - num_value = int(value) - return Jemalloc.align(size_of_pointer_fn()) - except ValueError: - pass +def size_of_ziplist_aligned_string(value_length): +# # Looks like we need something more complex here. We use calculation as 21 bytes per entry + len of string +# # or len of pointer. Redis use more RAM saving policy but after aligning it has infelicity ~3-5% +# try: +# num_value = int(value) +# return Jemalloc.align(size_of_pointer_fn()) +# except ValueError: +# pass - return Jemalloc.align(len(value)) + return Jemalloc.align(value_length) def linkedlist_overhead(): @@ -121,9 +121,8 @@ def linkedlist_entry_overhead(): return 3*size_of_pointer_fn() -def size_of_linkedlist_aligned_string(value): - return Jemalloc.align(linkedlist_entry_overhead() + len(value)) - +def size_of_linkedlist_aligned_string(value_length): + return Jemalloc.align(linkedlist_entry_overhead() + value_length) def intset_overhead(size): # typedef struct intset { diff --git a/rma/rule/List.py b/rma/rule/List.py index b2ecadd..0a167aa 100644 --- a/rma/rule/List.py +++ b/rma/rule/List.py @@ -15,20 +15,20 @@ def __init__(self, info, redis): key_name = info["name"] self.encoding = info['encoding'] self.ttl = info['ttl'] + self.entry_lengths = info['len'] - self.values = redis.lrange(key_name, 0, -1) - self.count = len(self.values) + self.count = len(self.entry_lengths) import time time.sleep(0.001) - used_bytes_iter, min_iter, max_iter = tee((len(x) for x in self.values), 3) + used_bytes_iter, min_iter, max_iter = tee(self.entry_lengths, 3) if self.encoding == REDIS_ENCODING_ID_LINKEDLIST: self.system = dict_overhead(self.count) - self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.values)) + self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.entry_lengths)) elif self.encoding == REDIS_ENCODING_ID_ZIPLIST or self.encoding == REDIS_ENCODING_ID_QUICKLIST: # Undone `quicklist` self.system = ziplist_overhead(self.count) - self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.values)) + self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.entry_lengths)) else: raise Exception('Panic', 'Unknown encoding %s in %s' % (self.encoding, key_name)) diff --git a/rma/rule/ValueString.py b/rma/rule/ValueString.py index 4e7affb..c8b4809 100644 --- a/rma/rule/ValueString.py +++ 
diff --git a/rma/rule/List.py b/rma/rule/List.py
index b2ecadd..0a167aa 100644
--- a/rma/rule/List.py
+++ b/rma/rule/List.py
@@ -15,20 +15,20 @@ def __init__(self, info, redis):
         key_name = info["name"]
         self.encoding = info['encoding']
         self.ttl = info['ttl']
+        self.entry_lengths = info['len']
 
-        self.values = redis.lrange(key_name, 0, -1)
-        self.count = len(self.values)
+        self.count = len(self.entry_lengths)
 
         import time
         time.sleep(0.001)
 
-        used_bytes_iter, min_iter, max_iter = tee((len(x) for x in self.values), 3)
+        used_bytes_iter, min_iter, max_iter = tee(self.entry_lengths, 3)
 
         if self.encoding == REDIS_ENCODING_ID_LINKEDLIST:
             self.system = dict_overhead(self.count)
-            self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.values))
+            self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.entry_lengths))
         elif self.encoding == REDIS_ENCODING_ID_ZIPLIST or self.encoding == REDIS_ENCODING_ID_QUICKLIST:
             # Undone `quicklist`
             self.system = ziplist_overhead(self.count)
-            self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.values))
+            self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.entry_lengths))
         else:
             raise Exception('Panic', 'Unknown encoding %s in %s' % (self.encoding, key_name))
diff --git a/rma/rule/ValueString.py b/rma/rule/ValueString.py
index 4e7affb..c8b4809 100644
--- a/rma/rule/ValueString.py
+++ b/rma/rule/ValueString.py
@@ -36,7 +36,7 @@ def __init__(self, redis, info, use_debug=True):
         self.logger = logging.getLogger(__name__)
 
         if self.encoding == REDIS_ENCODING_ID_INT:
-            self.useful_bytes = self.get_int_encoded_bytes(redis, key_name)
+            self.useful_bytes = info["len"]
             self.free_bytes = 0
             self.aligned = size_of_aligned_string_by_size(self.useful_bytes, encoding=self.encoding)
         elif self.encoding == REDIS_ENCODING_ID_EMBSTR or self.encoding == REDIS_ENCODING_ID_RAW:
@@ -45,7 +45,7 @@ def __init__(self, redis, info, use_debug=True):
                 self.useful_bytes = sdslen_response['val_sds_len']
                 self.free_bytes = sdslen_response['val_sds_avail']
             else:
-                self.useful_bytes = size_of_aligned_string_by_size(redis.strlen(key_name), self.encoding)
+                self.useful_bytes = info["len"]
                 self.free_bytes = 0
                 # INFO Rewrite this to support Redis >= 3.2 sds dynamic header
                 sds_len = 8 + self.useful_bytes + self.free_bytes + 1
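Both rules above (rma/rule/List.py and rma/rule/ValueString.py) now read the
precomputed length from info["len"] instead of issuing extra per-key round
trips (LRANGE, STRLEN, GET). A sketch of the dict they expect, mirroring the
key_info_obj built by Scanner.scan() in the next file; the values are
illustrative only:

    info = {
        "name": "PREFIX:abc123",
        "type": 0,        # internal type id, as produced by redis_type_to_id(...)
        "encoding": 0,    # internal encoding id, as produced by redis_encoding_str_to_id(...)
        "ttl": -1,
        "len": 17,        # STRLEN for strings; for lists, a list of per-element byte lengths
    }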
{0}".format(self.match), miniters=1000) as progress: total = 0 for key_tuple in self.batch_scan(): key_info, key_name = key_tuple - key_type, key_encoding, key_ttl = key_info + if len(key_info) == 0: + continue # key deleted between scan and check + key_type, key_encoding, key_ttl, key_len = key_info if not key_name: self.logger.warning( '\r\nWarning! Scan iterator return key with empty name `` and type %s', key_type) @@ -98,7 +128,8 @@ def scan(self, limit=1000): 'name': key_name.decode("utf-8", "replace"), 'type': to_id, 'encoding': redis_encoding_str_to_id(key_encoding), - 'ttl': key_ttl + 'ttl': key_ttl, + 'len': key_len } yield key_info_obj diff --git a/rma/splitter.py b/rma/splitter.py index 309ae8e..95192ed 100644 --- a/rma/splitter.py +++ b/rma/splitter.py @@ -24,10 +24,20 @@ def map_part_to_glob(index, part): return part +def pass1_func( separator, max_depth ): + def f(x): + ret = [] + for i,y in enumerate(x.split(separator)): + globbed = map_part_to_glob(i, y) + ret.append( globbed ) + if max_depth and len(ret) > max_depth: + ret = ret[0:max_depth] + ["*"] + return ret + return f class SimpleSplitter(object): - def split(self, data, separator=":"): - pass1 = map(lambda x: list(map_part_to_glob(i, y) for i, y in enumerate(x.split(separator))), data) + def split(self, data, separator=":", max_depth=None): + pass1 = map(pass1_func(separator, max_depth), data) pass2 = self.fold_to_tree(pass1) return self.unfold_to_list(pass2, separator)