Redis Modules 入门。


Introduction to Redis modules

原文地址

加载、卸载和查看 Modules

加载

Modules 有两种加载方式:

  • redis.conf 中设置:loadmodule /path/to/mymodule.so
  • 通过 redis command 实时加载:MODULE LOAD /path/to/mymodule.so

也可在加载模块时传递参数,如: loadmodule(MODULE LOAD) /path/to/mymodule.so 765 PRO FIGHT

卸载

卸载 Modules:MODULE UNLOAD mymodule,如果需要在卸载模块之前进行内存清理之类的操作,可以通过实现 RedisModule_OnUnload 函数来完成,此函数接口为:

1
int RedisModule_OnUnload(RedisModuleCtx *ctx);

也可以通过在 RedisModule_OnUnload 中直接返回 REDISMODULE_ERR 禁止模块被卸载。

查看

列出已加载的 Modules: MODULE LIST

模块示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include "redismodule.h"
#include <stdlib.h>

/* The function that implements the command must have the following prototype:
 * int mycommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) */
int HelloworldRand_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    RedisModule_ReplyWithLongLong(ctx, rand());
    return REDISMODULE_OK;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    /* It should be the first function called by the module OnLoad function.
     * Init function prototype:
     * @return int
     * @param1: RedisModuleCtx *ctx
     * @param2: const char *modeulename
     * @param3: int module_version (reported by MODULE LIST)
     * @param4: int api_version
     * */
    if (RedisModule_Init(ctx, "helloworld", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
    {
        return REDISMODULE_ERR;
    }

    /* It used in order to register commands into the Redis core.
     * CreateCommand function prototype:
     * @return int
     * @param1: RedisModuleCtx *ctx
     * @param2: const char *name
     * @param3: RedisModuleCmdFunc cmdfunc
     * @param4: const char *strflags
     * @param5: int firstkey
     * @param6: int lastkey
     * @param7: int keystep
     * */
    if (RedisModule_CreateCommand(ctx, "helloworld.rand", HelloworldRand_RedisCommand,
                                  "fast random", 0, 0, 0) == REDISMODULE_ERR)
    {
        return REDISMODULE_ERR;
    }

    return REDISMODULE_OK;
}

RedisModuleString

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// accesss a string
const char *RedisModule_StringPtrLen(RedisModuleString *string, size_t *len);

// create new string
RedisModuleString *RedisModule_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len);

// create string from number
RedisModuleString *mystr = RedisModule_CreateStringFromLongLong(ctx, 10);

// parse string as number
long long myval;
if (RedisModule_StringToLongLong(ctx, argv[1], &myval) == REDISMODULE_OK)
{
    /*Do something with myval*/
}

// free a string which created by RedisModule_CreateString
void RedisModule_FreeString(RedisModuleString *str);
  • 如果不想手动 free 的话,可以开启自动内存管理
  • 通过 argv 数组传递的参数不要 free

返回响应

在模块中实现的命令可以通过 RedisModule_ReplyWith<something> 接口返回响应至客户端。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* Return an error */
RedisModule_ReplyWithError(RedisModuleCtx *ctx, const char *err);

/* Relpy with a long long */
RedisModule_ReplyWithLongLong(RedisModuleCtx *ctx, long long res);

/* Reply with a simple string, that can't contain binary values or newlines */
RedisModule_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *res);

/* Reply with bulk strings that are binary safe */
int RedisModule_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len);
int RedisModule_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str);

/* Reply with an array, you just need to use a function to
 * emit the array length, followed by as many calls to the
 * above functions as the number of elements of array are. */
RedisModule_ReplyWithArray(ctx, 2);
RedisModule_ReplyWithStringBuffer(ctx, "age", 3);
RedisModule_ReplyWithLongLong(ctx, 22);

/* Returning arrays with dynamic length,
 * It is possible to have multiple nested arrays with postponed reply.
 * Each call to ReplySetArrayLength() will set the length of the latest
 * corresponding call to to ReplyWithArray() */
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
... generate 100 elements ...
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
... generate 10 elements ...
RedisModule_ReplySetArrayLength(ctx, 10);
RedisModuel_ReplySetArrayLength(ctx, 100);

请求有效性检测

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
/* Check the number of arguments */
if (argc != 2)
{
    return RedisModule_WrongArity(ctx);
}

/* Check the type of key */
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1],
    REDISMODULE_READ|REDISMODULE_WRITE);
int key_type = RedisModule_KeyType(key);
if (key_type != REDISMODULE_KEYTYPE_STRING && key_type != REDISMODULE_KEYTYPE_EMPTY)
{
    RedisModule_CloseKey(key);
    return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}

Low level access to keys

访问 Redis 数据库有两种 APIs:

  • low level APIs: 可直接操作 Redis 数据结构,速度快
  • high level APIs:使用 RedisModule_Call() 函数 call Redis commands

在请求有效性检测中,我们已经看过了 low level 操作的简单示例,这些 APIs 为了保证运行速度,并未做很多 run-time checks,因此需要遵守下述规则:

  • 同时 Open 相同的 key,并且其中有写操作的话,会产生未定义行为或 crashes
  • 当一个 key 被 Open 时,应该只使用 low level APIs 访问,不可同时和 RedisModule_Call() 混用

Open a key for access

1
2
3
4
5
RedisModuleKey *key;
/* Prototype: void* RedisModule_OpenKey(RedisModuleCtx *ctx,
 *                                      RedisModuleString *keyname, int mode);
 * */
RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ;
  • mode 可以是 REDISMODULE_READREDISMODULE_WRITE
  • 打开一个不存在的 key 时
    • REDISMODULE_WRITE:创造新 key,例如:

      1
      2
      3
      4
      5
      6
      
      RedisModuleKey *key;
      key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
      if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY)
      {
          RedisModule_StringSet(key, argv[2]);
      }
      
    • REDISMODULE_READ: 返回 NULL

Getting the key type

1
int key_type = RedisModule_KeyType(key);

返回值种类:

  • REDISMODULE_KEYTYPE_EMPTY
  • REDISMODULE_KEYTYPE_STRING
  • REDISMODULE_KEYTYPE_LIST
  • REDISMODULE_KEYTYPE_SET
  • REDISMODULE_KEYTYPE_ZSET
  • REDISMODULE_KEYTYPE_HASH
  • REDISMODULE_KEYTYPE_MODULE
  • REDISMODULE_KEYTYPE_STREAM

Managing key expires(TTLs)

1
2
3
4
5
6
7
8
/* Query the current expire of an open key. Return the time to live of the key
 * in milliseconds, or REDISMODULE_NO_EXPIRE as a special value to signal the
 * key has no associated or does not exist at all. */
mstime_t RedisModule_GetExpire(RedisModuleKey *key);

/* Change the expire of a key
 * When called on a non existing key, REDISMODULE_ERR is returned. */
int RedisModule_SetExpire(RedisModuleKey *key, mstime_t expire);

Deleting keys

1
2
/* return REDISMODULE_ERR if the key is not open for writing */
RedisModule_DeleteKey(key);

Close a key

1
RedisModule_CloseKey(key);

当自动内存管理功能开启时,也可以不用 close keys。

Obtaining the length of values

1
2
// return the length of values associated to an open key
size_t len = RedisModule_ValueLength(key);
  • string: 返回的是字符串长度
  • list, set, zsethash:返回的是元素个数
  • key 不存在时返回 0

String type API

1
2
/* Setting a new string value, this function works exactly like SET cmd */
int RedisModule_StringSet(RedisModuleKey *key, RedisModuleString *str);

通过 DMA(direct memory access) 访问字符串类型效率更高,DMA 方位期间该 key 不可有其他操作,否则 DMA 指针非法。

1
2
3
4
5
6
size_t len, j;
char *myptr = RedisModule_StringDMA(key, &len, REDISMODULE_WRITE);
for (j = 0; j < len; j++)
{
    myptr[j] = 'A';
}

可通过以下接口改变字符串长度:

1
RedisModule_StringTruncate(mykey, 1024);
  • 如果该 key 操作之前不存在,则会创造新 key
  • 字符串长度扩展时,会填充 zero bytes

更多数据类型的 APIs 可见 Modules API reference。

High level access to keys – Calling Redis commands

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
RedisModuleCallReply *reply;
/* @return: RedisModuleCallReply on sucess, NUll on error
 * @param1: context
 * @param2: a null terminated C string with the command name
 * @param3: the format specifier where each character corresponds to
 *          the type of the arguments that will follow  */
reply = RedisModule_Call(ctx, "INCRBY", "sc", argv[1], "10");

/* Accesss a RedisModuleCallReply object */
if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_INTEGER)
{
    long long myval = RedisModule_CallReplyInteger(reply);
    /* Do something with myval */
}

格式说明符

  • c – Null terminated C string pointer
  • b – C buffer, two arguments needed: C string pointer and size_t length
  • s – RedisModuleString as received in argv or by other Redis module APIs returning a RedisModuleString object
  • l – Long long integer
  • v – Array of RedisModuleString objects
  • ! – This modifier just tells the function to replicate the command to replicas and AOF. it is ignored from the point of view of arguments parsing
  • A – This modifier, when ! is given, tells to suppress AOF propagation: the command will propagated only to replicas
  • R – This modifier, when ! is given, tells to suppress replicas propagation: the command will be propagated only to the AOF if enabled

返回值

函数执行成功时返回 RedisModuleCallReply 对象,失败时返回 NULL 并将 errno 设置为以下值:

errno meaning
EBADF wrong format specifier.
EINVAL wrong command arity.
ENOENT command does not exist.
EPERM operation in Cluster instance with key in non local slot.
EROFS operation in Cluster instance when a write command is sent in a readonly state. ENETDOWN

RedisModuleCallReply

Reply type Meaning
REDISMODULE_REPLY_STRING Bulk string or status replies
REDISMODULE_REPLY_ERROR Errors
REDISMODULE_REPLY_INTEGER Signed 64-bit integers
REDISMODULE_REPLY_ARRAY Array of replies
REDISMODULE_REPLY_NULL NULL reply
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/* Strings, errors and arrays hava an associated length.
 * To obtain the reply length: */
size_t reply_len = RedisModule_CallReplyLength(reply);

/* Accesss strings or errors */
size_t len;
char *ptr = RedisModule_CallReplyStringPtr(reply, &len);

/* Accesss sub elements of array */
RedisModuleCallReply *sub_reply;
sub_reply = RedisModule_CallReplyArrayElement(reply, idx);

/* Obtain the value of an integer reply */
long long reply_integer_val = RedisModule_CallReplyInteger(reply);

/* RedisModuleCallReply are not the same as RedisModuleString type
 * create a new string object from a call reply of type string, error or integer
 * */
RedisModuleString *mystr = RedisModule_CreateStringFromCallReply(reply);

/* Free a reply object
 * For arrays, you need to free only the top level reply, not the nested replies.
 * */
void RedisModule_FreeCallReply(RedisModuleCallReply *reply);

Replicating commands

当使用 high level APIs 访问 key,通过加 ! 格式说明符,命令自动被复制,如:

1
reply = RedisModule_Call(ctx, "INCRBY", "!sc", argv[1], "10");

当使用 low level APIs 时,可通过以下俩个接口复制命令:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/* When you use the follow API, you should not use any other replication
 * function since they are not guaranteed to mix well. */
RedisModule_ReplicateVerbatim(ctx);

/* Exactly tell Redis what commands to replicate as the effect of the command
 * execution. It's possible to call RedisModule_Replicate multiple times,
 * and each will emit a command. All the sequence emitted is wrapped between
 * a MULTI/EXEC transaction, so that the AOF and replication effects are
 * the same as executing a single command. */
RedisModule_Replicate(ctx, "INCRBY", "cl", "foo", my_increment);

Automatic memory management

启用自动内存管理,只需在命令实现函数的开头加上 RedisModule_AutoMemory(ctx); 即可,虽然会有些性能损耗,但是你可以不再需要手动 close open keys, free repliesfree RedisModuleString objects

Allocating memory into modules

虽然在模块中可以调用 malloc()free() 函数,但是通过 malloc() 分配的内存并不会被计算到 used_memory 信息中去,也不会受到 maxmemory 限制,所以应首先考虑使用 Redis Modules 提供的 APIs。而且,使用 APIs 分配内存实现的自定义数据结构也可以被 RDB 加载函数正确地反序列化。

1
2
3
4
5
6
7
8
9
void *RedisModule_Alloc(size_t bytes);
void* RedisModule_Realloc(void *ptr, size_t bytes);
void RedisModule_Free(void *ptr);
void RedisModule_Calloc(size_t nmemb, size_t size);
char *RedisModule_Strdup(const char *str);

/* Pool allocator
 * the memory allocated is automatically released when the command returns. */
void *RedisModule_PoolAlloc(RedisModuleCtx *ctx, size_t bytes);

Implementing native data types

原文地址

在模块中实现类似 Redis 原生的数据类型需要满足以下几点:

  • The implementation of some kind of new data structure and of commands operating on the new data structure.
  • A set of callbacks that handle: RDB saving, RDB loading, AOF rewriting, releasing of a value associated with a key, calculation of a value digest (hash) to be used with the DEBUG DIGEST command.
  • A 9 characters name that is unique to each module native data type.
  • An encoding version, used to persist into RDB files a module-specific data version, so that a module will be able to load older representations from RDB files.

Registering a new data types

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* Declare a global variable that will hold a reference to the data type */
static RedisModuleType *MyType;
#define MYTYPE_ENCODING_VERSION 0

int RedisModule_OnLoad(RedisModuleCtx *ctx)
{
    /* The follow set of methods must be passed.
     * while .digest and .mem_usage are optinal */
    RedisModuleTypeMethods tm =
    {
        .version = REDISMODULE_TYPE_METHOD_VERSION,
        .rdb_load = MyTypeRDBLoad,
        .rdb_save = MyTypeRDBSave,
        .aof_rewrite = MyTypeAOFRewrite,
        .free = MyTypeFree
    };

    /* 9 character name, character set {A-Z, a-z, 0-9, _, -}*/
    MyType = RedisModule_CreateDataType(ctx, "MyType-AZ",
                                        MYTYPE_ENCODING_VERSION, &tm);
    if (MyType == NULL)
    {
        return REDISMODULE_ERR;
    }
}

Prototypes of type methods

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/* mandatory */
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof,
                                           RedisModuleString *key, void *value)
typedef void (*RedisModuleTypeFreeFunc)(void *value)

/* optinal */
typedef size_t (*RedisModuleTypeMemUsageFunc)(void *value)
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value)
  • rdb_load is called when loading data from the RDB file. It loads data in the same format as rdb_save produces.
  • rdb_save is called when saving data to the RDB file.
  • aof_rewrite is called is called when the AOF is being rewritten, and the module needs to tell Redis what is the sequence of commands to recreate the content of a given key.
  • free is called when a key with the module native type is deleted via DEL or in any other mean, in order to let the module reclaim the memory associated with such a value.
  • digest is called when DEBUG DIGEST is executed and a key holding this module type is found. Currently this is not yet implemented so the function can be left empty.
  • mem_usage is called when the MEMORY command asks for the total memory consumed by a specific key, and is used in order to get the amount of bytes used by the module value.

为什么名字要 9 个字符?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/* Now RDB files are sequences of key-value pairs like the following: */
---------------------------------------------
| 1 byte type | key | a type specific value |
---------------------------------------------

/* in module data, prefix a type specific value with 64-bit integer
 * 64 = 9 * 6 + 10, 9 characters of 6 bits, 10 bits are used in oreder
 * to store the encoding version of the type */
------------------------------------------
| 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 10 |
------------------------------------------

Seting and getting keys

使用 low level APIs 去操作自定义的数据结构,例子如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1],
                                          REDISMODULE_READ|REDISMODULE_WRITE);
int type = RedisModule_KeyType(key);
/* Check the key */
if (type != REDISMODULE_KEYTYPE_EMPTY &&
    RedisModule_ModuleTypeGetType(key) != MyType)
{
    return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}

struct some_private_struct *data;
if (type == REDISMODULE_KEYTYPE_EMPTY)
{
    /* Create an empty value object if the key is currently empty. */
    data = createMyDataStructure();
    RedisModule_ModuleTypeSetValue(key, MyType, data);
}
else
{
    /* Retrieve the private data from a key */
    data = RedisModule_ModuleTypeGetValue(key);
}

/* Do something with data */

Free method

1
2
3
4
5
/* a simple impl of the free method */
void MyTypeFreeCallback(void *value)
{
    RedisModule_Free(value);
}

RDB load and save methods

当调用 RDB saving 或 loading 时,我们需要给出自定义数据类型在磁盘上的排布方式,Redis 提供了在 RDB 文件中存储以下数据类型的 high level APIs:

  • Unsigned 64-bit integers
  • Signed 64-bit integers
  • Doubles
  • Strings

APIs 如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void RedisModule_SaveUnsigned(RedisModuleIO *io, uint64_t value);
uint64_t RedisModule_LoadUnsigned(RedisModuleIO *io);

void RedisModule_SaveSigned(RedisModuleIO *io, int64_t value);
int64_t RedisModule_LoadSigned(RedisModuleIO *io);

void RedisModule_SaveDouble(RedisModuleIO *io, double value);
double RedisModule_LoadDouble(RedisModuleIO *io);

void RedisModule_SaveString(RedisModuleIO *io, RedisModuleString *s);
RedisModuleString *RedisModule_LoadString(RedisModuleIO *io);
void RedisModule_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len);
char *RedisModule_LoadStringBuffer(RedisModuleIO *io);

一个简单的例子如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
struct double_array
{
    size_t count;
    double *values;
};

/* rdb_save */
void DoubleArrayRDBSave(RedisModuleIO *io, void *ptr)
{
    struct double_array *da = ptr;
    RedisModule_SaveUnsigned(io, da->count);
    for (size_t j = 0; j < da->count; j++)
    {
        RedisModule_SaveDouble(io, da->values[j]);
    }
}

/* rdb_load */
void *DoubleArrayRDBLoad(RedisModuleIO *io, int encver)
{
    if (encver != DOUBLE_ARRAY_ENC_VER)
    {
        /* We should actually log an error here, or try to implement the ability
         * to load older versions of our data structure */
        return NULL;
    }

    struct double_array *da;
    da = RedisModule_Alloc(sizeof(*da));
    da->count = RedisModule_LoadUnsigned(io);
    da->values = RedisModule_Alloc(da->count * sizeof(double));
    for (size_t j = 0; j < da->count; j++)
    {
        da->values[j] = RedisModule_LoadDouble(io);
    }
    return da;
}

AOF rewriting

1
void RedisModule_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...);

Handling multiple encodings

TODO

Allocating memory

自定义数据结构的内存管理建议使用 Redis Module 提供的 APIs:

  • 自定义数据结构使用的内存会被计算在 used_memory 内,方便 Redis 进行内存管理
  • Redis 使用的是 jemalloc 分配器,可以降低内存碎片率
  • 与 Redis 内存分配器相同时,从 RDB loading 函数中返回的值无需再次拷贝,可直接访问使用

Blocking operations with modules

原文地址

How blocking and resuming works

可使用以下接口让客户端进入 blocked state:

1
2
3
4
5
6
RedisModuleBlockedClient *RedisModule_BlockClient(
    RedisModuleCtx *ctx,
    RedisModuleCmdFunc reply_callback,
    RedisModuleCmdFunc timeout_callback,
    void (*free_private)(void*),
    long long timeout_ms);
  • 该函数返回的 RedisModuleBlockedClient 对象,用于之后 unblock 该客户端
  • ctx is the command execution context as usually in the rest of the API
  • reply_callback is the callback, having the same prototype of a normal command function, that is called when the client is unblokced in order to return a reply to the client
  • timeout_callback is the callback, having the same prototype of a normal command function that is called when the client reached the timeout_ms timeout.
  • free_private is the callback that is called in order to free the private data. Private data is a pointer to some data that is passed between the API used to unblock the client, to the callback that will send the reply to the client.
  • timeout_ms is the timeout in milliseconds. When the timeout is reached, the timeout callback is called and the client is automatically aborted.

可使用以下接口 unblock 客户端:

1
2
/* This function is thread safe */
int RedisModule_UnblockClient(RedisModuleBlokcedClient *bc, void *privdata);
  • 在客户端解除阻塞之前,会立即调用在客户端被阻塞时指定的 reply_callback 函数:该函数将有权访问此处使用的 privdata 指针。
  • 在客户端解除阻塞之后,会自动调用在客户端被阻塞时指定的 free_private 函数释放 privdata 占用的空间

一个简单示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    return RedisModule_ReplyWithSimpleString(ctx, "765PRO! FIGHT!");
}

int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    return RedisModule_ReplyWithNull(ctx);
}

void *threadmain(void *arg)
{
    RedisModuleBlockedClient *bc = arg;

    /* Wait one second and unblock,
     * You can think at it as an actually expansive operation of some kind */
    sleep(1);
    RedisModule_UnblockClient(bc, NULL);
}

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx, reply_func, timeout_func, NULL, 0);

    pthread_t tid;
    pthread_create(&tid, NULL, threadmian, bc);

    return REDISMODULE_OK;
}

Passing reply data when unblocking

一般而言阻塞客户端的目的是为了等待某些计算的结果,并将结果回复给客户端,该结果往往是在 unblocked 时传递给 reply_callback 的,我们将上述示例进行简单修改,以演示传递 reply 的过程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    /* Obtain the private data */
    long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
    /* Don't free mynumber here, but in the free private data callback */
    return RedisModule_ReplyWithLongLong(ctx, mynumber);
}

int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    return RedisModule_ReplyWithNull(ctx);
}

void free_privdata(void *privdata)
{
    RedisModule_Free(privdata);
}

void *threadmain(void *arg)
{
    RedisModuleBlokcedClient *bc = arg;

    /* Wait one second and unblock,
     * You can think at it as an actually expansive operation of some kind */
    sleep(1);
    long *mynumber = RedisModule_Alloc(sizeof(long));
    *mynumber = rand();
    RedisModule_UnblockClient(bc, mynumber);
}

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx, reply_func, timeout_func, NULL, 0);

    pthread_t tid;
    pthread_create(&tid, NULL, threadmian, bc);

    return REDISMODULE_OK;
}
  • privdata 的释放必须放置在 free_privdata callback 中,因为 reply_callback 可能因为客户端连接超时或断开导致无法被调用
  • timeout_callback 中,也可通过 GetBlockedClientPrivateData() 访问 privdata

Aborting the blocking of a client

当实现 Redis Command 的函数中,当申请资源发生错误时,我们既不想 block 客户端,也不想触发 reply_callback,此时我们最好的选择可能是使用以下接口:

1
int RedisModule_AbortBlock(RedisModuleBlockedClient *bc);

这时,我们可以将上一小节的 Example_RedisCommand 函数改写如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx, reply_func, timeout_func, NULL, 0);

    pthread_t tid;
    if (pthread_create(&tid, NULL, threadmain, bc) != 0)
    {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx, "Sorry can't create a thread");
    }

    return REDISMODULE_OK;
}

Implementing the command, reply and timeout callback using a single function

有人可能喜欢将所有实现放在一个函数中,通过使用以下 API 可达到此目的:

1
2
int RedisModule_IsBlockedReplyRequest(RedisModuleCtx *ctx);
int RedisModule_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);

比如可将之前的示例改写如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void *threadmain(void *arg)
{
    RedisModuleBlokcedClient *bc = arg;

    /* Wait one second and unblock,
     * You can think at it as an actually expansive operation of some kind */
    sleep(1);
    long *mynumber = RedisModule_Alloc(sizeof(long));
    *mynumber = rand();
    RedisModule_UnblockClient(bc, mynumber);
}

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
    if (RedisModule_IsBlockedReplyRequest(ctx))
    {
        long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
        return RedisModule_ReplyWithLongLong(ctx, mynumber);
    }
    else if (RedisModule_IsBlockedTimeoutRequest(ctx))
    {
        return RedisModule_ReplyWithNull(ctx);
    }

    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx, reply_func, timeout_func, NULL, 0);

    pthread_t tid;
    if (pthread_create(&tid, NULL, threadmain, bc) != 0)
    {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx, "Sorry can't create a thread");
    }

    return REDISMODULE_OK;
}