E012

调用orsci-art中的ARTCenter,实现带有集中中心的分布式网络编程演示。

 

#include "stdafx.h"
#include "Test_ARTCenter.h"
#include "ARTCenter.h"
#include <fstream>
#include "ARTUDP.h"

namespace __Test_ARTCenter__
{
using namespace art;

FILE * desFile;

int xxyy = 0;

int recvFileSize = 0;


class TMyARTCenter : public TARTCenter
{
private:

public:
TMyARTCenter() : TARTCenter()
{
}
~TMyARTCenter() {}

public:
virtual void OnARTLogin(const int & AARTID, const string & APeerIP, const TPORTTYPE & APeerPort) override
{
cout << "[OnARTLogin]收到一个新的用户加入:" << AARTID << "\t" << APeerIP << " : " << APeerPort << endl;

} //哪个Star连接了。
virtual void OnARTLogout(const int & AARTID, const string & APeerIP, const TPORTTYPE & APeerPort) override
{
cout << "[OnARTLogout]收到一个用户注销:" << AARTID << "\t" << APeerIP << " : " << APeerPort << endl;
} //哪个Star断开了。

public:
virtual void OnReceiveP2P(const string & APeerIP, const TPORTTYPE & APeerPort, TPIPEComPackage * & pRecvPackage)
{ //本事件触发接收一个PIPECom包
//cout << "[ART]收到P2P包:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 包大小: " << pRecvPackage->headlen + pRecvPackage->size() << endl;
//cout << pRecvPackage->readstring() << endl;
//TPIPEComPackagePool::LaybackPackage(pRecvPackage); //测试外界释放的效果!
recvFileSize += pRecvPackage->size();
if (xxyy == 1){ fwrite(pRecvPackage->bodybuf, sizeof(char), pRecvPackage->size(), desFile); fflush(desFile);}
else cout << "[ART]a收到P2P包:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 包大小: " << pRecvPackage->ct_headlen + pRecvPackage->size() << endl;
//desFile << pRecvPackage->readstring();
//desFile << endl;
} //当接收到数据时

public: //真正有用的事件
virtual void OnReceiveRSM(const int & APeerARTID, const int & ATransctionID, const int & AFunctionID, TMemoryStreamTool & recvStream, const int & APeerEntityID, const int & ALocalEntityID) override
{ //本事件触发接收一个PIPECom包
cout << "[MyART]OnReceiveRSM:ARTID" << APeerARTID << " 实体号:" << APeerEntityID << " 本地实体号:" << ALocalEntityID << " 包大小: " << recvStream.size() << endl;
cout << "接收的字符串为:" << recvStream.readstring() << endl;
} //当接收到数据时
virtual void OnReceiveRPC(const int & APeerARTID, const int & ATransctionID, const int & AFunctionID, TMemoryStreamTool & recvStream, EARTOnServiceReturnFlag & retServicedFlag, TMemoryStreamTool & retResult, const int & APeerEntityID, const int & ALocalEntityID) override
{ //本事件触发接收一个PIPECom包
//++ xxyy;
//retResult.writestring("[RPC Reply]新年好!");
//return;
retServicedFlag = sv_Success;
++ xxyy;
//return; //试验高速访问的速度
cout << "[MyART]OnReceiveRPC:ARTID=" << APeerARTID << " 实体号:" << APeerEntityID << " 本地实体号:" << ALocalEntityID << " 包大小: " << recvStream.size() << endl;
TPIPECom::msg("ARTID" + jw::IntToStr(APeerARTID), "包大小!" + TARTTools::toStr(recvStream.size()));
retResult.writestring("[RPC Reply]新年好!");
cout << "对方发送过来的消息是:" << recvStream.readstring() << endl;
//cout << "我再读一次,看看引发异常后的跟踪是否好用!" << endl;
//recvStream.readstring();
} //当接收到数据时

virtual void OnBeforeVisitShareMem(const int & APeerARTID, const string & AShareName, const int ADataType, const EARTVisitMemType AVisitType) override
{
//cout << "[MyART]OnBeforeReadShareMem:ARTID=" << APeerARTID << " 变量名: " << AShareName << endl;
}
virtual void OnAfterVisitShareMem(const int & APeerARTID, const string & AShareName, const int ADataType, const EARTVisitMemType AVisitType, const EARTMEMResult AMEMOprState) override
{
//return; //为了进行评测
cout << "[MyART]OnAfterVisitShareMem:ARTID=" << APeerARTID << " 变量名: " << AShareName << " 读写类型:" << AVisitType << " 成功标记:" << AMEMOprState << endl;
}

public:
virtual void OnReceiveFILEBegin(const string & APeerIP, const TPORTTYPE & APeerPort, const string & ASuggestFileName, const long long & AFileSize, const string & AAttachInfo, bool & retPermitTransFlag, string & retLocalRealName)
{
cout << "[TARTStar::OnReceiveFILEBegin]收到传输文件请求(下面允许接收):IP地址" << APeerIP << " -- 端口 " << APeerPort << " 文件名: " << ASuggestFileName << " 文件大小:" << AFileSize << endl;
retPermitTransFlag = true;
}
//默认不接收文件传输。即retPermitTransFlag == false。需要用户接管该事件,然后设置retPermitTransFlag == true,才能开始后续的文件传输工作。
//virtual void OnReceiveFILEContinue(const string & APeerIP, const TPORTTYPE & APeerPort, const string & AFileName, const string & AAttachInfo, const string & ALocalRealName, const unsigned long long & AReceivedSize) {}
virtual void OnReceiveFILEEnd(const string & APeerIP, const TPORTTYPE & APeerPort, const string & AFileName, const long long & AFileSize, const string & AAttachInfo)
{
cout << "[TARTStar::OnReceiveFILEEnd]文件传输成功:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 文件名: " << AFileName << " 文件大小:" << AFileSize << endl;
}
virtual void OnReceiveFILERollback(const string & APeerIP, const TPORTTYPE & APeerPort, const string & AFileName, const long long & AFileSize, const string & AAttachInfo, const EFileRollBackReason ARollbackReason)
{
cout << "[TARTStar::OnReceiveFILERollback]文件传输取消:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 文件名: " << AFileName << " 文件大小:" << AFileSize << endl;
}
};


class TTestShareObject : public TShareClass //基于类的共享。
{
public:
int x;
double y;
string z;
public: //注意:下面两个方法内部,请不要增加锁定,锁定过程将由外界使用者自行约定。
virtual void ExportToStream(TStream & retStream) override
{
retStream << x << y << z;
}

virtual bool ImportFromStream(TStream & AStream) override
{
AStream >> x >> y >> z;
return true;
}

void DispInfo()
{
cout << "x = " << x << " y = " << y << " z = " << z << endl;
}
};

TShareInt glb_ShareInt;


void Demo_ARTClient()
{
cout << " ======================================================" << endl;
cout << " ART Center V3.70(ART Center代理服务器)" << endl;
cout << " --- By JiangWei From 2011-12-27 to 2014-11-01" << endl;
cout << endl;
cout << "操作:执行过程中,可以输入数字代表发送普通包和Echo包" << endl;
cout << " 1--RSM_NoWait;2--RSM_WaitFinish,3--RPC包, 4-查看共享内存, " << endl;
cout << " 5-P2PConnect, 6--P2PDisconnect, 7--RegistCenter" << endl;
cout << " 8--UnRegistCenter, 0--显示统计信息" << endl;
cout << " 15-状态注册,16,17-演示变量自动同步功能" << endl;
cout << " 20-查询某ART的共享变量信息" << endl;
cout << " 其它数字(按回车),则退出程序。" << endl;
cout << "本服务程序需要启用 2910 TCP端口,若有防火墙拦截,请设置允许!" << endl;
cout << " ======================================================" << endl;
art::TNetTools::Demo_DispLocalNetWorkInfo();
cout << "需要输入:(0)本地IP地址;(1)服务器的IP地址;(2)服务器的端口。" << endl;
cout << endl;

TMyARTCenter mm;

mm.EnableShareMem_GlobalAutoAppend = true; //允许自动创建全局共享对象。
//mm.EnableComStat(true); //打开统计功能

//#define LOCAL_COMPUTER

string mLocalBoundIP = "127.0.0.1";
#ifndef LOCAL_COMPUTER
cout << "(0)请输入本地IP地址(1代表127.0.0.1):";
mLocalBoundIP = "";
while (mLocalBoundIP == "") cin >> mLocalBoundIP;
if (mLocalBoundIP.length() < 5) mLocalBoundIP = "127.0.0.1";
//cout << "(1)输入服务端口(1--2910,2--4612):";
#endif
mm.Open(mLocalBoundIP, 2910); //任意绑定!
cout << "本地IP地址:" << mm.LocalIP() << "\t端口:" << mm.LocalPort() << endl;

string mIP = "127.0.0.1";
TPORTTYPE mPort = 2910;

//================ 设置共享内存 ============
TShareString glb_ShareString;
glb_ShareString.data = "大河向西流啊!";
mm.memShare.RegistObject("sharestring", glb_ShareString);

//JW::TFileStream mfs("D:\\aa.rar", fmOpenRead);
//TShareStreamTool sharefs(mfs);
//mm.memShare.RegistObject("mfs", &sharefs);

TTestShareObject mso;
mm.memShare.RegistObject("mso", mso);

mm.memShare.RegistObject("starsharevar", glb_ShareInt);
//glb_ShareInt.BeginWrite();
glb_ShareInt.data = 4567; //因为整数数据很短,所以不用加锁,否则需要锁定。
//glb_ShareInt.EndWrite();

mso.x = 80;
mso.y = 34.56;
mso.z = "大家好啊!";
//-----------------------------------------

TARTPackage rp; //接收包
TARTPackage sp; //发送数据包;
sp << "你好啊,我是一只小小鸟!";
cout << "数据长度:" << sp.size() << " 总长度:" << sp.headlen + sp.size() << endl;
int mFlag = 1;
while ((mFlag >= 0) && (mFlag <= 100))
{
cout << "继续发送数据包(输入1或2发送,其它数字退出):";
cin >> mFlag;
//for (int k = 0; k < 100; k ++) //进行循环,压力测试。
if (mFlag == 0)
{
cout << "=============================================================" << endl;
cout << "本地ARTID:" << mm.LocalARTID() << " IP地址:" << mm.LocalIP() << " : " << mm.LocalPort() << endl;
cout << "发送包/数据量:" << mm.getComStat().send.PackageCount << " / " << mm.getComStat().send.WholeSize << endl;
cout << "接收包/数据量:" << mm.getComStat().recv.PackageCount << " / " << mm.getComStat().recv.WholeSize << endl;
cout << "当前内存申请包数量:" << TPIPEComPackagePool::GetAllocateTotalCount() << endl;
//cout << "客户信息情况如下:" << endl;
cout << "----------------- PIPECom ClientTable------------------------------" << endl;
mm.PIPEComClientInfoTable().DispClientInfoTable();
cout << "----------------- ARTBase ClientTable------------------------------" << endl;
mm.ARTBaseClientInfoTable().DispClientInfoTable();
cout << "----------------- ARTCustom ClientTable ---------------------------" << endl;
cout << mm.ARTClientTable().DispClientTable() << endl;;
cout << "------------------ 本地内存共享情况 --------------------------" << endl;
cout << mm.memShare.SearchShareMemBasicList() << endl;
//准备导出本地共享文件目录。
vector<string> mRootList, mRealPathList;
mm.ExportLocalVirtualRootPathToList(mRootList, mRealPathList);
if (mRootList.size() > 0)
{
cout << "------------------ 本地共享文件目录 --------------------------" << endl;
for ( int k = 0; k < mRootList.size(); k ++)
{
cout << k << "\t" << mRootList[k] << "\t-->\t" << mRealPathList[k] << endl;
}
}
cout << "-------------------------------------------------------------------" << endl;
cout << "本地ARTID = " << mm.LocalARTID() << " 本地IP地址:" << mm.LocalIP() << " : " << mm.LocalPort() << endl;
}
if (mFlag == 77)
{
cout << " xxyy == " << xxyy << endl;
DisplayTotal();
continue;
}
if (mFlag == 15)
{
int mResultFlag = mm.serviceConcept.RegistServiceReady(true, 33, "nihaoa");
cout << "RegistServiceReady发送结果:" << mResultFlag << endl;

map<string, _RSynMemInfo> SynMemList;
_RSynMemInfo mNode;
mNode.IsAuthority = true;
mNode.AuthoryPriory = 5555;
mNode.synType = synmem_ReadWrite;
SynMemList.insert(pair<string, _RSynMemInfo>("starsharevar", mNode));
mNode.IsAuthority = false;
mNode.AuthoryPriory = 6666;
mNode.synType = synmem_Write;
SynMemList.insert(pair<string, _RSynMemInfo>("ni_gg", mNode));

map<string, bool> tempGlobalMap;
mResultFlag = mm.serviceConcept.RegistSynShareMem(true, SynMemList, tempGlobalMap);
cout << "RegistShareMem发送结果:" << mResultFlag << endl;

mResultFlag = mm.serviceConcept.ModifyUserState(45, "我是Center呵!");
cout << "ModifyUserState发送结果:" << mResultFlag << endl;
}
if (mFlag == 16)
{
glb_ShareInt.data = 65432;
int mResultFlag = glb_ShareInt.SynchronizeBlockUpdate();
cout << "同步更新结果:" << mResultFlag << endl;
}
if (mFlag == 20)
{
vector<int> mARTIDList;
vector<string> mIPList;
vector<TPORTTYPE> mPortList;
mm.ARTClientTable().SearchTable(mARTIDList, mIPList, mPortList);
if (mARTIDList.size() == 0)
{
cout << "[Warning]当前用户表不存在用户!您可能尚未注册!" << endl;
}
else
{
cout << "总用户数:" << mARTIDList.size() << endl;
for (int k = 0; k < mARTIDList.size(); k ++)
{
string mInfo;
EGenerationResultType mResultFlag = mm.memShare.QueryShareMemBasicList(mARTIDList[k], mInfo);
if (mResultFlag == egrt_Success)
cout << "ARTID = " << mARTIDList[k] << " 变量共享情况 = " << mInfo << endl;
else
cout << "查询失败,返回的标识!" << mResultFlag << endl;
}
}
}
if (mFlag == 17)
{
//glb_ShareInt.BeginRead();
cout << "当前的 starsharevar 中的值是:" << glb_ShareInt.data << endl;
//glb_ShareInt.EndRead();
}
if (mFlag == 1)
{
cout << "xxyy = " << xxyy << endl;
TMemoryStream mss;
mss.writestring("[RSM]新春快乐!2012---win!!!");
bool mResultFlag = mm.RSM(0, 0, 2, sp);
cout << "MRSM发送结果:" << mResultFlag << endl;
}
if (mFlag == 2)
{
int mARTID = 0;
cout << "请输入ARTID(0取消)";
cin >> mARTID;
if (mARTID != 0)
{
TMemoryStream mss;
mss.writestring("[RPC]新春快乐!2012---win!!!");
EARTRPCResult mResultFlag = mm.RPC(mARTID, 0, 1, mss, mss, 0, 0);
cout << "RPC发送结果:" << mResultFlag << endl;
cout << "RPC结果包大小:" << mss.size() << endl;
if (mResultFlag == rpc_Success) cout << mss.readstring() << endl;
}
}
if (mFlag == 3)
{
TARTPackage retResult;
//EARTRPCResult mFlag = mm.RPC(mIP, mPort, sp, retResult);
cout << "RPC发送结果:" << mFlag << endl;
cout << "RPC结果包大小:" << retResult.size() << endl;
if (retResult.size() > 0) cout << retResult.readstring() << endl;
}
//if (mFlag == 2) cout << mm.EchoTest(mIP, mPort, 15 * 1000) << endl; //测试15秒钟。
//if (mFlag == 3) cout << mm.UDPSendPackage(mIP, mPort, sp) << endl;
if (mFlag == 4)
{
glb_ShareString.LocalBeginRead(); //对于字符串,需要加锁。因为整型一次读出,而字符需要多次读出。
cout << "Share String Value = " << glb_ShareString.data << endl;
glb_ShareString.LocalEndRead();
mso.DispInfo();

glb_ShareInt.LocalBeginRead();
cout << "Share Int Value == " << glb_ShareInt.data << endl;
glb_ShareInt.LocalEndRead();
}
if (mFlag == 5)
{
string svrIP = mIP;
//cout << "请输入将要连接的服务器的IP地址:";
//while (svrIP == "") getline(cin, svrIP);
//cout << "请输入将要连接的服务器的端口号:";
TPORTTYPE svrPort = mPort;
//cin >> svrPort;
//EPIPEComP2PConnectResultType mFlag = mm.Connect(svrIP, svrPort, 10000);
cout << "P2PConnect结果:" << mFlag << endl;
}
/* if (mFlag == 6)
{
string svrIP = mIP;
//cout << "请输入将要连接的服务器的IP地址:";
//while (svrIP == "") getline(cin, svrIP);
cout << "请输入将要连接的服务器的端口号:";
TPORTTYPE svrPort = mPort;
cin >> svrPort;
EPIPEComP2PDisconnectResultType mFlag = mm.Disconnect(svrIP, svrPort, 10000);
cout << "P2PDisconnect结果:" << mFlag << endl;
}
*/
if (mFlag == 7)
{
if (mm.is_open() == true)
{
int artid;
cout << "请输入本地的ARTID(0,为需要分配的ID):";
cin >> artid;
cout << "RegistART返回结果标记:" << mm.Login(artid) << endl;
cout << "本地ARTID为" << mm.LocalARTID() << endl;
}
else cout << "[Error]请首先Open Center!" << endl;
}
if (mFlag == 8)
{
if (mm.is_open() == true)
{
mm.Logout();
cout << "本地ARTID为" << mm.LocalARTID() << endl;
}
else cout << "[Error]请首先Open Center!" << endl;
}
//
}
if (xxyy == 1)
fclose(desFile);
mm.Close();
cout << "总字节:" << recvFileSize << endl;
cout << "现在内存包申请数量:" << TPIPEComPackagePool::GetAllocateTotalCount() << endl;

}


void E012_Demo()
{
Demo_ARTClient();
}

}; //end namespace __Test_ARTCenter__

输出

(一)ARTCenter端运行图

.ARTCenter运行图

(二)说明:

(1)ARTCenter实现具有集中中心的分布式网络编程框架。中心通常具有具有两个作用:一是为其它分布端ARTStar提供一个连接点,以便各个Star可以自动自由连接;二是便于控制整个分布式网络。一般来说,ARTCenter和ARTStar功能几乎一样,只是ARTCenter增加了一项登记整个分布式网络的连接信息、共享变量信息、虚拟网络文件系统信息。

(2)运行ARTCenter之后,就可以启动ARTStar进行其它分布式节点的编程。

(3)ARTCenter的设计和工作原理,请参看书籍:姜维. 《分布式网络系统与Multi-Agent系统编程框架》

(4)在orsci-art包中,使用TARTCenter类作为基础类,派生后进行自定义的分布式中心编程。

(5)orsci包支持ARTCenter网络编程,提供TARTCenter基础类,可下载配套软件orsci-art应用。

书籍 姜维. 《分布式网络系统与Multi-Agent系统编程框架》
软件 orsci-art开发包(C++语言)。