E011

调用orsci-art中的ARTPeer,实现对等端的分布式网络编程。

 

#include "stdafx.h"
#include "Test_ARTPeer.h"

namespace __Test_ARTPeer__
{


class TMyARTPeer : public TARTPeer
{
private:

public:
TMyARTPeer(const EARTExeServiceMode AARTExeMode = eaem_StarParallel) : TARTPeer(AARTExeMode)
{
}
~TMyARTPeer() {}

public:
virtual void OnARTLogin(const int & AARTID, const string & APeerIP, const TPORTTYPE & APeerPort) override
{
cout << "[OnARTLogin]收到一个新的用户加入:" << AARTID << "\t" << APeerIP << " : " << APeerPort << endl;

} //哪个Star连接了。
virtual void OnARTLogout(const int & AARTID, const string & APeerIP, const TPORTTYPE & APeerPort) override
{
cout << "[OnARTLogout]收到一个用户注销:" << AARTID << "\t" << APeerIP << " : " << APeerPort << endl;
} //哪个Star断开了。

public:
virtual void OnReceiveP2P(const string & APeerIP, const TPORTTYPE & APeerPort, TPIPEComPackage * & pRecvPackage) override
{ //本事件触发接收一个PIPECom包
//cout << "[ART]收到P2P包:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 包大小: " << pRecvPackage->headlen + pRecvPackage->size() << endl;
cout << "[ART]a收到P2P包:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 包大小: " << pRecvPackage->ct_headlen + pRecvPackage->size() << endl;

} //当接收到数据时

public: //真正有用的事件
virtual void OnReceiveRSM(const string & APeerIP, const TPORTTYPE & APeerPort, const int & ATransctionID, const int & AFunctionID, TMemoryStreamTool & recvStream, const int & APeerEntityID, const int & ALocalEntityID) override
{ //本事件触发接收一个PIPECom包
cout << "[MyART]OnReceiveRSM:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 实体号:" << APeerEntityID << " 本地实体号:" << ALocalEntityID << " 包大小: " << recvStream.size() << endl;
cout << "接收的字符串为:" << recvStream.readstring() << endl;
} //当接收到数据时

virtual void OnReceiveRPC(const string & APeerIP, const TPORTTYPE & APeerPort, const int & ATransctionID, const int & AFunctionID, TMemoryStreamTool & recvStream, EARTOnServiceReturnFlag & retServicedFlag, TMemoryStreamTool & retResult, const int & APeerEntityID, const int & ALocalEntityID) override
{ //本事件触发接收一个PIPECom包
cout << "[MyART]OnReceiveRPC:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 实体号:" << APeerEntityID << " 本地实体号:" << ALocalEntityID << " 包大小: " << recvStream.size() << endl;
retServicedFlag = sv_Success;
retResult.writestring("[RPC Reply]新年好!");

retServicedFlag = sv_Success;

} //当接收到数据时
virtual void OnReceiveEVENT(const string & APeerIP, const TPORTTYPE & APeerPort, const int & ATransctionID, const int & AFunctionID, TMemoryStreamTool & recvStream, EARTOnServiceReturnFlag & retServicedFlag, TMemoryStreamTool & retResult, const int & APeerEntityID, const int & ALocalEntityID) override
{
cout << "[MyART]OnReceiveEVENT:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 实体号:" << APeerEntityID << " 本地实体号:" << ALocalEntityID << " 包大小: " << recvStream.size() << endl;
TPIPECom::msg("[EVENT]IP地址" + APeerIP + " -- 端口 " + TARTTools::toStr(APeerPort), "包大小!" + TARTTools::toStr(recvStream.size()));
}

public:
virtual void OnReceiveFILEBegin(const string & APeerIP, const TPORTTYPE & APeerPort, const string & ASuggestFileName, const bool & flagExistOverride, const long long & AFileSize, const string & AAttachInfo, bool & retPermitTransFlag, string & retLocalRealName) override
{
cout << "[TARTPeer::OnReceiveFILEBegin]收到传输文件请求(下面允许接收):IP地址" << APeerIP << " -- 端口 " << APeerPort << " 文件名: " << ASuggestFileName << " 文件大小:" << AFileSize << endl;
retPermitTransFlag = true; //用于控制是否接受文件数据
}
//默认不接收文件传输。即retPermitTransFlag == false。需要用户接管该事件,然后设置retPermitTransFlag == true,才能开始后续的文件传输工作。
virtual void OnReceivingFILEProcess(const string & APeerIP, const TPORTTYPE & APeerPort, const string & AFileName, const string & AAttachInfo, const long long & AFileSize, const long long & ACurReceivedSize, bool & retInteruptFlag) override
{
//cout << "OnReceiveFILEProcess():" << ACurReceivedSize << " / " << AFileSize << endl;
}
virtual void OnReceiveFILEEnd(const string & APeerIP, const TPORTTYPE & APeerPort, const string & AFileName, const long long & AFileSize, const string & AAttachInfo) override
{
cout << "[TARTPeer::OnReceiveFILEEnd]文件传输成功:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 文件名: " << AFileName << " 文件大小:" << AFileSize << endl;
}
virtual void OnReceiveFILERollback(const string & APeerIP, const TPORTTYPE & APeerPort, const string & AFileName, const long long & AFileSize, const string & AAttachInfo, const EFileRollBackReason ARollbackReason) override
{
cout << "[TARTPeer::OnReceiveFILERollback]文件传输取消:IP地址" << APeerIP << " -- 端口 " << APeerPort << " 文件名: " << AFileName << " 文件大小:" << AFileSize << endl;
}
virtual void OnSendingFILEProcess(const string & APeerIP, const TPORTTYPE & APeerPort, const string & ALocalFileName, const string & ASuggestPeerFileName, const string & AAttachInfo, const long long & AFileSize, const long long & ACurSendSize, bool & retInteruptFlag) override
{
//if ((ACurSendSize / (double)AFileSize) * 10)
//cout << "OnSendFILEProcess():" << ACurSendSize << " / " << AFileSize << endl;
}

};


class TTestShareObject : public TShareClass //基于类的共享。
{
public:
int x;
double y;
string z;
public: //注意:下面两个方法内部,请不要增加锁定,锁定过程将由外界使用者自行约定。
virtual void ExportToStream(TStream & retStream) override
{
retStream << x << y << z;
}

virtual bool ImportFromStream(TStream & AStream) override
{
AStream >> x >> y >> z;
return true;
}

void DispInfo()
{
cout << "x = " << x << " y = " << y << " z = " << z << endl;
}
};

TShareInt glb_ShareInt; //设置共享对象


void Demo_ARTPeer()
{

cout << " ======================================================" << endl;
cout << " ARTPeer C++ V3.70 分布式端" << endl;
cout << endl;
cout << "操作:执行过程中,可以输入数字代表发送普通包和Echo包" << endl;
cout << " 1--RSM_NoWait群发演示;2--RPC包, 3 -- 显示共享整数值, 4-读写共享内存, " << endl;
cout << " 5-P2PConnect, 6--P2PDisconnect, 7--RegistStar" << endl;
cout << " 8--UnRegist, 0--显示统计信息, 13--发送文件" << endl;
cout << " 100以上整数(按回车),则退出程序。" << endl;
cout << "本服务程序需要使用TCP服务,若有防火墙拦截,请设置允许!" << endl;
cout << " ======================================================" << endl;
art::TNetTools::Demo_DispLocalNetWorkInfo();
cout << "需要输入:(0)本地IP地址;(1)服务器的IP地址;(2)服务器的端口。" << endl;
cout << endl;

TMyARTPeer mm(eaem_StarParallel);
cout << "Mode = " << mm.getExeServiceMode() << endl;
//TTraceCall::ct_TraceFileName = "TraceStar.log";
//mm.EnableComStat(true); //打开统计功能

#define LOCAL_COMPUTER
//#undef LOCAL_COMPUTER

//#define NORMAL_EXE
#ifdef NORMAL_EXE
cout << "是否打开文件(1打开!):";
cin >> xxyy;
if (xxyy == 1)
desFile = fopen("D:\\aabb.rar", "wb");
#endif

string mLocalBoundIP = "127.0.0.1";
//mLocalBoundIP = "192.168.1.99";
#ifndef LOCAL_COMPUTER
cout << "(0)请输入本地IP地址(但数字代表本地127):";
cin >> mLocalBoundIP;
if (mLocalBoundIP.length() < 5) mLocalBoundIP = "127.0.0.1";
#endif
mm.Open(mLocalBoundIP, 0); //任意绑定!
//mm.Open("192.168.1.99", 0, true); //任意绑定!
cout << "本地IP地址:" << mm.LocalIP() << "\t端口:" << mm.LocalPort() << endl;

//string mIP = "127.0.0.1";
string mIP = mLocalBoundIP;
int mPort = 2910;
#ifndef LOCAL_COMPUTER
cout << "请输入:(1)服务器的IP地址:";
//while (mIP == "") getline(cin, mIP);
cin >> mIP;
if (mIP.length() < 5) mIP = "127.0.0.1";
//#endif
//if (mIP == "") { mIP = "127.0.0.1"; cout << "ps:您没有输入,按照127.0.0.1设置" << endl;} //如果用户直接按下回车,则发送给本机器。
#endif
cout << "请输入:(2)服务器的端口号(如2910,输入单个数字也意味着2910):";
cin >> mPort;
if (mPort < 10) mPort = 2910;

mm.memShare.RegistObject("starsharevar", glb_ShareInt);
//glb_ShareInt.BeginWrite();
glb_ShareInt.data = 7654; //因为整数数据很短,所以不用加锁,否则需要锁定。
//glb_ShareInt.EndWrite();

TARTPackage rp; //接收包
TARTPackage sp; //发送数据包;
//sp.writeint(0);
//sp.writeint(0);
sp << "你好啊,我是一只小小鸟!";
cout << "数据长度:" << sp.size() << " 总长度:" << sp.headlen + sp.size() << endl;
int mFlag = 1;
while ((mFlag >= 0) && (mFlag <= 100))
{
cout << "继续发送数据包(输入1或2发送,其它数字退出):";
cin >> mFlag;
//for (int k = 0; k < 100; k ++) //进行循环,压力测试。
if (mFlag == 0)
{
cout << "=============================================================" << endl;
cout << "本地IP地址:" << mm.LocalIP() << " : " << mm.LocalPort() << endl;
cout << "发送包/数据量:" << mm.getComStat().send.PackageCount << " / " << mm.getComStat().send.WholeSize << endl;
cout << "接收包/数据量:" << mm.getComStat().recv.PackageCount << " / " << mm.getComStat().recv.WholeSize << endl;
cout << "当前内存申请包数量:" << TPIPEComPackagePool::GetAllocateTotalCount() << endl;
cout << "客户信息情况如下:" << endl;
cout << endl;
cout << "----------------- PIPECom ClientTable------------------------------" << endl;
mm.PIPEComClientInfoTable().DispClientInfoTable();
cout << "----------------- ARTBase ClientTable------------------------------" << endl;
mm.ARTBaseClientInfoTable().DispClientInfoTable();
cout << "----------------- ARTPeer ClientTable ---------------------------" << endl;
cout << mm.ARTClientTable().DispClientTable() << endl;;
cout << "------------------ 本地内存共享情况 --------------------------" << endl;
cout << mm.memShare.SearchShareMemBasicList() << endl;
cout << " 本地IP地址:" << mm.LocalIP() << " : " << mm.LocalPort() << endl;
}

if (mFlag == 1)
{
cout << "请输入ART客户端端口号(0--群发):";
int mPort;
cin >> mPort;

TMemoryStream mss;
mss.writestring("[RSM]新春快乐!2012---win!!!");
if (mPort == 0)
{
mm.RSMToAll(0, 2, sp, 7, 8);
//bool mResultFlag = mm.RSM(mARTID, 0, 2, sp, 7, 8);
//cout << "RSM发送结果:" << mResultFlag << endl;
}
else mm.RSM(mLocalBoundIP, mPort, 0, 3, sp, 88, 99);
}
if (mFlag == 2)
{
cout << "请输入ART客户端端口号:";
int mPort;
cin >> mPort;
//if (mARTID != 0)
{
TMemoryStream mss;
//mss.traceInfo.AddInfo("TMyStar::开始执行RPC,哈哈,测试跟踪效果!");
mss.writestring("[RPC]新春快乐!2012---win!!!");
EARTRPCResult mResultFlag = mm.RPC(mLocalBoundIP, mPort, 0, 1, mss, mss, 6, 9);
cout << "RPC发送结果:" << TARTBaseInterpret::InterpretRPCResult(mResultFlag) << endl;
cout << "RPC结果包大小:" << mss.size() << endl;
if (mResultFlag == rpc_Success) cout << mss.readstring() << endl;
}
}
if (mFlag == 3)
{
cout << "显示glb_ShareInt的值:" << glb_ShareInt.data << endl;
//cout << "查询ART客户表:" << mm.QueryARTTable(mm.ARTClientTable()) << endl;
//cout << "显示客户表:" << endl;
//cout << mm.ARTClientTable().DispClientTable() << endl;
}
//if (mFlag == 2) {EARTRSMResult mResultFlag = mm.RSM_WaitFinishService(mIP, mPort, sp); cout << "MSG发送结果:" << mResultFlag << endl; }
//if (mFlag == 3) cout << mm.UDPSendPackage(mIP, mPort, sp) << endl;
if (mFlag == 4)
{
if (true)
{
cout << "输入要读取的Port:";
unsigned short starport;
cin >> starport;
TShareInt zz;
EMemResult mFlag = mm.memShare.ShareRead(mIP, starport, "starsharevar", zz);
cout << "内存读操作结果 = " << mFlag << endl;
if (mFlag == mem_Success) cout << "读取成功:" << zz.data << endl;
cout << "内存写入操作" << endl;
zz.data += 2;
mm.memShare.ShareWrite(mIP, starport, "starsharevar", zz);
cout << "内存再次读取!" << endl;
zz.data = 3344;
zz.Bind(mm, mIP + ":" + jw::IntToStr(starport) + ":\\starsharevar");
zz.ShareRead();
cout << zz.data << endl;
}
if (false)
{
TShareVar zz(sizeof(int));
EMemResult mFlag = mm.memShare.ShareRead(mIP, 2910, "sharevar", zz);
cout << "内存读操作结果 = " << mFlag << endl;
if (mFlag == mem_Success) cout << "读取成功:" << zz.asInt() << endl;
zz.asInt() ++;
mm.memShare.ShareWrite(mIP, 2910, "sharevar", zz);
}
if (false)
{
TTestShareObject mso;
EMemResult mFlag = mm.memShare.ShareRead(mIP, 2910, "mso", mso);
cout << "内存读操作结果 = " << mFlag << endl;
if (mFlag == mem_Success) cout << "读取成功:" << mso.x << "\t" << mso.y << "\t" << mso.z << endl;
mso.DispInfo();
mso.x ++;
mso.y = mso.y + 100;
mso.z = mso.z + "*" + jw::IntToStr(mso.x);
mm.memShare.ShareWrite(mIP, 2910, "mso", mso);
}
if (false)
{
TShareString shareString;
EMemResult mFlag = mm.memShare.ShareRead(mIP, 2910, "jiangwei", shareString);
cout << "内存读操作结果 = " << mFlag << endl;
if (mFlag == mem_Success) cout << shareString.data << endl;
shareString.data += "*" + jw::IntToStr(rand());
mFlag = mm.memShare.ShareWrite(mIP, 2910, "jiangwei", shareString);
cout << "内存写操作结果 = " << mFlag << endl;
}

if (false)
{
TShareString shareString;
shareString.Bind(mm, "sharestring");
//EMEMResult mFlag = shareString.ShareRead(mIP, 2910);
//cout << "内存读操作结果 = " << mFlag << endl;
//if (mFlag == mem_Success) cout << shareString.data << endl;
//shareString.data += "*" + jw::IntToStr(rand());
//mFlag = shareString.ShareWrite(mIP, 2910);
//cout << "内存写操作结果 = " << mFlag << endl;
}
if (false)
{
TFileStream mDes("D:\\xxyyxx.rar", fmOpenWrite);
//TMemoryStream mDes;
TShareStreamTool mfs(mDes);
cout << "内存读取结果:" << mm.memShare.ShareRead(mIP, 2910, "mfs", mfs) << endl;
}
}
if (mFlag == 5)
{
string svrIP = mIP;
#ifndef LOCAL_COMPUTER
cout << "请输入将要连接的服务器的IP地址:";
svrIP = "";
while (svrIP == "") getline(cin, svrIP);
#endif
cout << "请输入将要连接的服务器的端口号:";
unsigned short svrPort = mPort;
cin >> svrPort;
EPIPEComP2PConnectResultType mResultFlag = mm.Connect(svrIP, svrPort);
//EARTPeerConnectResultType mResultFlag = mm.ConnectCenter(svrIP, svrPort, 10000);
cout << "Connect结果:" << mResultFlag << endl;
}
if (mFlag == 6)
{
string svrIP = mIP;
#ifndef LOCAL_COMPUTER
cout << "请输入将要连接的服务器的IP地址:";
svrIP = "";
while (svrIP == "") getline(cin, svrIP);
#endif
cout << "请输入将要断开的服务器的端口号:";
unsigned short svrPort = mPort;
cin >> svrPort;
EPIPEComP2PDisconnectResultType mFlag = mm.Disconnect(svrIP, svrPort);
cout << "P2PDisconnect结果:" << mFlag << endl;
}
if (mFlag == 7)
{
//if (mm.is_ConnectCenter() == true)
{
string svrIP = mIP;
cout << "需要输入Login对方的端口号和本地ARTID值!" << endl;
cout << "首先输入对方端口号:";
int artPort;
cin >> artPort;
int artid;
cout << "请输入本地的ARTID(0,为需要分配的ID):";
cin >> artid;
vector<int> roleList;
roleList.push_back(34);
roleList.push_back(56);
cout << "ARTLogin返回结果标记:" << mm.Login(svrIP, artPort, artid, "ART", roleList) << endl;
//cout << "本地ARTID为" << mm.LocalARTID() << endl;

//现在测试全局共享内存的写入情况。

}
//else cout << "[Error]请首先连接到Center!" << endl;
}
if (mFlag == 8)
{
//if (mm.is_ConnectCenter() == true)
//{
string svrIP = mIP;
cout << "需要输入Logout的对方端口号:";
int artPort;
cin >> artPort;
mm.Logout(svrIP, artPort);
// cout << "本地ARTID为" << mm.LocalARTID() << endl;
//}
//else cout << "[Error]请首先连接到Center!" << endl;

}
if (mFlag == 11) //开始发送文件
{
//EARTFILEResultType mResultFlag = mm.SendFile(mIP, 2910, "D:\\abc.txt", "d:\\zzpp.txt", "");
//28个字节文件传输测试。(会自动放在一个包中发送)
//cout << "文件传输结果类型:" << mResultFlag << endl;
}
if (mFlag == 12) //开始大文件发送
{
//EARTFILEResultType mResultFlag = mm.SendFile(mIP, 2910, "D:\\aa.rar", "d:\\xxyyxxa.rar", "");
//1.95M文件发送测试。(会分开多个包发送)
//cout << "文件传输结果类型:" << mResultFlag << endl;
}
if (mFlag == 13) //给Star开始大文件发送
{
cout << "输入要发送文件ARTID端(注意:需要双方登录):";
int starid;
cin >> starid;

cout << "请输入文件名:";
string mFileName;
while (mFileName == "") getline(cin, mFileName);
string mDesFileName = mFileName + ".new";
EARTFILEResultType mResultFlag = mm.SendFile(mLocalBoundIP, starid, mFileName, mDesFileName, false, "测试传输文件!");
cout << "文件传输结果类型:" << mResultFlag << endl;
}
if (mFlag == 55)
{
cout << "通信统计信息如下!" << endl;
cout << mm.getComStat().DispStatInfo() << endl;
//cout << "xx= " << xx << "yy = " << yy << endl;
}

}


void E011_Demo()
{
Demo_ARTPeer();
}

}; //end namespace __Test_ARTPeer__

输出

(一)ARTPeer演示程序的运行图

.ARTPeer演示程序的运行图

(二)说明:

(1)ARTPeer按照对等模式实现网络编程。它是在TPIPECom的基础上,提供了P2P数据通信、RPC远程过程调用、RSM消息通知、EVENT事件驱动、网络共享变量、传输文件、文件系统等功能,由于是对等模式,框架中各端是对等的,没有服务中心的概念。应用时,可更具需要自由设计。

(2)ARTPeer是实现P2P架构的分布式网络编程基础类。本演示程序只是演示RPC、RSM等的调用和响应过程。

(3)该P2P的设计和工作原理,请参看书籍:姜维. 《分布式网络系统与Multi-Agent系统编程框架》

(4)在orsci-art包中,使用TARTPeer类作为基础类,派生出子类后实现对等式的分布式网络编程用户端。

(5)orsci包支持ARTPeer网络编程,提供TARTPeer基础类,可下载配套软件orsci进行应用。

书籍 姜维. 《分布式网络系统与Multi-Agent系统编程框架》
软件 orsci-art开发包(C++语言)。