抵制不良游戏 拒绝盗版游戏 注意自我保护 谨防受骗上当
适度游戏益脑 沉迷游戏伤身 合理安排时间 享受健康生活
已阅读并同意
友情提示您未同意用户协议或对战玩法,无法登录游戏。请先阅读以下相关内容若同意请选中对应选择框游戏服务器之长连接服务器(python)(1) - CSDN博客
游戏服务器之长连接服务器(python)(1)
&游戏服务器之长连接服务器实现tcp连接的数据异步收发。
特点如下:
(1)一个服务器进程含一个服务器对象
(2)服务器对象包含被动连接工厂和主动连接工厂
(3)两个工厂分别处理被动连接和主动连接
(4)被动连接和主动连接的网络收发都在在reactor的网络线程池里处理
(5)每个连接对应一个会话对象,接受的数据缓存在会话,在会话里反序列化和派送到具体的逻辑服务;会话发送数据时压入reactor网络线程池的队列里处理
1、服务器对象
在配置里配置服务器对象,是自己的名字就建立被动连接接收工厂,是其他的服务器对象,就连接过去。
class Server:
.服务器对象,各个服务都通过服务器对象,可进行
1)服务之间通讯
2)读取配置
.服务器对象主要的功能包括
2)读取配置
3)装载服务
4)实现通讯机制
.服务器维护以下关键数据结构
1)路由表:当收到事件后需要查询路由表寻找事件的接收服务器
2)服务表:该表保存服务数据,服务和网络的关系
3)通讯频道表:该表保存当前的通道数据信息
def __init__(self, conf):
self.conf = conf
self.channels = {}
self.route = {} # service_id: ServiceItem
self.services = {} # service_id: service object
self.lock = Lock()
self.name = conf.myself.name
(&temp forbid init server data&)
self.init_server_data()
# create route table
for s in conf.servers:
for svcId, svc in s.services.items():
self.route[svcId] = ServiceItem(svcId, s.name)#路由表存储所有的服务器对象
set_server(self)
def init_server_data(self):
(&init system&)
ServerData.init_system()
#(&begin init data&)
#ServerData.init_load()
def getServerConfig(self, serverName):
u&&&获得指定服务器的配置信息&&&
for server in self.conf.servers:
if server.name == serverName:
return server
return None
def getServiceConfig(self, serviceId):
u&&&获得指定服务的配置信息&&&
return self.conf.myself.services[serviceId]
# ----- channel & service ------------------------------------------------
def requestCreateChannel(self, target, transport):
. 网络层在网络协议建立后,请求服务器对象建立和对端Server的通讯频道
1)服务器对象创建Channel对象
2)服务器对象为每个服务设置通讯Channel&&&
channel = Channel(transport)
self.channels[target] = channel
for k, v in self.route.items():
if v.serverName == target:
v.channel = channel
(u&service (%d) is in the server(%s)&, k, target)
(u&[%s] : server(%s) is connected&, self.name, target)
channel.setListener(self)
return channel
def requestDestroyChannel(self, target):
.网络层在网络协议断连后,请求服务器对象销毁和对端Server的通讯频道
1)服务器对象销毁Channel对象
2)服务器对象设置相应服务Channel=None,后续发给该服务的消息将无法送达
del self.channels[target]
for k, v in self.route.items():
if v.serverName == target:
v.channel = None
(u&server(%s) is disconnected&, target)
def getChannel(self, serviceId):
item = self.route[serviceId]
if item != None:
return item.channel
return None
def registeService(self, service):
u&&&服务注册函数,服务注册成功后,则可以收发消息&&&
self.services[service.serviceId] = service
def unregisteService(self, service):
u&&&服务注销函数,服务注册成功后,则可以收发消息&&&
del self.services[service.serviceId]
# ----- event ------------------------------------------------------------
def onChannelEvent(self, channel, event):
Channel收到相应的事件后将调用服务器对象的onChannelEvent
. 服务器对象根据dstId,转发给相应的本地服务
self._dispatchEvent(event)
#发送消息如果是本服务器对象,就直接派送到具体的服务,否则就网络发送到对方的服务器对象
def sendEvent(self, eventData, srcId= -1, dstId= -1, eventType= -1, \
userId= -1, param= -1, senceId=-1, origEvent=None):
.可通过调用本函数,发送事件给指定服务
1)服务器对象首先检查目标服务的位置
2)对于远程服务,通过Channel对象发送事件至对端服务器
3)对于本地服务,则直接转发
eventData : 发送的数据
srcId : 源服务的标记,如origEvent不为空,则为origEvent.dstId
dstId : 目标服务的标记,如origEvent不为空,则为origEvent.srcId
eventType: 事件
型,如果不填写则为-1
origEvent: 为源事件
# check whether event should be sent over network or not
if origEvent != None:
srcId = origEvent.dstId
dstId = origEvent.srcId
userId = origEvent.param1
param = origEvent.param2
senceId = -1
if dstId not in self.route:
log.warning(¬ find service:%d&, dstId)
item = self.route[dstId]
if item == None:
log.warning(u&Event(%d) Missing due to no this service(%d)&, \
eventType, dstId)
return - 1
event = Event.createEvent(srcId, dstId, eventType, userId, param, eventData)
if item.serverName == self.name: # local event
self._dispatchEvent(event)
if item.channel == None:
log.warning(u&Event(%s) Missing due to network problem&, eventData)
item.channel.sendEvent(event)
def notifyEvent(self, userId, data, event_type, sceneId=0, serviceId=0):
. 发送通知事件给客户端
self.sendEvent(data, serviceId, NOTIFY_SERVICE, \
NOTIFY_SERVICE_SENDTO_CLIENT.REQ, userId, event_type, sceneId)
def _dispatchEvent(self, event):
u&&&分发事件至本地服务的函数&&&
svc = self.services[event.dstId]
if svc != None:
svc.dispatch(event)
except Full, e:
log.warning(u&Event(%s) Missing due to queue is full&, event)
log.warning(u&Event(%s) Missing due to no this service&, event)
def _dispatchTimerEvent(self, event):
u&&&分发时间事件至本地服务的函数&&&
svc = self.services[event.dstId]
if svc != None:
svc.dispatchTimerEvent(event)
log.warning(u&Event(%s) Missing due to no this service&, event)
def localBroadcastEvent(self, eventType, param1, param2, eventData):
u&&&实现本地服务广播&&&
for id, svc in self.services.items():
evt = Event.createEvent(-1, svc.serviceId, eventType, param1, \
param2, eventData)
self._dispatchEvent(evt)
# ----- timer -------------------------------------------------------------
def setTimerByFunc(self, delay, func, *args, **kw):
u&&&设置定时器,并通过另外的函数唤醒&&&
reactor.callLater(delay, func, args, kw)
def setTimerByClientEvent(self, delay, sid, event):
.设置定时器,直接分发调用者传入的事件,也是通过消息队列的方式返回给客户端
def onTime(server, sid, evt):
svc = self.services[sid]
if svc != None:
svc.dispatchTimerEvent(evt)
log.warning(u&Event(%s) Missing due to no this service&, event)
return reactor.callLater(delay, onTime, self, sid, event)
def setTimerByEvent(self, delay, sid, eventData):
u&&&设置定时器的函数&&&
event = Event.createEvent(0, sid, 1, -1, -1, eventData)
def onTime(server, evt):
self._dispatchTimerEvent(evt)
return reactor.callLater(delay, onTime, self, event)
# ----- run --------------------------------------------------------------
def installServices(self):
u&&&通过配置文件的信息,启动,初始化和注册服务&&&
for svcId, svc in self.conf.myself.services.items():
name = svc.options[&module&]
__import__(name)
module = sys.modules[name]
(u&...install service : %s&, name)
self.registeService(eval(svc.options[&code&]))
# 校准服务器时间
ServerData.fix_server_time()
def run(self):
.主循环函数
1)设置Twisted网络
2)广播系统准备好
3)调用Twisted.run进入事件循环
conf = self.conf
(u&Setup network configuration....&)
#在配置里配置服务器对象,是自己的名字就建立被动连接接收工厂,是其他的服务器对象,就连接过去
#网络收发在reactor的网络线程里处理
for i in range(0, len(conf.servers)):
if conf.servers[i].name == self.name:
reactor.listenTCP(conf.servers[i].port,ServerConnectionFactory(self, self.name))#反应器的被动连接建立工厂
(u&conf.servers[i].ip: %s, port: %s&, \
conf.servers[i].ip, conf.servers[i].port)
reactor.connectTCP(self.conf.servers[i].ip, conf.servers[i].port,
ClientConnectionFactory(self, self.name))#反应器的主动连接的工厂
(u&System Started&)
reactor.run()
reactor.stop()
def stop(self):
u&&&停止各个服务&&&
for service in self.services.values():
service.stop()
#time.sleep(5)
for service in self.services.values():
self.unregisteService(service)
log.debug(&stop server&)
路由表的服务器对象
class ServiceItem:
. 路由表中代表服务的数据结构
1)serviceId,服务Id
2)serverName 所在服务的名字
3)channel 和所在服务的连接频道对象
def __init__(self, serviceId, serverName, channel=None):
self.serviceId = serviceId
self.serverName = serverName # 服务器名字
self.channel = channel
2、主动连接协议和被动连接协议
连接处理如下:
工厂接到连接就建立协议,失去连接就销毁协议
协议接到数据就缓存到缓冲区
连接用于建立会话和销毁会话,以及数据接收发送
接收的数据缓存到会话的缓冲区
(1)被动连接协议
class ServerConnection(protocol.Protocol):
. ServerConnection用于维护其它服务器主动到本服务器的连接
. ServerConnection维护一个Channel对象,当握手协议完成后,Channel对象被创建
def connectionMade(self):
self.channel = None
(u&server(%s) low level connection is Made : &, \
str(self.transport.getPeer()))
self.buffer = &&
def connectionLost(self, reason):
. 底层连接断开后,ServerConnection请求服务器销毁Channel对象
if self.channel != None:
self.host.requestDestroyChannel(self.clientName)
self.channel = None
(u&server(%s) low level connection is Lost : %s&, \
str(self.transport.getPeer()), reason)
def dataReceived(self, data):
. 本函数,有两种处理逻辑
. 1)Channel已经建立
. 当收到数据后,底层连接不进行数据解析,而是交Channel处理
. 2)Channel未建立
. 进行握手
if (self.channel != None):
self.channel.writeBuffer(data)
self.buffer += data
if len(self.buffer) & lengthOfShakeHand:
(magicCode1, magicCode2, self.clientName) = struct.unpack(formatOfShakeHand, self.buffer[:lengthOfShakeHand])
self.clientName = self.clientName.strip()
self.transport.write(struct.pack(formatOfShakeHand, magicCode1, magicCode2, self.host.name))
self.channel = self.host.requestCreateChannel(self.clientName, self.transport)
self.buffer = self.buffer[lengthOfShakeHand:]
self.channel.writeBuffer(self.buffer)
(u&server(%s:%s) shake hand successful&, \
str(self.transport.getPeer()), self.clientName)
def __del__(self):
(u&server(%s) low level connection is deleted : &, \
str(self.transport.getPeer()))
被动连接工厂
class ServerConnectionFactory(protocol.ServerFactory):#协议建立工厂
protocol = ServerConnection
def __init__(self, host, serverName):
#protocol.ServerFactory.__init__(self)
self.host = host
self.serverName = serverName
def buildProtocol(self, addr):
p = protocol.ServerFactory.buildProtocol(self, addr)
p.host = self.host
p.serverName = self.serverName
p.clientName = None
(2)主动连接协议
class ClientConnection(protocol.Protocol):
ClienConnection维护本服务器到其它服务器的连接
def connectionMade(self):
self.channel = None
(u&client(%s) low level connection is Made : &, \
str(self.transport.getPeer()))
self.transport.write(struct.pack(formatOfShakeHand, 20, 7, self.clientName))
self.buffer = &&
def connectionLost(self, reason):
if self.channel != None:
self.host.requestDestroyChannel(self.serverName)
self.channel = None
(u&client (%s) low level connection is Lost : %s&, \
str(self.transport.getPeer()), reason)
def dataReceived(self, data):
if (self.channel != None):
self.channel.writeBuffer(data)
self.buffer += data
if len(self.buffer) & lengthOfShakeHand:
(magicCode1, magicCode2, self.serverName) = struct.unpack(formatOfShakeHand, self.buffer[:lengthOfShakeHand])
self.serverName = self.serverName.strip()
self.channel = self.host.requestCreateChannel(self.serverName, self.transport)#第一次接收数据就建立会话
self.buffer = self.buffer[lengthOfShakeHand:]
self.channel.writeBuffer(self.buffer)
(u&server(%s:%s) shake hand successful&, \
str(self.transport.getPeer()), self.serverName)
def __del__(self):
(u&client(%s) low level connection is deleted : &, \
str(self.transport.getPeer()))
主动连接工厂
class ClientConnectionFactory(protocol.ReconnectingClientFactory):#协议建立工厂
def __init__(self, host, clientName):
#protocol.ReconnectingClientFactory.__init__(self)
self.clientName = clientName
self.host = host
self.maxDelay = RECONNECTION_DELAY
def buildProtocol(self, addr):
self.resetDelay()
p = ClientConnection()
p.host = self.host
p.serverName = None
p.clientName = self.clientName
def clientConnectionLost(self, connector, reason):
protocol.ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
(u&trying to reconnecting host: %s, clientName: %s&, self.host, self.clientName)
def clientConnectionFailed(self, connector, reason):
protocol.ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
(u&trying to reconnecting host: %s, clientName: %s&, self.host, self.clientName)
会话对象保存该连接的缓存和发送传输对象(封装套接字的)和***者(服务器对象),方便处理数据收发。
处理收发的数据,收到的数据先添加到缓冲区,烦序列化后派送到具体的服务(逻辑线程),发送的数据则由反应器(网络线程)发送出去。
class Channel:
.Channel类用于和网络上其它服务器进行通讯,对于每个连接的Server,本服务器会维护一个Channel实例
.用于接收和发送相关事件消息
.Channel会注册至服务器上,从网络上收到消息后会缓冲以及解析,生成事件对象并交服务器处理
.Channel会打包发送的数据,并通过Twisted提供的传输机制来发送数据,Channel对象不进行数据缓存,
.Twisted会解决发送过程中的缓存问题
def __init__(self, transport):
.通过建立的网络传输对象,初始化Channel实例
self.transport = transport
self.buffer = &&
self.listener = None
def sendEvent(self, event):
. 发送消息方法
. 该方法会使用反应器的callFromThread方法进行处理
. 1) 解决线程安全的问题
. 2) 解决网络发送的问题
. 注意:trasport.write方法不是线程安全的方法
reactor.callFromThread(self._sendEvent, event) # 使用callFromThread,reactor保证线程安全(把函数和参数压入reactor线程池的队列中处理)
def _sendEvent(self, event):
self.transport.write(event.toStream())
def writeBuffer(self, data):#网络写到会话的缓存并读取,在连接类(协议类的封装)的数据接收里会写入到缓存
. 在网络上收到消息后,该方法将被调用
. 该方法缓存并解析事件,同时交***器处理
self.buffer += data
while True:
(result, event) = Event.createEventFromStream(self.buffer)#反序列化,只有长度达到包头指定包体长度才会返回成功
if result:
self.buffer = self.buffer[event.length:]#截取该消息
if (self.listener != None):
self.listener.onChannelEvent(self, event)#***器是服务器对象,这里派送到具体的服务
def setListener(self, listener)://服务器对象
u&&& listener: server.onChannelEvent &&&
self.listener = listener
保存该连接的缓存和发送传输对象(封装套接字的)和***者(服务器对象)。
处理收发的数据,收的派送,发的由反应器发送出去。
本文已收录于以下专栏:
相关文章推荐
2、服务应用
(1)ai服务
1、服务基础
(1)服务基类
封装服务消息队列和服务控制函数
class BaseService:
u&&&
.该类为所有服务类的基类
.每...
游戏服务器之长连接服务器实现tcp连接的数据异步收发。
一个网络收发处理进程,一个服务器对象逻辑处理进程。两个进程之间使用管道通信。
网络收发处理进程:
(1)网络处理是由反应器的子线程来处理的。
写了这么久的html5,感觉html5学得差不多了,是时候去接触更多的语言来扩充自己的能力了。我先后看了Swift,Java等语言。首先开发Swift需要一台mac,对于我这个寒士而言,过于奢华了一些...
到了新的环境,老大让我有空研究下一代服务器技术,作为一个长期任务。
新的服务器想达到的目标:
1、分布式系统,对象(Entity)之间的关系类似于Actor模型。
2...
本系列文章的第一章就已经提到,用加载多个python动态链接库的方式,可以巧妙的避开GIL,实现多个python环境运行在同一个进程内。
但是从上一个试验,到思考具体的实际用法,又经过...
服务器的参数调优
TCP/IP参数配置最大文件描述符应用运行时调优OutOfMemory Killer
客户端的参数调优服务器测试
Netty服务器Spray服务...
paramiko是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。
使用paramiko可以很好的解决以下问题:
需要使用windows客户端,
Socketsocket通常也称作”套接字”,用于描述IP地址和端口,是一个通信链的句柄,应用程序通常通过”套接字”向网络发出请求或者应答网络请求。socket起源于Unix,而Unix/Linux基...
原文在:/blog/cs/?p=1062
其中讲到了很多TCP的调整参数,现转之,十分精彩
对于一个server,我们一般考虑他所能支撑的qps,但有那么...
'''
Created on
@author: qs
'''
#from twisted.internet import epollreactor
#epollreactor.i...
他的最新文章
讲师:宋宝华
讲师:何宇健
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)用户名:delxu
文章数:172
评论数:427
访问量:1704400
注册日期:
阅读量:1297
阅读量:3317
阅读量:585028
阅读量:470589
51CTO推荐博文
这个帖子解决了我一直困惑的问题,好的很,拜谢。红色字为本人的注解。原贴见: =====以下为转帖====== 起因:sysprep时应该是选择了重新生成sid,但在硬盘对拷过程中,换盘重启时不慎使镜像系统启动过了,所以使得部分客户端SID相同了。
(注:这里有误,应该是SusClientID相同造成的,所以后面的解决方法是删掉并且重新激活生成SusClientID。事实上,sysprep修改的是computer SID,就本人的案例来看,2台计算机的SID的确是不同,但是SusClientID相同。
查看计算机SID的方法,可以去微软网站下载Sysinternal的PsGetSID工具:)
现像:其它到现在还没有发现什么,就是在wsus 3.0 sp1控制台上,看到客户端,有时来台A,有时来台B,有A没B,有B没A,只能显示一台。
解决方案:均在客户端上操作,
net stop wuauserv
删掉 HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\WindowsUpdate 下面 SusClientId 和 SusClientIdValidation 键
删掉 %windir%下面的SoftwareDistribution目录
net start wuauserv
运行 wuauclt.exe /Detectnow /Resetauthorization 命令
飞快,在控制台就可以看到新加入的客户端了!
了这篇文章
类别:┆阅读(0)┆评论(0)