diff --git a/client.cpp b/client.cpp index 06912aea..64e84fea 100755 --- a/client.cpp +++ b/client.cpp @@ -275,7 +275,7 @@ int client::connect(void) evbuffer_drain(m_write_buf, evbuffer_get_length(m_write_buf)); if (m_unix_sockaddr != NULL) { - m_sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + m_sockfd = socket(AF_UNIX, m_config->use_udp ? SOCK_DGRAM : SOCK_STREAM, 0); if (m_sockfd < 0) { return -errno; } @@ -300,8 +300,10 @@ int client::connect(void) error = setsockopt(m_sockfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); assert(error == 0); - error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); - assert(error == 0); + if (!m_config->use_udp) { + error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); + assert(error == 0); + } } // set non-blcoking behavior @@ -555,6 +557,7 @@ void client::create_request(void) assert(key != NULL); assert(keylen > 0); + keylen = 6; // FIXME const benchmark_debug_log("GET key=[%.*s]\n", keylen, key); cmd_size = m_protocol->write_command_get(key, keylen, m_config->data_offset); @@ -618,6 +621,12 @@ void client::handle_response(request *request, protocol_response *response) { switch (request->m_type) { case rt_get: + if (m_config->transaction_latency) { + // NOTE using printf adds latency to the client because of the system call, but we're measuring transaction latency, not throughput. + // FIXME might be preferable to print to some user-specified file, rather than to stdout + printf("GET %lu\n", ts_diff_now(request->m_sent_time)); + } + m_stats.update_get_op(NULL, request->m_size + response->get_total_len(), ts_diff_now(request->m_sent_time), @@ -625,6 +634,12 @@ void client::handle_response(request *request, protocol_response *response) request->m_keys - response->get_hits()); break; case rt_set: + if (m_config->transaction_latency) { + // NOTE using printf adds latency to the client because of the system call, but we're measuring transaction latency, not throughput. + // FIXME might be preferable to print to some user-specified file, rather than to stdout + printf("SET %lu\n", ts_diff_now(request->m_sent_time)); + } + m_stats.update_set_op(NULL, request->m_size + response->get_total_len(), ts_diff_now(request->m_sent_time)); diff --git a/config_types.cpp b/config_types.cpp index 8c098404..e95a0d32 100644 --- a/config_types.cpp +++ b/config_types.cpp @@ -187,8 +187,8 @@ const char* config_weight_list::print(char *buf, int buf_len) } -server_addr::server_addr(const char *hostname, int port) : - m_hostname(hostname), m_port(port), m_server_addr(NULL), m_used_addr(NULL), m_last_error(0) +server_addr::server_addr(const char *hostname, int port, transport_protocol protocol) : + m_hostname(hostname), m_port(port), m_protocol(protocol), m_server_addr(NULL), m_used_addr(NULL), m_last_error(0) { int error = resolve(); @@ -215,7 +215,7 @@ int server_addr::resolve(void) memset(&hints, 0, sizeof(hints)); hints.ai_flags = AI_PASSIVE; - hints.ai_socktype = SOCK_STREAM; + hints.ai_socktype = m_protocol; hints.ai_family = AF_INET; // Don't play with IPv6 for now... snprintf(port_str, sizeof(port_str)-1, "%u", m_port); diff --git a/config_types.h b/config_types.h index 318549aa..d642386c 100644 --- a/config_types.h +++ b/config_types.h @@ -77,7 +77,8 @@ struct connect_info { }; struct server_addr { - server_addr(const char *hostname, int port); + enum transport_protocol {TCP=SOCK_STREAM, UDP=SOCK_DGRAM}; + server_addr(const char *hostname, int port, transport_protocol proto); virtual ~server_addr(); int get_connect_info(struct connect_info *ci); @@ -88,6 +89,7 @@ struct server_addr { std::string m_hostname; int m_port; + int m_protocol; struct addrinfo *m_server_addr; struct addrinfo *m_used_addr; int m_last_error; diff --git a/libmemcached_protocol/binary.h b/libmemcached_protocol/binary.h index 7cd313ec..eb891c09 100644 --- a/libmemcached_protocol/binary.h +++ b/libmemcached_protocol/binary.h @@ -179,6 +179,22 @@ extern "C" PROTOCOL_BINARY_RAW_BYTES = 0x00 } protocol_binary_datatypes; + /** + * Definition of the extra header that is used when using the + * memcached binary protocol over UDP. This header lies between + * the UDP header and the memcached datagram. + * See https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L922 + */ + typedef union { + struct { + uint16_t request_id; + uint16_t sequence_no; + uint16_t total_datagrams; + uint16_t reserved; // Must be set to 0 + } header; + uint8_t bytes[8]; + } protocol_binary_udp_header; + /** * Definition of the header structure for a request packet. * See section 2 diff --git a/memtier_benchmark.cpp b/memtier_benchmark.cpp index f32ad1d2..62e3cf95 100755 --- a/memtier_benchmark.cpp +++ b/memtier_benchmark.cpp @@ -119,7 +119,10 @@ static void config_print(FILE *file, struct benchmark_config *cfg) "multi_key_get = %u\n" "authenticate = %s\n" "select-db = %d\n" - "no-expiry = %s\n", + "no-expiry = %s\n" + "transaction_latency = %s\n" + "key-width = %u\n" + "udp = %s\n", cfg->server, cfg->port, cfg->unix_socket, @@ -155,7 +158,10 @@ static void config_print(FILE *file, struct benchmark_config *cfg) cfg->multi_key_get, cfg->authenticate ? cfg->authenticate : "", cfg->select_db, - cfg->no_expiry ? "yes" : "no"); + cfg->no_expiry ? "yes" : "no", + cfg->transaction_latency ? "yes" : "no", + cfg->key_width, + cfg->use_udp ? "yes" : "no"); } static void config_init_defaults(struct benchmark_config *cfg) @@ -196,6 +202,8 @@ static void config_init_defaults(struct benchmark_config *cfg) } if (!cfg->requests && !cfg->test_time) cfg->requests = 10000; + if (!cfg->key_width) + cfg->key_width = OBJECT_GENERATOR_KEY_WIDTH; } static int generate_random_seed() @@ -241,7 +249,10 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf o_generate_keys, o_multi_key_get, o_select_db, - o_no_expiry + o_no_expiry, + o_transaction_latency, + o_key_width, + o_use_udp, }; static struct option long_options[] = { @@ -287,6 +298,9 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf { "no-expiry", 0, 0, o_no_expiry }, { "help", 0, 0, 'h' }, { "version", 0, 0, 'v' }, + { "transaction_latency", 0, 0, o_transaction_latency }, + { "key-width", 1, 0, o_key_width }, + { "udp", 0, 0, o_use_udp }, { NULL, 0, 0, 0 } }; @@ -553,6 +567,22 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf case o_no_expiry: cfg->no_expiry = true; break; + case o_transaction_latency: + cfg->transaction_latency = true; + break; + case o_key_width: + endptr = NULL; + cfg->key_width = (unsigned short) strtoul(optarg, &endptr, 10); + if (!cfg->key_width || cfg->key_width > OBJECT_GENERATOR_KEY_WIDTH || !endptr || *endptr != '\0') { + fprintf(stderr, "error: key-width must be a number in the range [1-%u].\n", OBJECT_GENERATOR_KEY_WIDTH); + return -1; + } + break; + case o_use_udp: + //FIXME check that key and value size can fit in a datagram + fprintf(stderr, "Warning: generating traffic over UDP is still experimental. Check that the size of requests can fit in a UDP datagram."); + cfg->use_udp = true; + break; default: return -1; break; @@ -570,6 +600,7 @@ void usage() { " -s, --server=ADDR Server address (default: localhost)\n" " -p, --port=PORT Server port (default: 6379)\n" " -S, --unix-socket=SOCKET UNIX Domain socket name (default: none)\n" + " --udp Connect using UDP rather than TCP (default: false)\n" " -P, --protocol=PROTOCOL Protocol to use (default: redis). Other\n" " supported protocols are memcache_text,\n" " memcache_binary.\n" @@ -597,6 +628,7 @@ void usage() { " --select-db=DB DB number to select, when testing a redis server\n" " --distinct-client-seed Use a different random seed for each client\n" " --randomize random seed based on timestamp (default is constant value)\n" + " --transaction_latency Measure and report the latency of each transaction\n" "\n" "Object Options:\n" " -d --data-size=SIZE Object data size (default: 32)\n" @@ -619,6 +651,7 @@ void usage() { " --no-expiry Ignore expiry information in imported data\n" "\n" "Key Options:\n" + " --key-width=NUMBER Maximum key size (default: \"250\")\n" " --key-prefix=PREFIX Prefix for keys (default: \"memtier-\")\n" " --key-minimum=NUMBER Key ID minimum value (default: 0)\n" " --key-maximum=NUMBER Key ID maximum value (default: 10000000)\n" @@ -651,7 +684,7 @@ struct cg_thread { cg_thread(unsigned int id, benchmark_config* config, object_generator* obj_gen) : m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL), m_finished(false) { - m_protocol = protocol_factory(m_config->protocol); + m_protocol = protocol_factory(m_config->protocol, m_config->use_udp); assert(m_protocol != NULL); m_cg = new client_group(m_config, m_protocol, m_obj_gen); @@ -868,7 +901,7 @@ int main(int argc, char *argv[]) if (cfg.server != NULL && cfg.port > 0) { try { - cfg.server_addr = new server_addr(cfg.server, cfg.port); + cfg.server_addr = new server_addr(cfg.server, cfg.port, cfg.use_udp ? server_addr::UDP : server_addr::TCP); } catch (std::runtime_error& e) { benchmark_error_log("%s:%u: error: %s\n", cfg.server, cfg.port, e.what()); @@ -903,7 +936,7 @@ int main(int argc, char *argv[]) exit(1); } - obj_gen = new object_generator(); + obj_gen = new object_generator(cfg.key_width); assert(obj_gen != NULL); } else { // check paramters @@ -1094,7 +1127,7 @@ int main(int argc, char *argv[]) // If needed, data verification is done now... if (cfg.data_verify) { struct event_base *verify_event_base = event_base_new(); - abstract_protocol *verify_protocol = protocol_factory(cfg.protocol); + abstract_protocol *verify_protocol = protocol_factory(cfg.protocol, cfg.use_udp); verify_client *client = new verify_client(verify_event_base, &cfg, verify_protocol, obj_gen); fprintf(outfile, "\n\nPerforming data verification...\n"); diff --git a/memtier_benchmark.h b/memtier_benchmark.h index abf112a9..46201b9f 100644 --- a/memtier_benchmark.h +++ b/memtier_benchmark.h @@ -75,6 +75,9 @@ struct benchmark_config { int select_db; bool no_expiry; bool resolve_on_connect; + bool transaction_latency; + unsigned short key_width; + bool use_udp; }; diff --git a/obj_gen.cpp b/obj_gen.cpp index 94e39ea2..9f4a3de3 100644 --- a/obj_gen.cpp +++ b/obj_gen.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #ifdef HAVE_ASSERT_H #include @@ -117,6 +118,7 @@ int gaussian_noise::gaussian_distribution_range(double stddev, double median, in } object_generator::object_generator() : + m_key_width(OBJECT_GENERATOR_KEY_WIDTH), m_data_size_type(data_size_unknown), m_data_size_pattern(NULL), m_random_data(false), @@ -136,7 +138,31 @@ object_generator::object_generator() : m_data_size.size_list = NULL; } +object_generator::object_generator(unsigned int key_width) : + m_key_width(key_width), + m_data_size_type(data_size_unknown), + m_data_size_pattern(NULL), + m_random_data(false), + m_expiry_min(0), + m_expiry_max(0), + m_key_prefix(NULL), + m_key_min(0), + m_key_max(0), + m_key_stddev(0), + m_key_median(0), + m_value_buffer(NULL), + m_random_fd(-1) +{ + assert(m_key_width <= OBJECT_GENERATOR_KEY_WIDTH); + + for (int i = 0; i < OBJECT_GENERATOR_KEY_ITERATORS; i++) + m_next_key[i] = 0; + + m_data_size.size_list = NULL; +} + object_generator::object_generator(const object_generator& copy) : + m_key_width(copy.m_key_width), m_data_size_type(copy.m_data_size_type), m_data_size(copy.m_data_size), m_data_size_pattern(copy.m_data_size_pattern), @@ -150,7 +176,6 @@ object_generator::object_generator(const object_generator& copy) : m_key_median(copy.m_key_median), m_value_buffer(NULL), m_random_fd(-1) - { if (m_data_size_type == data_size_weighted && m_data_size.size_list != NULL) { @@ -304,15 +329,29 @@ void object_generator::set_expiry_range(unsigned int expiry_min, unsigned int ex m_expiry_max = expiry_max; } +void object_generator::check_key_size() +{ + unsigned int width_of_key_prefix = m_key_prefix == NULL ? 0 : strlen(m_key_prefix); + unsigned int width_of_key_max = (unsigned)log10((double)m_key_max) + 1; + if (width_of_key_prefix + width_of_key_max > m_key_width) { + char str [200]; + sprintf(str, "Key prefix '%s' (length %u) exceeds maximum key width (%u) when combined with the maximum key index (%u, length %u)", m_key_prefix, width_of_key_prefix, m_key_width, m_key_max, width_of_key_max); + throw std::logic_error(str); + } +} + void object_generator::set_key_prefix(const char *key_prefix) { m_key_prefix = key_prefix; + check_key_size(); } void object_generator::set_key_range(unsigned int key_min, unsigned int key_max) { + assert (key_min <= key_max); m_key_min = key_min; m_key_max = key_max; + check_key_size(); } void object_generator::set_key_distribution(double key_stddev, double key_median) @@ -360,9 +399,9 @@ const char* object_generator::get_key(int iter, unsigned int *len) { unsigned int l; m_key_index = get_key_index(iter); - + // format key - l = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, + l = snprintf(m_key_buffer, m_key_width - 1, "%s%u", m_key_prefix, m_key_index); if (len != NULL) *len = l; diff --git a/obj_gen.h b/obj_gen.h index 8078bbcd..0769058d 100644 --- a/obj_gen.h +++ b/obj_gen.h @@ -75,10 +75,12 @@ class data_object { #define OBJECT_GENERATOR_KEY_GET_ITER 0 #define OBJECT_GENERATOR_KEY_RANDOM -1 #define OBJECT_GENERATOR_KEY_GAUSSIAN -2 +#define OBJECT_GENERATOR_KEY_WIDTH 250 class object_generator { public: enum data_size_type { data_size_unknown, data_size_fixed, data_size_range, data_size_weighted }; + const unsigned int m_key_width; protected: data_size_type m_data_size_type; union { @@ -103,7 +105,7 @@ class object_generator { unsigned int m_next_key[OBJECT_GENERATOR_KEY_ITERATORS]; unsigned int m_key_index; - char m_key_buffer[250]; + char m_key_buffer[OBJECT_GENERATOR_KEY_WIDTH]; char *m_value_buffer; int m_random_fd; gaussian_noise m_random; @@ -116,6 +118,7 @@ class object_generator { unsigned int get_key_index(int iter); public: object_generator(); + object_generator(unsigned int key_width); object_generator(const object_generator& copy); virtual ~object_generator(); virtual object_generator* clone(void); @@ -130,6 +133,7 @@ class object_generator { void set_key_range(unsigned int key_min, unsigned int key_max); void set_key_distribution(double key_stddev, double key_median); void set_random_seed(int seed); + void check_key_size(); virtual const char* get_key(int iter, unsigned int *len); virtual data_object* get_object(int iter); diff --git a/protocol.cpp b/protocol.cpp index f6e6c23e..983aae44 100644 --- a/protocol.cpp +++ b/protocol.cpp @@ -410,7 +410,7 @@ int memcache_text_protocol::write_command_set(const char *key, int key_len, cons assert(value != NULL); assert(value_len > 0); int size = 0; - + size = evbuffer_add_printf(m_write_buf, "set %.*s 0 %u %u\r\n", key_len, key, expiry, value_len); evbuffer_add(m_write_buf, value, value_len); @@ -565,11 +565,12 @@ class memcache_binary_protocol : public abstract_protocol { response_state m_response_state; protocol_binary_response_no_extras m_response_hdr; size_t m_response_len; + bool m_over_udp; const char* status_text(void); public: - memcache_binary_protocol() : m_response_state(rs_initial), m_response_len(0) { } - virtual memcache_binary_protocol* clone(void) { return new memcache_binary_protocol(); } + memcache_binary_protocol(bool over_udp) : m_response_state(rs_initial), m_response_len(0), m_over_udp(over_udp) { } + virtual memcache_binary_protocol* clone(void) { return new memcache_binary_protocol(m_over_udp); } virtual int select_db(int db); virtual int authenticate(const char *credentials); virtual int write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset); @@ -620,6 +621,18 @@ int memcache_binary_protocol::authenticate(const char *credentials) return sizeof(req) + user_len + passwd_len + 2 + sizeof(mechanism) - 1; } +// FIXME this currently produces a constant value. +int append_binary_udp_header(struct evbuffer* write_buf) { + protocol_binary_udp_header binary_udp_head; + memset(&binary_udp_head, 0, sizeof(binary_udp_head)); + binary_udp_head.header.request_id = 0; + binary_udp_head.header.sequence_no = 0; + binary_udp_head.header.total_datagrams = 0x0100; //16-bit "0x01" in big endian. + binary_udp_head.header.reserved = 0; + evbuffer_add(write_buf, &binary_udp_head, sizeof(binary_udp_head)); + return sizeof(binary_udp_head); +} + int memcache_binary_protocol::write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset) { assert(key != NULL); @@ -638,11 +651,16 @@ int memcache_binary_protocol::write_command_set(const char *key, int key_len, co req.message.header.request.extlen = sizeof(req.message.body); req.message.body.expiration = htonl(expiry); + int binary_udp_head_size = 0; + if (m_over_udp) { + binary_udp_head_size = append_binary_udp_header(m_write_buf); + } + evbuffer_add(m_write_buf, &req, sizeof(req)); evbuffer_add(m_write_buf, key, key_len); evbuffer_add(m_write_buf, value, value_len); - return sizeof(req) + key_len + value_len; + return binary_udp_head_size + sizeof(req) + key_len + value_len; } int memcache_binary_protocol::write_command_get(const char *key, int key_len, unsigned int offset) @@ -660,10 +678,15 @@ int memcache_binary_protocol::write_command_get(const char *key, int key_len, un req.message.header.request.bodylen = htonl(key_len); req.message.header.request.extlen = 0; + int binary_udp_head_size = 0; + if (m_over_udp) { + binary_udp_head_size = append_binary_udp_header(m_write_buf); + } + evbuffer_add(m_write_buf, &req, sizeof(req)); evbuffer_add(m_write_buf, key, key_len); - return sizeof(req) + key_len; + return binary_udp_head_size + sizeof(req) + key_len; } int memcache_binary_protocol::write_command_multi_get(const keylist *keylist) @@ -722,6 +745,13 @@ int memcache_binary_protocol::parse_response(void) if (evbuffer_get_length(m_read_buf) < sizeof(m_response_hdr)) return 0; // no header yet? + if (m_over_udp) { + protocol_binary_udp_header header; + // FIXME we currently do not check the returned header. + ret = evbuffer_remove(m_read_buf, (void *)&header, sizeof(header)); + assert(ret == sizeof(header)); + } + ret = evbuffer_remove(m_read_buf, (void *)&m_response_hdr, sizeof(m_response_hdr)); assert(ret == sizeof(m_response_hdr)); @@ -800,7 +830,7 @@ int memcache_binary_protocol::parse_response(void) ///////////////////////////////////////////////////////////////////////// -class abstract_protocol *protocol_factory(const char *proto_name) +class abstract_protocol *protocol_factory(const char *proto_name, bool over_udp) { assert(proto_name != NULL); @@ -809,7 +839,7 @@ class abstract_protocol *protocol_factory(const char *proto_name) } else if (strcmp(proto_name, "memcache_text") == 0) { return new memcache_text_protocol(); } else if (strcmp(proto_name, "memcache_binary") == 0) { - return new memcache_binary_protocol(); + return new memcache_binary_protocol(over_udp); } else { benchmark_error_log("Error: unknown protocol '%s'.\n", proto_name); return NULL; diff --git a/protocol.h b/protocol.h index 2de923f5..9d3c6c45 100644 --- a/protocol.h +++ b/protocol.h @@ -102,6 +102,6 @@ class abstract_protocol { struct protocol_response* get_response(void) { return &m_last_response; } }; -class abstract_protocol *protocol_factory(const char *proto_name); +class abstract_protocol *protocol_factory(const char *proto_name, bool over_udp); #endif /* _PROTOCOL_H */