E010

调用orsci-art中的PIPECom,实现带有管道的多线程网络编程演示。

 

#pragma once

#include "PIPECOM.h"

using namespace orsci;
using namespace art;


namespace TEST_PIPECom_P2P
{

#define ct_PIPE_Count 5 //应用时通常是预先定义管道的数量。

class TMyPIPECom : public TPIPECom
{
public:
TMyPIPECom(const EPIPEComServiceMode APIPEComServiceMode = acsm_MultiServiceThread,
//const int minThreadCount = Config_TPIPECom_MinServiceThreadCount, const int maxThreadCount = Config_TPIPECom_MaxServiceThreadCount,
const int maxTaskBufferCapacity = Config_TPIPECom_TaskCacheQueueCapacity)
: TPIPECom(ct_PIPE_Count, APIPEComServiceMode,
//minThreadCount, maxThreadCount,
maxTaskBufferCapacity){} //默认服务模式为单监听线程。
public:
virtual void OnOpen(const string & ALocalBoundIP, const int & ALocalBoundPort) override //当本地端口打开开始监听时
{
oss.lock();
cout << "[Open事件--本地打开]连接了:本地IP:" << ALocalBoundIP << " 端口:" << ALocalBoundPort << endl;
oss.unlock();
return ;
}
virtual void OnAccept(const string & APeerIP, const int & APeerPort, bool & retAcceptAllowFlag) override
{
oss.lock();
cout << "[Accept事件--收到请求]连接了:本地IP:" << APeerIP << " 端口:" << APeerPort << endl;
oss.unlock();
return ;
}

virtual void OnConnect(const string & APeerIP, const int & APeerPort, const unsigned int APipeCount, int & usertag, void * & userDataPointer) override
{
cout << "[Connect事件--远程连接]PeerIP:" << APeerIP << " 端口:" << APeerPort << endl;
return ;
}

virtual void OnDisconnect(const string & APeerIP, const int & APeerPort, const unsigned int APipeCount, int & usertag, void * & userDataPointer) override
{
cout << "[Disconnect事件--远程断开]PeerIP:" << APeerIP << " 端口:" << APeerPort << endl;
return ;
}

virtual void OnReceiveP2P(const string & APeerIP, const int & APeerPort, TPIPEComPackage * & pRecvPackage) override
{
oss.lock();
cout << "[Receive P2P]来自于IP:" << APeerIP << " -- 端口 " << APeerPort << " 包大小: " << pRecvPackage->ct_headlen + pRecvPackage->size() << endl;
cout << "内存包申请的包总数量情况:" << TPIPEComPackagePool::GetAllocateTotalCount() << endl;
oss.unlock();
//string mMsg = APeerIP + " : " + TARTTools::toStr(APeerPort) + " 包大小: " + TARTTools::toStr(pRecvPackage->ct_headlen + pRecvPackage->size());
//MessageBoxA(0, mMsg.c_str(), "[收到MSG包]OnReceiveMSG", MB_OK + MB_ICONINFORMATION);
if (pRecvPackage->size() > 0)
{
//cout << "当前流位置:" << pRecvPackage->positon() << " 长度:" << pRecvPackage->size() << endl;
string abc = pRecvPackage->readstring();
cout << "对方发送的数据是:" << abc << endl;
}
} //当接收到数据时

virtual void OnReceivePIPE(const string & APeerIP, const TPORTTYPE & APeerPort, const unsigned int APipeIndex, TPIPEComPackage * & pRecvPackage) override
{ //本事件触发接收一个PIPECom包
cout << "收到PIPE包,来自管道 " << APipeIndex << " :IP地址" << APeerIP << " -- 端口 " << APeerPort << " 包大小: " << pRecvPackage->ct_headlen + pRecvPackage->size() << endl;
} //当接收到数据时

//virtual void OnBeforeSend(const string & APeerIP, const unsigned short int APeerPort, TPIPEComPackage & ASendPackage); //即将发送数据前
//virtual void OnAfterSend(const string & APeerIP, const unsigned short int APeerPort, TPIPEComPackage & ASendPackage); //发送数据之后
//virtual void OnDisconnect(const string & APeerIP, const unsigned short int APeerPort); //断开连接之后
virtual void OnClose() override
{
cout << "[Close事件--本地断开]端口不再监听!" << endl;
}

virtual void OnException(const EPIPEComExceptionEventType AExceptionEventType, const int AErrorCode, const string & AErrorMsg, bool & retIgnoreException) override
{
cout << "[Exception事件--异常处理]{jw 呼拉呼拉,出现异常拉}错误信息: " << AErrorCode << " -- " << AErrorMsg << endl;
//cout << "请输入是否需要忽略异常?;
//retIgnoreExceptionFlag = true; //控制是否忽略异常!如果外部处理完毕了,则可以选择不让内部进一步处理异常。
return ;
} //什么也不做,需要进一步重载实现更加高级功能。
};


inline void Demo_COMClient()
{
cout << " ======================================================" << endl;
cout << " PIPECom 客户端(兼职服务器)" << endl;
cout << endl;
cout << "操作:执行过程中,可以输入数字代表发送普通包和Echo包" << endl;
cout << " 0--显示统计信息;" << endl;
cout << " 1--普通P2P数据包;2--P2P发送输入的消息" << endl;
cout << " 3--发送PIPE包" << endl;
cout << " 5--Connect连接, 6--Disconnect断开!" << endl;
cout << " 9--输出客户表" << endl;
cout << " 其它数字(按回车),则退出程序。" << endl;
cout << " ======================================================" << endl;
TNetTools::Demo_DispLocalNetWorkInfo();
cout << "需要输入:(0)本地IP地址;(1)服务器的IP地址;(2)服务器的端口。" << endl;
cout << endl;

TMyPIPECom mm(acsm_MultiServiceThread, 6);
//TMyPIPECom mm(acsm_ListenThread, 3, 4, 6);
mm.EnableComStat(true); //打开统计功能

string mLocalBoundIP;
cout << "(0)请输入本地IP地址:";
while (mLocalBoundIP == "") cin >> mLocalBoundIP;
if (mLocalBoundIP.size() < 8) mLocalBoundIP = "127.0.0.1";

mm.Open(mLocalBoundIP, 0, true); //任意绑定!
//mm.Open("192.168.1.99", 0, true); //任意绑定!

string mPeerIP;
cout << "请输入:(1)服务器的IP地址:";
while (mPeerIP == "") getline(cin, mPeerIP);
if (mPeerIP.size() < 8) mPeerIP = "127.0.0.1";
cout << "请输入:(2)服务器的端口号(如2910):";
int mPeerPort;
cin >> mPeerPort;

TPIPEComPackage rp; //接收包
TPIPEComPackage sp; //发送数据包;
sp.writestring("你好啊,我是一只小小鸟!");
cout << "数据长度:" << sp.size() << " 总长度:" << sp.ct_headlen + sp.size() << endl;
sp.reset();
cout << sp.readstring() << endl;
sp.reset();

//return;
int mFlag = 1;
while ((mFlag >= 0) && (mFlag <= 9))
{
cout << "继续发送数据包(输入1或2发送,其它数字退出):";
cin >> mFlag;
//for (int k = 0; k < 100; k ++) //进行循环,压力测试。
if (mFlag == 0)
{
cout << "本地IP地址:" << mm.LocalIP() << " : " << mm.LocalPort() << endl;
cout << "发送包/数据量:" << mm.getComStat().send.PackageCount << " / " << mm.getComStat().send.WholeSize << endl;
cout << "接收包/数据量:" << mm.getComStat().recv.PackageCount << " / " << mm.getComStat().recv.WholeSize << endl;
cout << "当前内存申请包数量:" << TPIPEComPackagePool::GetAllocateTotalCount() << endl;
cout << "======= PIPECom 中的客户信息如下" << endl;
mm.PIPEComClientInfoTable().DispClientInfoTable();

cout << "======= P2P 中的客户表信息如下...........==========" << endl;
mm.getp2p().DispInDos_TableInfo();
//cout << "匿名发送次数:" << mm.
}
if (mFlag == 9) mm.PIPEComClientInfoTable().DispClientInfoTable();
if (mFlag == 1)
{
string mPeerIP;
cout << "请输入:(1)消息发送给对方的IP地址:";
while (mPeerIP == "") getline(cin, mPeerIP);
if (mPeerIP.size() < 8) mPeerIP = "127.0.0.1";
cout << "请输入:(2)消息发送给对方的端口号(如2910):";
int mPeerPort;
cin >> mPeerPort;
mm.P2PSendPackage(mPeerIP, mPeerPort, sp);
}
if (mFlag == 2)
{
string mPeerIP;
cout << "请输入:(1)消息发送给对方的IP地址:";
while (mPeerIP == "") getline(cin, mPeerIP);
if (mPeerIP.size() < 8) mPeerIP = "127.0.0.1";
cout << "请输入:(2)消息发送给对方的端口号(如2910):";
int mPeerPort;
cin >> mPeerPort;
cout << "请输入:(3)需要发送的消息" << endl;
string mMsg;
while (mMsg == "") getline(cin, mMsg);
sp.clear();
sp.writestring(mMsg);
mm.P2PSendPackage(mPeerIP, mPeerPort, sp);
}
if (mFlag == 3)
{
string mPeerIP;
cout << "请输入:(1)消息发送给对方的IP地址:";
while (mPeerIP == "") getline(cin, mPeerIP);
if (mPeerIP.size() < 8) mPeerIP = "127.0.0.1";
cout << "请输入:(2)消息发送给对方的端口号(如2910):";
int mPeerPort;
cin >> mPeerPort;
cout << mm.PIPESendPackage(mPeerIP, mPeerPort, 2, sp) << endl; //2是随意给出的,将来按照管道进行约定,不能达到或超过ct_PIPE_Count。
}
//if (mFlag == 4) cout << "MSG调用结果==" << mm.MSGSendPackage(mIP, mPort, sp, 10*1000) << endl; //按照10秒钟超时设置。
if (mFlag == 5)
{
string mPeerIP;
cout << "请输入:(1)连接对方的IP地址:";
while (mPeerIP == "") getline(cin, mPeerIP);
if (mPeerIP.size() < 8) mPeerIP = "127.0.0.1";
cout << "请输入:(2)连接对方的端口号(如2910):";
int mPeerPort;
cin >> mPeerPort;
//mm.Connect("127.0.0.1", mPeerPort, ct_PIPE_Count);
mm.Connect(mPeerIP, mPeerPort, ct_PIPE_Count);
}
if (mFlag == 6)
{
string mPeerIP;
cout << "请输入:(1)断开对方的IP地址:";
while (mPeerIP == "") getline(cin, mPeerIP);
if (mPeerIP.size() < 8) mPeerIP = "127.0.0.1";
cout << "请输入:(2)断开对方的端口号(如2910):";
int mPeerPort;
cin >> mPeerPort;
//mm.Disconnect("127.0.0.1", mPeerPort);
mm.Disconnect(mPeerIP, mPeerPort);
}
}
mm.Close();
}

};


inline void E010_Demo()
{
TEST_PIPECom_P2P::Demo_COMClient();
}

输出

(一)Peer端One运行图

.PIPECom运行图

(二)说明:

(1)PIPECom在P2P的基础上实现,实现对等模式的网络编程。任意一端都可以连接其它端。

(2)PIPECom提供了P2P连接和多个Pipe连接,每个连接是并发的,这样不会因为一类信息的阻塞而影响到其它类信息的传输和运行。

(3)PIPECom的设计和工作原理,请参看书籍:姜维. 《分布式网络系统与Multi-Agent系统编程框架》

(3)在orsci-art包中,使用TPIPECom类作为基础类,派生后进行自定义的管道通信编程。

(4)orsci包支持PIPE网络编程,提供TPIPECom基础类,可下载配套软件orsci-art应用。

书籍 姜维. 《分布式网络系统与Multi-Agent系统编程框架》
软件 orsci-art开发包(C++语言)。