博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于共享内存和多重哈希实现分布式缓存系统
阅读量:6821 次
发布时间:2019-06-26

本文共 6224 字,大约阅读时间需要 20 分钟。

写在前面:

前三篇文字<<>>,<<>>,<<>>主要叙述了MQTT协议的编解码以及基于MQTT协议的一些常见应用场景,并以一个简单的消息推送系统作为例子具体阐述了Mqtt Broker部分的实现,具体包括通过Mqtt,以及Mqtt Over WebSocket的方式打通各个端的数据通信,本篇文字继续以消息推送系统作为例子,阐述其状态系统的实现,具体的消息推送系统架构草图,需要参考回看第一篇文字,不再赘述.

 

1.缓存技术:

常见缓存技术主要有redis,memcached等,甚至也可使用mongodb或者mq,leveldb或者rocksdb等作为缓存,不同技术都有其优缺点,以redis为例,其可以通过代理和集群,实现分布式缓存系统,可以根据业务需要选择是否持久化,支持多种数据结构以及事务,但其服务器内存会是瓶颈,以及在处理一些复杂数据结构时,比如缓存用户登录状态,包括用户的Appid,设备token,是否切换为后台等等,这就可能需要不停地序列化和反序列化,比如redis的hash和sortedset结构等.再比如查找时,以sortedset为例,其底层是基于跳跃表来实现的,查找时间复杂度O(lgn),存储的数据越多,平均查询时间也会越长.所以,需要根据自身业务需求来选用合适技术,作为例子,这里基于共享内存和多重哈希来实现这样一个分布式缓存系统.

 

2.状态系统具体实现:

2.1状态系统架构草图:

 

 

2.2状态系统实现细节说明:

(1)整体架构主要包括负载均衡器,服务发现,MQ队列,缓存集群主从节点,DB;

(2)不同集群节点跨机房或者跨IDC部署,MQ队列用于不同集群之间写数据同步;

(3)每个集群节点包括一主多从节点,图示以一主一从为例示意,主和从节点都包括gm模块形成链表结构进行数据同步,具体可参考RabbitMQ镜像队列相关原理;

(4)主从节点都包括shm模块,即共享内存模块,用于存储状态数据;

(5)基于哈希结构存储数据,以达O(1)时间复杂度,通过多重哈希和最老节点覆盖写机制,解决哈希冲突;

(6)用户登录存储状态数据,登出删除状态数据,每个用户状态都同时存储其存取时间戳,用于过期淘汰或者扩容时做相关判断;

(7)以用户的Appid,Token等为例进行数据缓存;

(8)DB根据业务需要决定是否分库分表;

(9)其他...

 

2.3状态系统代码实现(shm模块,跟之前几篇文字基于golang编写不同,这里基于C/C++编写):

2.3.1定义用户状态数据结构体,以及哈希结构体头部,如下:

const std::string GROCACHE_SHMFILE= "/shm/HashTableShmGroCache";//存储的用户数量,作为例子,这里设置为100,实际应用需要根据自身应用的用户数量合理设置const uint32_t GROCACHE_HASHNODE_CNT = 100; //计算每个用户状态节点占用的字节数: 4+8+4+4+4+4+4+4+4+4+32const uint32_t GROCACHE_HASHNODE_SIZE = 76; const uint32_t GROCACHE_MAXROW_CNT = 4; //定义用户状态数据结构体#pragma pack (1)struct GroCacheNode{    uint32_t hashKey;        //计算哈希值    uint64_t tinyid;    uint32_t appid;    uint32_t settoken;    uint32_t background;    uint32_t sdkappid;    uint32_t busiid;    uint32_t unreadcnt;    uint32_t accessTime;    //数据存取时间    uint32_t tokenlen;    uint8_t token[0];}; //定义哈希结构体头部#pragma pack (1)struct GroCacheHeader{    uint32_t hashMemSize;                       // hashMemSize    uint32_t hashNodeSize;                      // size of each hash node    uint32_t hashNodeCnt;                       // num of all hash node    uint32_t rowCnt;                            // row`s num    uint32_t rowsNodeCnt[GROCACHE_MAXROW_CNT];  // node num of each row's    uint32_t rowsStartNode[GROCACHE_MAXROW_CNT];// each row's starting node index    uint32_t maxUsedRow;                        // max row index (start at 0) in use    uint32_t usedCnt;};

 

2.3.2根据预估用户数量开辟共享内存大小,并计算多重哈希每一级存储的节点数量比例,第一级哈希数组会存储将近70%的数据节点:

//开辟并初始化共享内存,保存共享内存元数据 bool MultiHashTable::initGC(){    bool iRet = true;    do    {        void* pShm = NULL;        string shmFileName = GROCACHE_SHMFILE;        //计算需要分配的共享内存大小,以字节为单位        uint32_t hashMemSize = sizeof(GroCacheHeader) + (GROCACHE_HASHNODE_CNT * GROCACHE_HASHNODE_SIZE);        bool exist = false;      //真正申请开辟共享内存        if(false == ShareMemory::Instance()->create(&pShm, shmFileName, hashMemSize, exist))        {            iRet = false;            break;        }        _grocacheHeader = (GroCacheHeader*)pShm;        ...        if(false == exist)        {            printf("share memory first create, hashtable need initial\n");            memset(_grocacheHeader, 0, sizeof(*_grocacheHeader));            _grocacheHeader->hashMemSize = hashMemSize;            _grocacheHeader->hashNodeSize = GROCACHE_HASHNODE_SIZE;            _grocacheHeader->rowCnt = GROCACHE_MAXROW_CNT;         //通过CalcNodeCntForEachRow函数计算每一级哈希存储的节点数量,以及每一级哈希的起始索引位置            _grocacheHeader->hashNodeCnt = CalcNodeCntForEachRow(GROCACHE_MAXROW_CNT, GROCACHE_HASHNODE_CNT, _grocacheHeader->rowsNodeCnt, _grocacheHeader->rowsStartNode);            _grocacheHeader->maxUsedRow = 0;            _grocacheHeader->usedCnt = 0;        }        //header+ht        _grocacheHt = _grocacheHeader + 1;        printf("_grocacheHeader hashMemSize:%u,hashNodeSize:%u,hashNodeCnt:%u,maxUsedRow:%u\n",                _grocacheHeader->hashMemSize,_grocacheHeader->hashNodeSize,_grocacheHeader->hashNodeCnt,_grocacheHeader->maxUsedRow);    }while(false);    return iRet;}

 

2.3.3开辟共享内存,不同系统共享内存大小有限制,根据需要决定是否修改相关系统配置:

bool ShareMemory::create(void** pShm, const std::string& shmFileName, const uint32_t& shmSize, bool& exist, bool reset){    exist = false;    printf("create shmFileName:%s, shmSize:%u\n", shmFileName.c_str(), shmSize);    bool iRet = false;    do    {        uint32_t shmKey = 0;        if(false == getShmKey(shmFileName, shmKey))        {            printf("create getShmKey failed,shmFileName:%s\n", shmFileName.c_str());            break;        }        int shmFlag = 0666 | IPC_CREAT;        int shmId = shmget(shmKey, 0, 0);        if(-1 == shmId)        {            //no existed shm, need to create a new one            printf("create a new share memory shmSize:%d\n",shmSize);            shmId = shmget(shmKey, shmSize, shmFlag);            if(-1 == shmId)            {                printf("create shmget failed errno=%d , errmsg=%s\n", errno, strerror(errno));                break;            }        }        ...      if(reset)        {            memset(*pShm, 0, shmSize);            exist = false;        }        iRet = true;    }while(false);    return iRet;}

 

3.测试例子:

#include 
#include "MultiHashTable.h"using namespace std;int main() {
   //初始化用户状态节点 GroCacheNode gcSetNode; gcSetNode.appid = 666; gcSetNode.settoken = 1; gcSetNode.background = 1; gcSetNode.sdkappid = 999; gcSetNode.busiid = 9999; gcSetNode.unreadcnt = 0; string token = "user_device_token";     //模拟用户登录,存储用户状态数据 uint64_t uin = 888888888; uint32_t hashKey = 99; MultiHashTable::instance()->SetGroCache(hashKey, uin, gcSetNode, token); //修改用户推送消息未读计数 bool iRet = false; uint32_t unreadcnt = 99; iRet = MultiHashTable::instance()->SetGroCacheUnReadCnt(hashKey, uin, unreadcnt); printf("[main]SetGroCacheUnReadCnt iRet:%d\n", iRet);    //读取用户推送消息未读计数 uint32_t getunreadcnt = 0; iRet = MultiHashTable::instance()->GetGroCacheUnReadCnt(hashKey, uin, getunreadcnt); printf("[main]GetGroCacheUnReadCnt iRet:%d, getunreadcnt:%d\n", iRet, getunreadcnt); return 0;}

 

4.运行测试结果,如下图所示,表明创建了共享内存文件,并计算了每一级哈希的节点数量,最后插入一条用户状态数据,并存取相关未读计数值:

 

出于篇幅考虑,上述使用到的具体一些函数,如MaxPrime,CalcNodeCntForEachRow等,其具体实现就不一一列举出来了,也有一些成员变量,用到了也不一一具体注释了,主要通过架构图和具体代码关键路径叙述实现的一些细节,阐明主要设计思路以及解决问题的主要矛盾,如有错误,恳请指出,转载也请注明出处!!!

 

未完待续...

参考文字:

转载于:https://www.cnblogs.com/huatuo/p/9334510.html

你可能感兴趣的文章
【算法】最短路径之A*搜索
查看>>
【Android网络开发の5】Android中的网络数据下载
查看>>
linux终端使用python的matplotlib模块画图出现“could not open display”问题解决
查看>>
9月国内浏览器市场份额大战:IE份额上升至48.45%
查看>>
Tapestry 教程(五)实现Hi-Lo猜谜游戏
查看>>
2015年12月国内网民地域分布12强:湖北跻身上榜
查看>>
mysql-5.6安装
查看>>
LNMP环境搭建 Ubuntu篇
查看>>
设置低版本VDA注册高版本DDC
查看>>
multi-process script for ping host
查看>>
云数据库SQL Server 2008 R2版推出OSS版本数据上云
查看>>
Android 侵权案下周复审
查看>>
shell基础知识;
查看>>
RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查
查看>>
SQL使用中的优化
查看>>
2014年下半年信息系统项目管理师上午试题试题与答案 43
查看>>
centos版本查询
查看>>
python 实战 登录注册
查看>>
南桥和北桥
查看>>
键盘事件
查看>>