从零实现一个分布式锁:Redis与Zookeeper

发布时间:2026/6/23 1:15:07
从零实现一个分布式锁:Redis与Zookeeper 前言你有没有想过在分布式系统中多个服务同时操作同一份数据时怎么保证数据一致性比如秒杀系统中1000个人同时抢1个商品怎么保证不会超卖分布式锁是解决分布式环境下资源竞争的核心方案。今天我们用C语言从零实现两种分布式锁1. 基于Redis的分布式锁Redlock算法2. 基于Zookeeper的分布式锁临时顺序节点---一、分布式锁核心原理1. Redis分布式锁┌─────────┐ SET lock_key value NX PX 10000 ┌─────────┐│ 客户端A │ ───────────────────────────────────────→ │ Redis │└─────────┘ └─────────┘│ ││ 执行业务逻辑 ││ │▼ ▼┌─────────┐ DEL lock_key ┌─────────┐│ 客户端A │ ───────────────────────────────────────→ │ Redis │└─────────┘ └─────────┘2. Zookeeper分布式锁┌─────────┐ 创建临时顺序节点 ┌─────────────┐│ 客户端A │ ──────────────────────────────────────→ │ Zookeeper │└─────────┘ │ /locks/ ││ │ /lock-001││ 检查是否最小节点 │ /lock-002│▼ └─────────────┘┌─────────┐ 是 → 获得锁│ 客户端A │ ─── 否 → 监听前一个节点└─────────┘3. 核心要求要求 说明互斥性 同一时刻只有一个客户端持有锁防死锁 锁有超时机制客户端异常时自动释放可重入 同一客户端可重复获取同一把锁高可用 锁服务本身不能单点故障---二、完整代码实现1. 通用数据结构c#include stdio.h#include stdlib.h#include string.h#include unistd.h#include pthread.h#include time.h#include errno.h#include sys/socket.h#include netinet/in.h#include arpa/inet.h#include netdb.h#define MAX_KEY_LEN 128#define MAX_VALUE_LEN 256#define MAX_HOST_LEN 64#define MAX_RETRY 3// 分布式锁统一接口typedef struct distributed_lock {char lock_key[MAX_KEY_LEN];char lock_value[MAX_VALUE_LEN];int acquired;time_t expire_time;void *backend_data;int (*lock)(struct distributed_lock *self, int timeout_ms);int (*unlock)(struct distributed_lock *self);int (*renew)(struct distributed_lock *self);void (*destroy)(struct distributed_lock *self);} distributed_lock_t;2. Redis客户端基础c// Redis连接结构typedef struct redis_connection {char host[MAX_HOST_LEN];int port;int sock_fd;} redis_conn_t;// Redis命令执行int redis_connect(redis_conn_t *conn, const char *host, int port) {conn-sock_fd socket(AF_INET, SOCK_STREAM, 0);if (conn-sock_fd 0) return -1;struct hostent *server gethostbyname(host);if (!server) {close(conn-sock_fd);return -1;}struct sockaddr_in addr;addr.sin_family AF_INET;addr.sin_port htons(port);memcpy(addr.sin_addr.s_addr, server-h_addr, server-h_length);if (connect(conn-sock_fd, (struct sockaddr*)addr, sizeof(addr)) 0) {close(conn-sock_fd);return -1;}strcpy(conn-host, host);conn-port port;return 0;}int redis_command(redis_conn_t *conn, const char *cmd, char *response, int resp_size) {send(conn-sock_fd, cmd, strlen(cmd), 0);send(conn-sock_fd, \r\n, 2, 0);int n recv(conn-sock_fd, response, resp_size - 1, 0);if (n 0) {response[n] \0;return n;}return -1;}void redis_disconnect(redis_conn_t *conn) {if (conn-sock_fd 0) {close(conn-sock_fd);conn-sock_fd -1;}}3. Redis分布式锁实现c// Redis锁结构typedef struct redis_lock {distributed_lock_t base;redis_conn_t *connections; // 多Redis节点Redlockint node_count;int quorum; // 多数派long long expire_ms;} redis_lock_t;// 生成唯一锁值客户端标识void generate_lock_value(char *buf, int size) {pid_t pid getpid();time_t now time(NULL);snprintf(buf, size, %d-%ld-%d, pid, now, rand());}// 单个Redis节点加锁int redis_node_lock(redis_conn_t *conn, const char *key, const char *value,long long ttl_ms, char *error) {char cmd[512];snprintf(cmd, sizeof(cmd),SET %s %s NX PX %lld, key, value, ttl_ms);char response[256];if (redis_command(conn, cmd, response, sizeof(response)) 0) {strcpy(error, redis command failed);return -1;}// Redis返回OK表示成功if (strncmp(response, OK, 3) 0) {return 0;}strcpy(error, response);return -1;}// Redis节点解锁int redis_node_unlock(redis_conn_t *conn, const char *key, const char *value) {// 使用Lua脚本保证原子性检查value匹配才删除char cmd[512];snprintf(cmd, sizeof(cmd),EVAL \if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end\ 1 %s %s,key, value);char response[256];if (redis_command(conn, cmd, response, sizeof(response)) 0) {return -1;}return (strstr(response, :1) ! NULL) ? 0 : -1;}// Redlock加锁多Redis节点int redlock_lock(distributed_lock_t *base, int timeout_ms) {redis_lock_t *lock (redis_lock_t*)base;time_t start time(NULL);int acquired_count 0;// 计算每个节点的超时时间总超时/节点数int per_node_timeout timeout_ms / lock-node_count;if (per_node_timeout 10) per_node_timeout 10;// 尝试在所有节点上加锁for (int i 0; i lock-node_count; i) {char error[256];int ret redis_node_lock(lock-connections[i],lock-base.lock_key,lock-base.lock_value,lock-expire_ms, error);if (ret 0) {acquired_count;}// 检查是否超时time_t now time(NULL);if ((now - start) * 1000 timeout_ms) {break;}}// 判断是否获得多数派if (acquired_count lock-quorum) {lock-base.acquired 1;lock-base.expire_time time(NULL) lock-expire_ms / 1000;return 0;}// 加锁失败释放已获取的锁for (int i 0; i lock-node_count; i) {redis_node_unlock(lock-connections[i],lock-base.lock_key,lock-base.lock_value);}return -1;}// Redlock解锁int redlock_unlock(distributed_lock_t *base) {redis_lock_t *lock (redis_lock_t*)base;if (!lock-base.acquired) return -1;for (int i 0; i lock-node_count; i) {redis_node_unlock(lock-connections[i],lock-base.lock_key,lock-base.lock_value);}lock-base.acquired 0;return 0;}// 锁续期int redlock_renew(distributed_lock_t *base) {redis_lock_t *lock (redis_lock_t*)base;if (!lock-base.acquired) return -1;// 在大部分节点上续期int renewed 0;for (int i 0; i lock-node_count; i) {// 使用PEXPIRE续期char cmd[256];snprintf(cmd, sizeof(cmd), PEXPIRE %s %lld,lock-base.lock_key, lock-expire_ms);char response[64];if (redis_command(lock-connections[i], cmd, response, sizeof(response)) 0) {if (strstr(response, :1)) renewed;}}if (renewed lock-quorum) {lock-base.expire_time time(NULL) lock-expire_ms / 1000;return 0;}return -1;}// 创建Redis分布式锁distributed_lock_t *create_redis_lock(const char *key,const char **hosts, const int *ports,int node_count, long long expire_ms) {redis_lock_t *lock malloc(sizeof(redis_lock_t));memset(lock, 0, sizeof(redis_lock_t));strcpy(lock-base.lock_key, key);generate_lock_value(lock-base.lock_value, sizeof(lock-base.lock_value));lock-base.acquired 0;lock-base.expire_time 0;lock-node_count node_count;lock-quorum node_count / 2 1;lock-expire_ms expire_ms;// 连接所有Redis节点lock-connections malloc(sizeof(redis_conn_t) * node_count);for (int i 0; i node_count; i) {if (redis_connect(lock-connections[i], hosts[i], ports[i]) 0) {// 连接失败处理lock-connections[i].sock_fd -1;}}lock-base.lock redlock_lock;lock-base.unlock redlock_unlock;lock-base.renew redlock_renew;lock-base.destroy NULL; // 会在外部处理return (distributed_lock_t*)lock;}4. Zookeeper分布式锁实现c// Zookeeper节点结构模拟typedef struct zk_node {char path[256];char data[256];int ephemeral;int sequential;int seq_num;struct zk_node *children;struct zk_node *next;} zk_node_t;// Zookeeper模拟服务器typedef struct zk_server {zk_node_t *root;pthread_mutex_t mutex;} zk_server_t;zk_server_t *g_zk_server NULL;// 初始化模拟ZKvoid zk_init() {g_zk_server malloc(sizeof(zk_server_t));g_zk_server-root malloc(sizeof(zk_node_t));strcpy(g_zk_server-root-path, /);g_zk_server-root-children NULL;pthread_mutex_init(g_zk_server-mutex, NULL);}// 创建ZNodezk_node_t *zk_create_node(const char *path, int ephemeral, int sequential) {zk_node_t *node malloc(sizeof(zk_node_t));strcpy(node-path, path);node-ephemeral ephemeral;node-sequential sequential;node-seq_num 0;node-children NULL;node-next NULL;if (sequential) {node-seq_num rand() % 10000;}return node;}// 创建顺序节点int zk_create_sequential(const char *base_path, const char *data, char *result_path) {pthread_mutex_lock(g_zk_server-mutex);// 查找父节点zk_node_t *parent g_zk_server-root;char *path_copy strdup(base_path);char *token strtok(path_copy, /);while (token) {zk_node_t *child parent-children;int found 0;while (child) {if (strcmp(child-path 1, token) 0) {parent child;found 1;break;}child child-next;}if (!found) {// 创建中间节点char new_path[256];snprintf(new_path, sizeof(new_path), %s/%s, parent-path, token);zk_node_t *new_node zk_create_node(new_path, 0, 0);new_node-next parent-children;parent-children new_node;parent new_node;}token strtok(NULL, /);}free(path_copy);// 创建顺序节点int seq parent-children ? parent-children-seq_num 1 : 1;char seq_path[256];snprintf(seq_path, sizeof(seq_path), %s/lock-%05d, base_path, seq);zk_node_t *node zk_create_node(seq_path, 1, 1);node-seq_num seq;strcpy(node-data, data);node-next parent-children;parent-children node;strcpy(result_path, seq_path);pthread_mutex_unlock(g_zk_server-mutex);return seq;}// 获取子节点列表按顺序void zk_get_children(const char *path, char ***children, int *count) {pthread_mutex_lock(g_zk_server-mutex);// 查找节点zk_node_t *node g_zk_server-root;char *path_copy strdup(path);char *token strtok(path_copy, /);while (token) {zk_node_t *child node-children;int found 0;while (child) {if (strcmp(child-path 1, token) 0) {node child;found 1;break;}child child-next;}if (!found) {free(path_copy);*count 0;pthread_mutex_unlock(g_zk_server-mutex);return;}token strtok(NULL, /);}free(path_copy);// 收集子节点*count 0;zk_node_t *child node-children;while (child) {(*count);child child-next;}*children malloc(sizeof(char*) * (*count));child node-children;int idx 0;while (child) {(*children)[idx] strdup(child-path);child child-next;}pthread_mutex_unlock(g_zk_server-mutex);}// 删除节点void zk_delete(const char *path) {pthread_mutex_lock(g_zk_server-mutex);// 查找父节点zk_node_t *parent g_zk_server-root;char *path_copy strdup(path);char *last_token NULL;char *token strtok(path_copy, /);while (token) {last_token token;zk_node_t *child parent-children;int found 0;while (child) {if (strcmp(child-path 1, token) 0) {parent child;found 1;break;}child child-next;}if (!found) {free(path_copy);pthread_mutex_unlock(g_zk_server-mutex);return;}token strtok(NULL, /);}// 删除节点zk_node_t *prev NULL;zk_node_t *child parent-children;while (child) {if (strcmp(child-path, path) 0) {if (prev) {prev-next child-next;} else {parent-children child-next;}free(child);break;}prev child;child child-next;}free(path_copy);pthread_mutex_unlock(g_zk_server-mutex);}// Zookeeper锁结构typedef struct zk_lock {distributed_lock_t base;char lock_path[256];char node_path[256];char watch_path[256];int node_seq;} zk_lock_t;// Zookeeper加锁int zk_lock(distributed_lock_t *base, int timeout_ms) {zk_lock_t *lock (zk_lock_t*)base;time_t start time(NULL);// 创建临时顺序节点snprintf(lock-lock_path, sizeof(lock-lock_path), /locks/%s,lock-base.lock_key);char data[256];snprintf(data, sizeof(data), client-%d, getpid());char result_path[256];int seq zk_create_sequential(lock-lock_path, data, result_path);if (seq 0) return -1;strcpy(lock-node_path, result_path);lock-node_seq seq;// 检查是否是最小节点while (1) {char **children;int count;zk_get_children(lock-lock_path, children, count);// 找到最小序号int min_seq 99999;for (int i 0; i count; i) {int s;sscanf(children[i], %*[^-]-%d, s);if (s min_seq) min_seq s;free(children[i]);}free(children);if (lock-node_seq min_seq) {// 获得锁lock-base.acquired 1;return 0;}// 监听前一个节点简化轮询usleep(100 * 1000);time_t now time(NULL);if ((now - start) * 1000 timeout_ms) {zk_delete(lock-node_path);return -1;}}}// Zookeeper解锁int zk_unlock(distributed_lock_t *base) {zk_lock_t *lock (zk_lock_t*)base;if (!lock-base.acquired) return -1;zk_delete(lock-node_path);lock-base.acquired 0;return 0;}// 创建Zookeeper分布式锁distributed_lock_t *create_zk_lock(const char *key) {zk_lock_t *lock malloc(sizeof(zk_lock_t));memset(lock, 0, sizeof(zk_lock_t));strcpy(lock-base.lock_key, key);lock-base.acquired 0;lock-base.lock zk_lock;lock-base.unlock zk_unlock;lock-base.renew NULL; // ZK锁不需要续期会话保持// 确保锁路径存在char path[256];snprintf(path, sizeof(path), /locks/%s, key);strcpy(lock-lock_path, path);return (distributed_lock_t*)lock;}5. 使用示例c// 模拟共享资源int shared_counter 0;pthread_mutex_t counter_mutex PTHREAD_MUTEX_INITIALIZER;void *worker_thread(void *arg) {distributed_lock_t *lock (distributed_lock_t*)arg;for (int i 0; i 10; i) {// 加锁if (lock-lock(lock, 5000) 0) {// 临界区pthread_mutex_lock(counter_mutex);shared_counter;int current shared_counter;pthread_mutex_unlock(counter_mutex);printf(Thread %lu: counter %d\n, pthread_self(), current);usleep(10000);// 解锁lock-unlock(lock);} else {printf(Thread %lu: 获取锁失败\n, pthread_self());}usleep(5000);}return NULL;}int main() {printf( 分布式锁测试 \n\n);srand(time(NULL));// 初始化Zookeeper模拟服务器zk_init();// 测试Redis锁printf(--- Redis分布式锁 ---\n);const char *hosts[] {127.0.0.1, 127.0.0.1, 127.0.0.1};int ports[] {6379, 6380, 6381};distributed_lock_t *redis_lock create_redis_lock(my_resource,hosts, ports, 3, 10000);// 创建多个线程竞争锁pthread_t threads[5];for (int i 0; i 5; i) {pthread_create(threads[i], NULL, worker_thread, redis_lock);}for (int i 0; i 5; i) {pthread_join(threads[i], NULL);}// 测试Zookeeper锁printf(\n--- Zookeeper分布式锁 ---\n);shared_counter 0;distributed_lock_t *zk_lock create_zk_lock(my_resource);for (int i 0; i 5; i) {pthread_create(threads[i], NULL, worker_thread, zk_lock);}for (int i 0; i 5; i) {pthread_join(threads[i], NULL);}return 0;}---三、编译和运行bashgcc -o distributed_lock distributed_lock.c -lpthread./distributed_lock---四、Redis vs Zookeeper分布式锁特性 Redis Zookeeper一致性 最终一致 强一致ZAB协议性能 高内存操作 低需要选举可用性 高主从切换 高集群死锁预防 超时自动释放 临时节点自动删除实现复杂度 简单 复杂---五、常见问题问题 解决方案锁过期业务未完成 锁续期看门狗机制Redis主从切换锁丢失 Redlock算法死锁 设置合理的超时时间可重入 在锁值中记录持有者信息---六、总结通过这篇文章你学会了· 分布式锁的核心要求互斥、防死锁、高可用· Redis分布式锁SET NX PX Redlock· Zookeeper分布式锁临时顺序节点· 锁续期、超时处理· 完整的测试示例分布式锁是分布式系统的必备工具。掌握它你就理解了秒杀系统、分布式任务调度的核心设计。下一篇预告《从零实现一个分布式事务TCC与Saga模式》---评论区分享一下你用分布式锁解决过什么场景