Redis Modules 入门。
1. Introduction to Redis modules
原文地址
1.1. 加载、卸载和查看 Modules
加载
Modules 有两种加载方式:
- 在
redis.conf
中设置:loadmodule /path/to/mymodule.so
- 通过 redis command 实时加载:
MODULE LOAD /path/to/mymodule.so
也可在加载模块时传递参数,如: loadmodule(MODULE LOAD) /path/to/mymodule.so 765 PRO FIGHT
卸载
卸载 Modules:MODULE UNLOAD mymodule
,如果需要在卸载模块之前进行内存清理之类的操作,可以通过实现 RedisModule_OnUnload
函数来完成,此函数接口为:
1
|
int RedisModule_OnUnload(RedisModuleCtx *ctx);
|
也可以通过在 RedisModule_OnUnload
中直接返回 REDISMODULE_ERR
禁止模块被卸载。
查看
列出已加载的 Modules: MODULE LIST
1.2. 模块示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
#include "redismodule.h"
#include <stdlib.h>
/* The function that implements the command must have the following prototype:
* int mycommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) */
int HelloworldRand_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
RedisModule_ReplyWithLongLong(ctx, rand());
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
/* It should be the first function called by the module OnLoad function.
* Init function prototype:
* @return int
* @param1: RedisModuleCtx *ctx
* @param2: const char *modeulename
* @param3: int module_version (reported by MODULE LIST)
* @param4: int api_version
* */
if (RedisModule_Init(ctx, "helloworld", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
{
return REDISMODULE_ERR;
}
/* It used in order to register commands into the Redis core.
* CreateCommand function prototype:
* @return int
* @param1: RedisModuleCtx *ctx
* @param2: const char *name
* @param3: RedisModuleCmdFunc cmdfunc
* @param4: const char *strflags
* @param5: int firstkey
* @param6: int lastkey
* @param7: int keystep
* */
if (RedisModule_CreateCommand(ctx, "helloworld.rand", HelloworldRand_RedisCommand,
"fast random", 0, 0, 0) == REDISMODULE_ERR)
{
return REDISMODULE_ERR;
}
return REDISMODULE_OK;
}
|
1.3. RedisModuleString
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// accesss a string
const char *RedisModule_StringPtrLen(RedisModuleString *string, size_t *len);
// create new string
RedisModuleString *RedisModule_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len);
// create string from number
RedisModuleString *mystr = RedisModule_CreateStringFromLongLong(ctx, 10);
// parse string as number
long long myval;
if (RedisModule_StringToLongLong(ctx, argv[1], &myval) == REDISMODULE_OK)
{
/*Do something with myval*/
}
// free a string which created by RedisModule_CreateString
void RedisModule_FreeString(RedisModuleString *str);
|
- 如果不想手动 free 的话,可以开启自动内存管理
- 通过
argv
数组传递的参数不要 free
1.4. 返回响应
在模块中实现的命令可以通过 RedisModule_ReplyWith<something>
接口返回响应至客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
/* Return an error */
RedisModule_ReplyWithError(RedisModuleCtx *ctx, const char *err);
/* Relpy with a long long */
RedisModule_ReplyWithLongLong(RedisModuleCtx *ctx, long long res);
/* Reply with a simple string, that can't contain binary values or newlines */
RedisModule_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *res);
/* Reply with bulk strings that are binary safe */
int RedisModule_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len);
int RedisModule_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str);
/* Reply with an array, you just need to use a function to
* emit the array length, followed by as many calls to the
* above functions as the number of elements of array are. */
RedisModule_ReplyWithArray(ctx, 2);
RedisModule_ReplyWithStringBuffer(ctx, "age", 3);
RedisModule_ReplyWithLongLong(ctx, 22);
/* Returning arrays with dynamic length,
* It is possible to have multiple nested arrays with postponed reply.
* Each call to ReplySetArrayLength() will set the length of the latest
* corresponding call to to ReplyWithArray() */
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
... generate 100 elements ...
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
... generate 10 elements ...
RedisModule_ReplySetArrayLength(ctx, 10);
RedisModuel_ReplySetArrayLength(ctx, 100);
|
1.5. 请求有效性检测
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
/* Check the number of arguments */
if (argc != 2)
{
return RedisModule_WrongArity(ctx);
}
/* Check the type of key */
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1],
REDISMODULE_READ|REDISMODULE_WRITE);
int key_type = RedisModule_KeyType(key);
if (key_type != REDISMODULE_KEYTYPE_STRING && key_type != REDISMODULE_KEYTYPE_EMPTY)
{
RedisModule_CloseKey(key);
return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}
|
1.6. Low level access to keys
访问 Redis 数据库有两种 APIs:
- low level APIs: 可直接操作 Redis 数据结构,速度快
- high level APIs:使用
RedisModule_Call()
函数 call Redis commands
在请求有效性检测中,我们已经看过了 low level 操作的简单示例,这些 APIs 为了保证运行速度,并未做很多 run-time checks,因此需要遵守下述规则:
- 同时 Open 相同的 key,并且其中有写操作的话,会产生未定义行为或 crashes
- 当一个 key 被 Open 时,应该只使用 low level APIs 访问,不可同时和
RedisModule_Call()
混用
Open a key for access
1
2
3
4
5
|
RedisModuleKey *key;
/* Prototype: void* RedisModule_OpenKey(RedisModuleCtx *ctx,
* RedisModuleString *keyname, int mode);
* */
RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ;
|
mode
可以是 REDISMODULE_READ
或 REDISMODULE_WRITE
- 打开一个不存在的 key 时
Getting the key type
1
|
int key_type = RedisModule_KeyType(key);
|
返回值种类:
REDISMODULE_KEYTYPE_EMPTY
REDISMODULE_KEYTYPE_STRING
REDISMODULE_KEYTYPE_LIST
REDISMODULE_KEYTYPE_SET
REDISMODULE_KEYTYPE_ZSET
REDISMODULE_KEYTYPE_HASH
REDISMODULE_KEYTYPE_MODULE
REDISMODULE_KEYTYPE_STREAM
Managing key expires(TTLs)
1
2
3
4
5
6
7
8
|
/* Query the current expire of an open key. Return the time to live of the key
* in milliseconds, or REDISMODULE_NO_EXPIRE as a special value to signal the
* key has no associated or does not exist at all. */
mstime_t RedisModule_GetExpire(RedisModuleKey *key);
/* Change the expire of a key
* When called on a non existing key, REDISMODULE_ERR is returned. */
int RedisModule_SetExpire(RedisModuleKey *key, mstime_t expire);
|
Deleting keys
1
2
|
/* return REDISMODULE_ERR if the key is not open for writing */
RedisModule_DeleteKey(key);
|
Close a key
1
|
RedisModule_CloseKey(key);
|
当自动内存管理功能开启时,也可以不用 close keys。
Obtaining the length of values
1
2
|
// return the length of values associated to an open key
size_t len = RedisModule_ValueLength(key);
|
string
: 返回的是字符串长度
list
, set
, zset
和 hash
:返回的是元素个数
- key 不存在时返回 0
String type API
1
2
|
/* Setting a new string value, this function works exactly like SET cmd */
int RedisModule_StringSet(RedisModuleKey *key, RedisModuleString *str);
|
通过 DMA(direct memory access) 访问字符串类型效率更高,DMA 方位期间该 key 不可有其他操作,否则 DMA 指针非法。
1
2
3
4
5
6
|
size_t len, j;
char *myptr = RedisModule_StringDMA(key, &len, REDISMODULE_WRITE);
for (j = 0; j < len; j++)
{
myptr[j] = 'A';
}
|
可通过以下接口改变字符串长度:
1
|
RedisModule_StringTruncate(mykey, 1024);
|
- 如果该 key 操作之前不存在,则会创造新 key
- 字符串长度扩展时,会填充 zero bytes
更多数据类型的 APIs 可见 Modules API reference。
1.7. High level access to keys – Calling Redis commands
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
RedisModuleCallReply *reply;
/* @return: RedisModuleCallReply on sucess, NUll on error
* @param1: context
* @param2: a null terminated C string with the command name
* @param3: the format specifier where each character corresponds to
* the type of the arguments that will follow */
reply = RedisModule_Call(ctx, "INCRBY", "sc", argv[1], "10");
/* Accesss a RedisModuleCallReply object */
if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_INTEGER)
{
long long myval = RedisModule_CallReplyInteger(reply);
/* Do something with myval */
}
|
格式说明符
c
– Null terminated C string pointer
b
– C buffer, two arguments needed: C string pointer and size_t
length
s
– RedisModuleString as received in argv
or by other Redis module APIs returning a RedisModuleString object
l
– Long long integer
v
– Array of RedisModuleString objects
!
– This modifier just tells the function to replicate the command to replicas and AOF. it is ignored from the point of view of arguments parsing
A
– This modifier, when !
is given, tells to suppress AOF propagation: the command will propagated only to replicas
R
– This modifier, when !
is given, tells to suppress replicas propagation: the command will be propagated only to the AOF if enabled
返回值
函数执行成功时返回 RedisModuleCallReply
对象,失败时返回 NULL
并将 errno
设置为以下值:
errno |
meaning |
EBADF |
wrong format specifier. |
EINVAL |
wrong command arity. |
ENOENT |
command does not exist. |
EPERM |
operation in Cluster instance with key in non local slot. |
EROFS |
operation in Cluster instance when a write command is sent in a readonly state. ENETDOWN |
RedisModuleCallReply
Reply type |
Meaning |
REDISMODULE_REPLY_STRING |
Bulk string or status replies |
REDISMODULE_REPLY_ERROR |
Errors |
REDISMODULE_REPLY_INTEGER |
Signed 64-bit integers |
REDISMODULE_REPLY_ARRAY |
Array of replies |
REDISMODULE_REPLY_NULL |
NULL reply |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
/* Strings, errors and arrays hava an associated length.
* To obtain the reply length: */
size_t reply_len = RedisModule_CallReplyLength(reply);
/* Accesss strings or errors */
size_t len;
char *ptr = RedisModule_CallReplyStringPtr(reply, &len);
/* Accesss sub elements of array */
RedisModuleCallReply *sub_reply;
sub_reply = RedisModule_CallReplyArrayElement(reply, idx);
/* Obtain the value of an integer reply */
long long reply_integer_val = RedisModule_CallReplyInteger(reply);
/* RedisModuleCallReply are not the same as RedisModuleString type
* create a new string object from a call reply of type string, error or integer
* */
RedisModuleString *mystr = RedisModule_CreateStringFromCallReply(reply);
/* Free a reply object
* For arrays, you need to free only the top level reply, not the nested replies.
* */
void RedisModule_FreeCallReply(RedisModuleCallReply *reply);
|
1.8. Replicating commands
当使用 high level APIs 访问 key,通过加 !
格式说明符,命令自动被复制,如:
1
|
reply = RedisModule_Call(ctx, "INCRBY", "!sc", argv[1], "10");
|
当使用 low level APIs 时,可通过以下俩个接口复制命令:
1
2
3
4
5
6
7
8
9
10
|
/* When you use the follow API, you should not use any other replication
* function since they are not guaranteed to mix well. */
RedisModule_ReplicateVerbatim(ctx);
/* Exactly tell Redis what commands to replicate as the effect of the command
* execution. It's possible to call RedisModule_Replicate multiple times,
* and each will emit a command. All the sequence emitted is wrapped between
* a MULTI/EXEC transaction, so that the AOF and replication effects are
* the same as executing a single command. */
RedisModule_Replicate(ctx, "INCRBY", "cl", "foo", my_increment);
|
1.9. Automatic memory management
启用自动内存管理,只需在命令实现函数的开头加上 RedisModule_AutoMemory(ctx);
即可,虽然会有些性能损耗,但是你可以不再需要手动 close open keys
, free replies
和 free RedisModuleString objects
。
1.10. Allocating memory into modules
虽然在模块中可以调用 malloc()
和 free()
函数,但是通过 malloc()
分配的内存并不会被计算到 used_memory
信息中去,也不会受到 maxmemory
限制,所以应首先考虑使用 Redis Modules 提供的 APIs。而且,使用 APIs 分配内存实现的自定义数据结构也可以被 RDB 加载函数正确地反序列化。
1
2
3
4
5
6
7
8
9
|
void *RedisModule_Alloc(size_t bytes);
void* RedisModule_Realloc(void *ptr, size_t bytes);
void RedisModule_Free(void *ptr);
void RedisModule_Calloc(size_t nmemb, size_t size);
char *RedisModule_Strdup(const char *str);
/* Pool allocator
* the memory allocated is automatically released when the command returns. */
void *RedisModule_PoolAlloc(RedisModuleCtx *ctx, size_t bytes);
|
2. Implementing native data types
原文地址
在模块中实现类似 Redis 原生的数据类型需要满足以下几点:
- The implementation of some kind of new data structure and of commands operating on the new data structure.
- A set of callbacks that handle: RDB saving, RDB loading, AOF rewriting, releasing of a value associated with a key, calculation of a value digest (hash) to be used with the DEBUG DIGEST command.
- A 9 characters name that is unique to each module native data type.
- An encoding version, used to persist into RDB files a module-specific data version, so that a module will be able to load older representations from RDB files.
2.1. Registering a new data types
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/* Declare a global variable that will hold a reference to the data type */
static RedisModuleType *MyType;
#define MYTYPE_ENCODING_VERSION 0
int RedisModule_OnLoad(RedisModuleCtx *ctx)
{
/* The follow set of methods must be passed.
* while .digest and .mem_usage are optinal */
RedisModuleTypeMethods tm =
{
.version = REDISMODULE_TYPE_METHOD_VERSION,
.rdb_load = MyTypeRDBLoad,
.rdb_save = MyTypeRDBSave,
.aof_rewrite = MyTypeAOFRewrite,
.free = MyTypeFree
};
/* 9 character name, character set {A-Z, a-z, 0-9, _, -}*/
MyType = RedisModule_CreateDataType(ctx, "MyType-AZ",
MYTYPE_ENCODING_VERSION, &tm);
if (MyType == NULL)
{
return REDISMODULE_ERR;
}
}
|
Prototypes of type methods
1
2
3
4
5
6
7
8
9
10
|
/* mandatory */
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof,
RedisModuleString *key, void *value)
typedef void (*RedisModuleTypeFreeFunc)(void *value)
/* optinal */
typedef size_t (*RedisModuleTypeMemUsageFunc)(void *value)
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value)
|
rdb_load
is called when loading data from the RDB file. It loads data in the same format as rdb_save produces.
rdb_save
is called when saving data to the RDB file.
aof_rewrite
is called is called when the AOF is being rewritten, and the module needs to tell Redis what is the sequence of commands to recreate the content of a given key.
free
is called when a key with the module native type is deleted via DEL or in any other mean, in order to let the module reclaim the memory associated with such a value.
digest
is called when DEBUG DIGEST is executed and a key holding this module type is found. Currently this is not yet implemented so the function can be left empty.
mem_usage
is called when the MEMORY command asks for the total memory consumed by a specific key, and is used in order to get the amount of bytes used by the module value.
为什么名字要 9 个字符?
1
2
3
4
5
6
7
8
9
10
11
|
/* Now RDB files are sequences of key-value pairs like the following: */
---------------------------------------------
| 1 byte type | key | a type specific value |
---------------------------------------------
/* in module data, prefix a type specific value with 64-bit integer
* 64 = 9 * 6 + 10, 9 characters of 6 bits, 10 bits are used in oreder
* to store the encoding version of the type */
------------------------------------------
| 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 10 |
------------------------------------------
|
2.2. Seting and getting keys
使用 low level APIs 去操作自定义的数据结构,例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1],
REDISMODULE_READ|REDISMODULE_WRITE);
int type = RedisModule_KeyType(key);
/* Check the key */
if (type != REDISMODULE_KEYTYPE_EMPTY &&
RedisModule_ModuleTypeGetType(key) != MyType)
{
return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}
struct some_private_struct *data;
if (type == REDISMODULE_KEYTYPE_EMPTY)
{
/* Create an empty value object if the key is currently empty. */
data = createMyDataStructure();
RedisModule_ModuleTypeSetValue(key, MyType, data);
}
else
{
/* Retrieve the private data from a key */
data = RedisModule_ModuleTypeGetValue(key);
}
/* Do something with data */
|
2.3. Free method
1
2
3
4
5
|
/* a simple impl of the free method */
void MyTypeFreeCallback(void *value)
{
RedisModule_Free(value);
}
|
2.4. RDB load and save methods
当调用 RDB saving 或 loading 时,我们需要给出自定义数据类型在磁盘上的排布方式,Redis 提供了在 RDB 文件中存储以下数据类型的 high level APIs:
- Unsigned 64-bit integers
- Signed 64-bit integers
- Doubles
- Strings
APIs 如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
void RedisModule_SaveUnsigned(RedisModuleIO *io, uint64_t value);
uint64_t RedisModule_LoadUnsigned(RedisModuleIO *io);
void RedisModule_SaveSigned(RedisModuleIO *io, int64_t value);
int64_t RedisModule_LoadSigned(RedisModuleIO *io);
void RedisModule_SaveDouble(RedisModuleIO *io, double value);
double RedisModule_LoadDouble(RedisModuleIO *io);
void RedisModule_SaveString(RedisModuleIO *io, RedisModuleString *s);
RedisModuleString *RedisModule_LoadString(RedisModuleIO *io);
void RedisModule_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len);
char *RedisModule_LoadStringBuffer(RedisModuleIO *io);
|
一个简单的例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
struct double_array
{
size_t count;
double *values;
};
/* rdb_save */
void DoubleArrayRDBSave(RedisModuleIO *io, void *ptr)
{
struct double_array *da = ptr;
RedisModule_SaveUnsigned(io, da->count);
for (size_t j = 0; j < da->count; j++)
{
RedisModule_SaveDouble(io, da->values[j]);
}
}
/* rdb_load */
void *DoubleArrayRDBLoad(RedisModuleIO *io, int encver)
{
if (encver != DOUBLE_ARRAY_ENC_VER)
{
/* We should actually log an error here, or try to implement the ability
* to load older versions of our data structure */
return NULL;
}
struct double_array *da;
da = RedisModule_Alloc(sizeof(*da));
da->count = RedisModule_LoadUnsigned(io);
da->values = RedisModule_Alloc(da->count * sizeof(double));
for (size_t j = 0; j < da->count; j++)
{
da->values[j] = RedisModule_LoadDouble(io);
}
return da;
}
|
2.5. AOF rewriting
1
|
void RedisModule_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...);
|
2.6. Handling multiple encodings
TODO
2.7. Allocating memory
自定义数据结构的内存管理建议使用 Redis Module 提供的 APIs:
- 自定义数据结构使用的内存会被计算在
used_memory
内,方便 Redis 进行内存管理
- Redis 使用的是
jemalloc
分配器,可以降低内存碎片率
- 与 Redis 内存分配器相同时,从 RDB loading 函数中返回的值无需再次拷贝,可直接访问使用
3. Blocking operations with modules
原文地址
3.1. How blocking and resuming works
可使用以下接口让客户端进入 blocked state:
1
2
3
4
5
6
|
RedisModuleBlockedClient *RedisModule_BlockClient(
RedisModuleCtx *ctx,
RedisModuleCmdFunc reply_callback,
RedisModuleCmdFunc timeout_callback,
void (*free_private)(void*),
long long timeout_ms);
|
- 该函数返回的
RedisModuleBlockedClient
对象,用于之后 unblock 该客户端
ctx
is the command execution context as usually in the rest of the API
reply_callback
is the callback, having the same prototype of a normal command function, that is called when the client is unblokced in order to return a reply to the client
timeout_callback
is the callback, having the same prototype of a normal command function that is called when the client reached the timeout_ms
timeout.
free_private
is the callback that is called in order to free the private data. Private data is a pointer to some data that is passed between the API used to unblock the client, to the callback that will send the reply to the client.
timeout_ms
is the timeout in milliseconds. When the timeout is reached, the timeout callback is called and the client is automatically aborted.
可使用以下接口 unblock 客户端:
1
2
|
/* This function is thread safe */
int RedisModule_UnblockClient(RedisModuleBlokcedClient *bc, void *privdata);
|
- 在客户端解除阻塞之前,会立即调用在客户端被阻塞时指定的
reply_callback
函数:该函数将有权访问此处使用的 privdata
指针。
- 在客户端解除阻塞之后,会自动调用在客户端被阻塞时指定的
free_private
函数释放 privdata
占用的空间
一个简单示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
return RedisModule_ReplyWithSimpleString(ctx, "765PRO! FIGHT!");
}
int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
return RedisModule_ReplyWithNull(ctx);
}
void *threadmain(void *arg)
{
RedisModuleBlockedClient *bc = arg;
/* Wait one second and unblock,
* You can think at it as an actually expansive operation of some kind */
sleep(1);
RedisModule_UnblockClient(bc, NULL);
}
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx, reply_func, timeout_func, NULL, 0);
pthread_t tid;
pthread_create(&tid, NULL, threadmian, bc);
return REDISMODULE_OK;
}
|
3.2. Passing reply data when unblocking
一般而言阻塞客户端的目的是为了等待某些计算的结果,并将结果回复给客户端,该结果往往是在 unblocked 时传递给 reply_callback
的,我们将上述示例进行简单修改,以演示传递 reply
的过程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
/* Obtain the private data */
long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
/* Don't free mynumber here, but in the free private data callback */
return RedisModule_ReplyWithLongLong(ctx, mynumber);
}
int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
return RedisModule_ReplyWithNull(ctx);
}
void free_privdata(void *privdata)
{
RedisModule_Free(privdata);
}
void *threadmain(void *arg)
{
RedisModuleBlokcedClient *bc = arg;
/* Wait one second and unblock,
* You can think at it as an actually expansive operation of some kind */
sleep(1);
long *mynumber = RedisModule_Alloc(sizeof(long));
*mynumber = rand();
RedisModule_UnblockClient(bc, mynumber);
}
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx, reply_func, timeout_func, NULL, 0);
pthread_t tid;
pthread_create(&tid, NULL, threadmian, bc);
return REDISMODULE_OK;
}
|
privdata
的释放必须放置在 free_privdata
callback 中,因为 reply_callback
可能因为客户端连接超时或断开导致无法被调用
- 在
timeout_callback
中,也可通过 GetBlockedClientPrivateData()
访问 privdata
3.3. Aborting the blocking of a client
当实现 Redis Command 的函数中,当申请资源发生错误时,我们既不想 block 客户端,也不想触发 reply_callback
,此时我们最好的选择可能是使用以下接口:
1
|
int RedisModule_AbortBlock(RedisModuleBlockedClient *bc);
|
这时,我们可以将上一小节的 Example_RedisCommand
函数改写如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx, reply_func, timeout_func, NULL, 0);
pthread_t tid;
if (pthread_create(&tid, NULL, threadmain, bc) != 0)
{
RedisModule_AbortBlock(bc);
RedisModule_ReplyWithError(ctx, "Sorry can't create a thread");
}
return REDISMODULE_OK;
}
|
3.4. Implementing the command, reply and timeout callback using a single function
有人可能喜欢将所有实现放在一个函数中,通过使用以下 API 可达到此目的:
1
2
|
int RedisModule_IsBlockedReplyRequest(RedisModuleCtx *ctx);
int RedisModule_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);
|
比如可将之前的示例改写如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
void *threadmain(void *arg)
{
RedisModuleBlokcedClient *bc = arg;
/* Wait one second and unblock,
* You can think at it as an actually expansive operation of some kind */
sleep(1);
long *mynumber = RedisModule_Alloc(sizeof(long));
*mynumber = rand();
RedisModule_UnblockClient(bc, mynumber);
}
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (RedisModule_IsBlockedReplyRequest(ctx))
{
long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
return RedisModule_ReplyWithLongLong(ctx, mynumber);
}
else if (RedisModule_IsBlockedTimeoutRequest(ctx))
{
return RedisModule_ReplyWithNull(ctx);
}
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx, reply_func, timeout_func, NULL, 0);
pthread_t tid;
if (pthread_create(&tid, NULL, threadmain, bc) != 0)
{
RedisModule_AbortBlock(bc);
RedisModule_ReplyWithError(ctx, "Sorry can't create a thread");
}
return REDISMODULE_OK;
}
|