Source code for locomotor.identify

import ast
import itertools
import sully
import types

#: Method names used when trying to identify Redis client objects
REDIS_METHODS = set(['append', 'blpop', 'brpop', 'brpoplpush', 'decr',
                     'delete', 'execute', 'exists', 'expire', 'expireat',
                     'get', 'getbit', 'getset', 'hdel', 'hget', 'hgetall',
                     'hincrby', 'hkeys', 'hlen', 'hmget', 'hmset', 'hset',
                     'hsetnx', 'hvals', 'incr', 'lindex', 'linsert', 'llen',
                     'lpop', 'lpush', 'lpushnx', 'lrange', 'lrem', 'lset',
                     'ltrim', 'mget', 'move', 'mset', 'mset', 'msetnx',
                     'persist', 'publish', 'randomkey', 'rename', 'renamenx',
                     'rpop', 'rpoplpush', 'rpush', 'rpushx', 'sadd', 'scard',
                     'sdiff', 'sdiffstore', 'set', 'setbit', 'setex', 'setnx',
                     'setrange', 'sinter', 'sinterstore', 'sismember',
                     'smembers', 'smove', 'sort', 'spop', 'srandmember',
                     'srem', 'strlen', 'substr', 'sunion', 'sunionstore',
                     'ttl', 'zadd', 'zcard', 'zincrby', 'zinterstore',
                     'zrange', 'zrangebyscore', 'zrank', 'zrem',
                     'zremrangebyrank', 'zrevrange', 'zrevrangebyscore',
                     'zrevrank', 'zrevscore', 'zunionstore'])

#: The minimum number of methods which must be identified as Redis
#: calls to denote an object as corresponding to a Redis client
REDIS_METHOD_COUNT = 2

#: The percentage of method calls which must match a predefined list
REDIS_METHOD_PCT = 0.8


[docs]def identify_redis_objs(func): """Identify objects likely to be used to access Redis in the code""" redis_func_objs = [] nonredis_func_objs = [] func_ast = sully.get_func_ast(func) node_walkers = (ast.walk(func_node) for func_node in func_ast) for node in itertools.chain.from_iterable(node_walkers): # Skip any nodes which are not function calls on objects if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute)): continue # Record all function calls if node.func.attr in REDIS_METHODS: redis_func_objs.append(node.func.value) else: nonredis_func_objs.append(node.func.value) # Loop through all the found function objects to pick # out the ones we deem to represent Redis interfaces redis_objs = [] while len(redis_func_objs) > 0: obj = redis_func_objs.pop() # Remove all call nodes matching this object redis_before = len(redis_func_objs) + 1 redis_func_objs = [obj2 for obj2 in redis_func_objs if not sully.nodes_equal(obj, obj2)] nonredis_before = len(nonredis_func_objs) nonredis_func_objs = [obj2 for obj2 in nonredis_func_objs if not sully.nodes_equal(obj, obj2)] # If the object meets a threshold of calls for the object # and a certain percentage of all calls match, record it redis_calls = redis_before - len(redis_func_objs) nonredis_calls = nonredis_before - len(nonredis_func_objs) if redis_calls >= REDIS_METHOD_COUNT and \ (redis_calls * 1.0 / (redis_calls + nonredis_calls)) >= REDIS_METHOD_PCT: redis_objs.append(obj) return redis_objs
[docs]def identify_redis_funcs(cls_or_mod): """Identify functions in a class or module which use Redis""" redis_funcs = {} for obj in dir(cls_or_mod): # Skip things which look private if obj.startswith('_'): continue val = getattr(cls_or_mod, obj) if isinstance(val, (type, types.ClassType)): # Recursively check all classes class_funcs = identify_redis_funcs(val) redis_funcs.update(class_funcs) elif isinstance(val, (types.FunctionType, types.MethodType)): # Identify Redis objects within the function objs = identify_redis_objs(val) if len(objs) > 0: redis_funcs[val] = objs return redis_funcs