From 1f66acfd254188058503bd2fbb1b76c7ffe39cb7 Mon Sep 17 00:00:00 2001 From: CPunch Date: Mon, 27 Nov 2023 21:23:28 -0600 Subject: [PATCH] holy refactor started out as me making a service abstraction.. - db.Player exists again, and entity.Player uses it as an embedded struct - chunk.ForEachEntity() lets you add/remove entities during iteration now - removed account related fields from CNPeer - protocol/pool has been merged with protocol. use protocol.GetBuffer() and protocol.PutBuffer(). - new protocol/internal/service! service.Service is an abstraction layer to handle multiple CNPeer* connections and allows you to associate each with an interface{} uData. In the future it might also handle a task queue for jobs that modify/interact with the player's uData, called from service.handleEvents() - PacketHandler callback type has a new param! uData is passed as well now - much of loginserver/shardserver is now handled by the shared service abstraction - SHARD: NPC_ENTER packets are now sent on player loading complete rather than on enter. --- internal/db/players.go | 44 ++++++-- internal/entity/chunk.go | 10 +- internal/entity/player.go | 43 +++----- internal/protocol/cnpeer.go | 59 ++++++----- internal/protocol/{pool => }/pool.go | 8 +- internal/service/service.go | 138 +++++++++++++++++++++++++ login/login.go | 55 ++++++---- login/loginserver.go | 122 +++++----------------- shard/chat.go | 34 +++--- shard/join.go | 46 +++++---- shard/movement.go | 29 +++--- shard/shardserver.go | 148 +++++---------------------- 12 files changed, 378 insertions(+), 358 deletions(-) rename internal/protocol/{pool => }/pool.go (53%) create mode 100644 internal/service/service.go diff --git a/internal/db/players.go b/internal/db/players.go index 81b9791..efc1547 100644 --- a/internal/db/players.go +++ b/internal/db/players.go @@ -4,11 +4,41 @@ import ( "database/sql" "github.com/CPunch/gopenfusion/config" - "github.com/CPunch/gopenfusion/internal/entity" "github.com/CPunch/gopenfusion/internal/protocol" "github.com/blockloop/scan" ) +type Player struct { + PlayerID int + AccountID int + AccountLevel int + Slot int + PCStyle protocol.SPCStyle + PCStyle2 protocol.SPCStyle2 + EquippedNanos [3]int + Nanos [config.NANO_COUNT]protocol.SNano + Equip [config.AEQUIP_COUNT]protocol.SItemBase + Inven [config.AINVEN_COUNT]protocol.SItemBase + Bank [config.ABANK_COUNT]protocol.SItemBase + SkywayLocationFlag []byte + FirstUseFlag []byte + Quests []byte + HP int + Level int + Taros int + FusionMatter int + Mentor int + X, Y, Z int + Angle int + BatteryN int + BatteryW int + WarpLocationFlag int + ActiveNanoSlotNum int + Fatigue int + CurrentMissionID int + IPCState int8 +} + // returns PlayerID, error func (db *DBHandler) NewPlayer(AccountID int, FirstName, LastName string, slot int) (int, error) { nameCheck := 1 // for now, we approve all names @@ -121,8 +151,8 @@ const ( INNER JOIN Accounts as acc ON p.AccountID = acc.AccountID ` ) -func (db *DBHandler) readPlayer(rows *sql.Rows) (*entity.Player, error) { - plr := entity.Player{ActiveNanoSlotNum: 0} +func (db *DBHandler) readPlayer(rows *sql.Rows) (*Player, error) { + plr := Player{ActiveNanoSlotNum: 0} if err := rows.Scan( &plr.PlayerID, &plr.AccountID, &plr.Slot, &plr.PCStyle.SzFirstName, &plr.PCStyle.SzLastName, @@ -162,13 +192,13 @@ func (db *DBHandler) readPlayer(rows *sql.Rows) (*entity.Player, error) { return &plr, nil } -func (db *DBHandler) GetPlayer(PlayerID int) (*entity.Player, error) { +func (db *DBHandler) GetPlayer(PlayerID int) (*Player, error) { rows, err := db.Query(QUERY_PLAYERS+"WHERE p.PlayerID = $1", PlayerID) if err != nil { return nil, err } - var plr *entity.Player + var plr *Player for rows.Next() { plr, err = db.readPlayer(rows) if err != nil { @@ -179,13 +209,13 @@ func (db *DBHandler) GetPlayer(PlayerID int) (*entity.Player, error) { return plr, nil } -func (db *DBHandler) GetPlayers(AccountID int) ([]entity.Player, error) { +func (db *DBHandler) GetPlayers(AccountID int) ([]Player, error) { rows, err := db.Query(QUERY_PLAYERS+"WHERE p.AccountID = $1", AccountID) if err != nil { return nil, err } - var plrs []entity.Player + var plrs []Player for rows.Next() { plr, err := db.readPlayer(rows) if err != nil { diff --git a/internal/entity/chunk.go b/internal/entity/chunk.go index 614ac5e..f83a605 100644 --- a/internal/entity/chunk.go +++ b/internal/entity/chunk.go @@ -38,11 +38,17 @@ func (c *Chunk) SendPacket(typeID uint32, pkt ...interface{}) { } // calls f for each entity in this chunk, if f returns true, stop iterating +// f can safely add/remove entities from the chunk func (c *Chunk) ForEachEntity(f func(entity Entity) bool) { + // copy entities to avoid locking for the entire iteration + entities := make(map[Entity]struct{}) c.lock.Lock() - defer c.lock.Unlock() - for entity := range c.entities { + entities[entity] = struct{}{} + } + c.lock.Unlock() + + for entity := range entities { if f(entity) { break } diff --git a/internal/entity/player.go b/internal/entity/player.go index ed9f356..afa5e4c 100644 --- a/internal/entity/player.go +++ b/internal/entity/player.go @@ -1,41 +1,22 @@ package entity import ( - "github.com/CPunch/gopenfusion/config" + "github.com/CPunch/gopenfusion/internal/db" "github.com/CPunch/gopenfusion/internal/protocol" ) type Player struct { - Peer *protocol.CNPeer - Chunk ChunkPosition - PlayerID int - AccountID int - AccountLevel int - Slot int - PCStyle protocol.SPCStyle - PCStyle2 protocol.SPCStyle2 - EquippedNanos [3]int - Nanos [config.NANO_COUNT]protocol.SNano - Equip [config.AEQUIP_COUNT]protocol.SItemBase - Inven [config.AINVEN_COUNT]protocol.SItemBase - Bank [config.ABANK_COUNT]protocol.SItemBase - SkywayLocationFlag []byte - FirstUseFlag []byte - Quests []byte - HP int - Level int - Taros int - FusionMatter int - Mentor int - X, Y, Z int - Angle int - BatteryN int - BatteryW int - WarpLocationFlag int - ActiveNanoSlotNum int - Fatigue int - CurrentMissionID int - IPCState int8 + db.Player + Peer *protocol.CNPeer + Chunk ChunkPosition +} + +func NewPlayer(peer *protocol.CNPeer, player *db.Player) *Player { + return &Player{ + Player: *player, + Peer: peer, + Chunk: MakeChunkPosition(player.X, player.Y), + } } // ==================== Entity interface ==================== diff --git a/internal/protocol/cnpeer.go b/internal/protocol/cnpeer.go index fe263c5..9013567 100644 --- a/internal/protocol/cnpeer.go +++ b/internal/protocol/cnpeer.go @@ -6,9 +6,8 @@ import ( "io" "log" "net" + "sync/atomic" "time" - - "github.com/CPunch/gopenfusion/internal/protocol/pool" ) const ( @@ -18,15 +17,16 @@ const ( // CNPeer is a simple wrapper for net.Conn connections to send/recv packets over the Fusionfall packet protocol. type CNPeer struct { - conn net.Conn - eRecv chan *Event - SzID string - E_key []byte - FE_key []byte - AccountID int - PlayerID int32 - whichKey int - alive bool + conn net.Conn + eRecv chan *Event + whichKey int + alive *atomic.Bool + + // May not be set while Send() or Handler() are concurrently running. + E_key []byte + + // May not be set while Send() or Handler() are concurrently running. + FE_key []byte } func GetTime() uint64 { @@ -34,22 +34,23 @@ func GetTime() uint64 { } func NewCNPeer(eRecv chan *Event, conn net.Conn) *CNPeer { - return &CNPeer{ - conn: conn, - eRecv: eRecv, - SzID: "", - E_key: []byte(DEFAULT_KEY), - FE_key: nil, - AccountID: -1, - whichKey: USE_E, - alive: true, + p := &CNPeer{ + conn: conn, + eRecv: eRecv, + whichKey: USE_E, + alive: &atomic.Bool{}, + + E_key: []byte(DEFAULT_KEY), + FE_key: nil, } + + return p } func (peer *CNPeer) Send(typeID uint32, data ...interface{}) error { // grab buffer from pool - buf := pool.Get() - defer pool.Put(buf) + buf := GetBuffer() + defer PutBuffer(buf) // allocate space for packet size buf.Write(make([]byte, 4)) @@ -73,12 +74,14 @@ func (peer *CNPeer) Send(typeID uint32, data ...interface{}) error { binary.LittleEndian.PutUint32(buf.Bytes()[:4], uint32(buf.Len()-4)) // encrypt body + var key []byte switch peer.whichKey { case USE_E: - EncryptData(buf.Bytes()[4:], peer.E_key) + key = peer.E_key case USE_FE: - EncryptData(buf.Bytes()[4:], peer.FE_key) + key = peer.FE_key } + EncryptData(buf.Bytes()[4:], key) // send full packet log.Printf("Sending %#v, sizeof: %d, buffer: %v", data, buf.Len(), buf.Bytes()) @@ -94,11 +97,12 @@ func (peer *CNPeer) SetActiveKey(whichKey int) { func (peer *CNPeer) Kill() { log.Printf("Killing peer %p", peer) - if !peer.alive { + + if !peer.alive.Load() { return } + peer.alive.Store(false) - peer.alive = false peer.conn.Close() peer.eRecv <- &Event{Type: EVENT_CLIENT_DISCONNECT, Peer: peer} } @@ -107,6 +111,7 @@ func (peer *CNPeer) Kill() { func (peer *CNPeer) Handler() { defer peer.Kill() + peer.alive.Store(true) for { // read packet size, the goroutine spends most of it's time parked here var sz uint32 @@ -123,7 +128,7 @@ func (peer *CNPeer) Handler() { // grab buffer && read packet body if err := func() error { - buf := pool.Get() + buf := GetBuffer() if _, err := buf.ReadFrom(io.LimitReader(peer.conn, int64(sz))); err != nil { return fmt.Errorf("failed to read packet body! %v", err) } diff --git a/internal/protocol/pool/pool.go b/internal/protocol/pool.go similarity index 53% rename from internal/protocol/pool/pool.go rename to internal/protocol/pool.go index cf76c54..6fbe7ef 100644 --- a/internal/protocol/pool/pool.go +++ b/internal/protocol/pool.go @@ -1,4 +1,4 @@ -package pool +package protocol import ( "bytes" @@ -9,11 +9,13 @@ var allocator = &sync.Pool{ New: func() any { return new(bytes.Buffer) }, } -func Get() *bytes.Buffer { +// grabs a *bytes.Buffer from the pool +func GetBuffer() *bytes.Buffer { return allocator.Get().(*bytes.Buffer) } -func Put(buf *bytes.Buffer) { +// returns a *bytes.Buffer to the pool +func PutBuffer(buf *bytes.Buffer) { buf.Reset() allocator.Put(buf) } diff --git a/internal/service/service.go b/internal/service/service.go new file mode 100644 index 0000000..6f3a074 --- /dev/null +++ b/internal/service/service.go @@ -0,0 +1,138 @@ +package service + +import ( + "fmt" + "log" + "net" + "sync" + + "github.com/CPunch/gopenfusion/config" + "github.com/CPunch/gopenfusion/internal/protocol" +) + +type PacketHandler func(peer *protocol.CNPeer, uData interface{}, pkt protocol.Packet) error + +func StubbedPacket(_ *protocol.CNPeer, _ interface{}, _ protocol.Packet) error { + return nil +} + +type Service struct { + listener net.Listener + port int + Name string + eRecv chan *protocol.Event + packetHandlers map[uint32]PacketHandler + peers *sync.Map + + // OnDisconnect is called when a peer disconnects from the service. + // uData is the stored value of the key/value pair in the peer map. + // It may not be set while the service is running. (eg. srvc.Start() has been called) + OnDisconnect func(peer *protocol.CNPeer, uData interface{}) + + // OnConnect is called when a peer connects to the service. + // return value is used as the value in the peer map. + // It may not be set while the service is running. (eg. srvc.Start() has been called) + OnConnect func(peer *protocol.CNPeer) (uData interface{}) +} + +func NewService(name string, port int) (*Service, error) { + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return nil, err + } + + service := &Service{ + listener: listener, + port: port, + Name: name, + eRecv: make(chan *protocol.Event), + packetHandlers: make(map[uint32]PacketHandler), + peers: &sync.Map{}, + } + + return service, nil +} + +// may not be called while the service is running (eg. srvc.Start() has been called) +func (service *Service) AddPacketHandler(pktID uint32, handler PacketHandler) { + service.packetHandlers[pktID] = handler +} + +func (service *Service) Start() { + log.Printf("%s service hosted on %s:%d\n", service.Name, config.GetAnnounceIP(), service.port) + + go service.handleEvents() + for { + conn, err := service.listener.Accept() + if err != nil { + log.Println("Connection error: ", err) + return + } + + peer := protocol.NewCNPeer(service.eRecv, conn) + service.connect(peer) + } +} + +func (service *Service) handleEvents() { + for event := range service.eRecv { + switch event.Type { + case protocol.EVENT_CLIENT_DISCONNECT: + service.disconnect(event.Peer) + case protocol.EVENT_CLIENT_PACKET: + if err := service.handlePacket(event.Peer, event.PktID, protocol.NewPacket(event.Pkt)); err != nil { + log.Printf("Error handling packet: %v", err) + event.Peer.Kill() + } + + // the packet buffer is given to us by the event, so we'll need to make sure to return it to the pool + protocol.PutBuffer(event.Pkt) + } + } +} + +func (service *Service) handlePacket(peer *protocol.CNPeer, typeID uint32, pkt protocol.Packet) error { + uData, _ := service.peers.Load(peer) + if hndlr, ok := service.packetHandlers[typeID]; ok { + if err := hndlr(peer, uData, pkt); err != nil { + return err + } + } else { + log.Printf("[WARN] unknown packet ID: %x\n", typeID) + } + + return nil +} + +func (service *Service) disconnect(peer *protocol.CNPeer) { + if service.OnDisconnect != nil { + uData, _ := service.peers.Load(peer) + service.OnDisconnect(peer, uData) + } + + log.Printf("Peer %p disconnected from %s\n", peer, service.Name) + service.peers.Delete(peer) +} + +func (service *Service) connect(peer *protocol.CNPeer) { + // default uData to nil, but if the service has an OnConnect + // handler, use the result from that + uData := interface{}(nil) + if service.OnConnect != nil { + uData = service.OnConnect(peer) + } + + log.Printf("New peer %p connected to %s\n", peer, service.Name) + service.peers.Store(peer, uData) + go peer.Handler() +} + +func (service *Service) SetPeerData(peer *protocol.CNPeer, uData interface{}) { + service.peers.Store(peer, uData) +} + +func (service *Service) RangePeers(f func(peer *protocol.CNPeer, uData interface{}) bool) { + service.peers.Range(func(key, value any) bool { + return f(key.(*protocol.CNPeer), value) + }) +} diff --git a/login/login.go b/login/login.go index 8859647..fb3b449 100644 --- a/login/login.go +++ b/login/login.go @@ -25,8 +25,6 @@ const ( ) func (server *LoginServer) AcceptLogin(peer *protocol.CNPeer, SzID string, IClientVerC int32, ISlotNum int8, data []protocol.SP_LS2CL_REP_CHAR_INFO) error { - peer.SzID = SzID - resp := protocol.SP_LS2CL_REP_LOGIN_SUCC{ SzID: SzID, ICharCount: int8(len(data)), @@ -62,7 +60,7 @@ func (server *LoginServer) AcceptLogin(peer *protocol.CNPeer, SzID string, IClie return nil } -func (server *LoginServer) Login(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *LoginServer) Login(peer *protocol.CNPeer, _account interface{}, pkt protocol.Packet) error { var loginPkt protocol.SP_CL2LS_REQ_LOGIN pkt.Decode(&loginPkt) @@ -74,9 +72,9 @@ func (server *LoginServer) Login(peer *protocol.CNPeer, pkt protocol.Packet) err } // client is resending a login packet?? - if peer.AccountID != -1 { + if _account != nil { SendError(LOGIN_ERROR) - return fmt.Errorf("out of order P_CL2LS_REQ_LOGIN") + return fmt.Errorf("out of order P_CL2LS_REQ_LOGIN: %v", _account) } // attempt login @@ -99,7 +97,7 @@ func (server *LoginServer) Login(peer *protocol.CNPeer, pkt protocol.Packet) err } // grab player data - peer.AccountID = account.AccountID + server.service.SetPeerData(peer, account) plrs, err := server.dbHndlr.GetPlayers(account.AccountID) if err != nil { SendError(LOGIN_DATABASE_ERROR) @@ -138,7 +136,7 @@ func (server *LoginServer) Login(peer *protocol.CNPeer, pkt protocol.Packet) err return server.AcceptLogin(peer, loginPkt.SzID, loginPkt.IClientVerC, 1, charInfo[:len(plrs)]) } -func (server *LoginServer) CheckCharacterName(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *LoginServer) CheckCharacterName(peer *protocol.CNPeer, account interface{}, pkt protocol.Packet) error { var charPkt protocol.SP_CL2LS_REQ_CHECK_CHAR_NAME pkt.Decode(&charPkt) @@ -149,17 +147,17 @@ func (server *LoginServer) CheckCharacterName(peer *protocol.CNPeer, pkt protoco }) } -func (server *LoginServer) SaveCharacterName(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *LoginServer) SaveCharacterName(peer *protocol.CNPeer, account interface{}, pkt protocol.Packet) error { var charPkt protocol.SP_CL2LS_REQ_SAVE_CHAR_NAME pkt.Decode(&charPkt) - if peer.AccountID == -1 { + if account == nil { peer.Send(protocol.P_LS2CL_REP_SAVE_CHAR_NAME_FAIL, protocol.SP_LS2CL_REP_SAVE_CHAR_NAME_FAIL{}) return fmt.Errorf("out of order P_LS2CL_REP_SAVE_CHAR_NAME_FAIL") } // TODO: sanity check SzFirstName && SzLastName - PlayerID, err := server.dbHndlr.NewPlayer(peer.AccountID, charPkt.SzFirstName, charPkt.SzLastName, int(charPkt.ISlotNum)) + PlayerID, err := server.dbHndlr.NewPlayer(account.(*db.Account).AccountID, charPkt.SzFirstName, charPkt.SzLastName, int(charPkt.ISlotNum)) if err != nil { peer.Send(protocol.P_LS2CL_REP_SAVE_CHAR_NAME_FAIL, protocol.SP_LS2CL_REP_SAVE_CHAR_NAME_FAIL{}) return err @@ -211,16 +209,20 @@ func SendFail(peer *protocol.CNPeer) error { return nil } -func (server *LoginServer) CharacterCreate(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *LoginServer) CharacterCreate(peer *protocol.CNPeer, account interface{}, pkt protocol.Packet) error { var charPkt protocol.SP_CL2LS_REQ_CHAR_CREATE pkt.Decode(&charPkt) + if account == nil { + return SendFail(peer) + } + if !validateCharacterCreation(&charPkt) { log.Printf("Invalid character creation packet: %+v", charPkt) return SendFail(peer) } - if err := server.dbHndlr.FinishPlayer(&charPkt, peer.AccountID); err != nil { + if err := server.dbHndlr.FinishPlayer(&charPkt, account.(*db.Account).AccountID); err != nil { log.Printf("Error finishing player: %v", err) return SendFail(peer) } @@ -239,11 +241,15 @@ func (server *LoginServer) CharacterCreate(peer *protocol.CNPeer, pkt protocol.P }) } -func (server *LoginServer) CharacterDelete(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *LoginServer) CharacterDelete(peer *protocol.CNPeer, account interface{}, pkt protocol.Packet) error { var charPkt protocol.SP_CL2LS_REQ_CHAR_DELETE pkt.Decode(&charPkt) - slot, err := server.dbHndlr.DeletePlayer(int(charPkt.IPC_UID), peer.AccountID) + if account == nil { + return SendFail(peer) + } + + slot, err := server.dbHndlr.DeletePlayer(int(charPkt.IPC_UID), account.(*db.Account).AccountID) if err != nil { return SendFail(peer) } @@ -253,10 +259,14 @@ func (server *LoginServer) CharacterDelete(peer *protocol.CNPeer, pkt protocol.P }) } -func (server *LoginServer) ShardSelect(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *LoginServer) ShardSelect(peer *protocol.CNPeer, account interface{}, pkt protocol.Packet) error { var selection protocol.SP_CL2LS_REQ_CHAR_SELECT pkt.Decode(&selection) + if account == nil { + return SendFail(peer) + } + shards := server.redisHndlr.GetShards() if len(shards) == 0 { SendFail(peer) @@ -278,9 +288,10 @@ func (server *LoginServer) ShardSelect(peer *protocol.CNPeer, pkt protocol.Packe log.Printf("Error getting player: %v", err) return SendFail(peer) } + accountID := account.(*db.Account).AccountID - if plr.AccountID != peer.AccountID { - log.Printf("HACK: player %d tried to join shard as player %d", peer.AccountID, plr.AccountID) + if plr.AccountID != accountID { + log.Printf("HACK: player %d tried to join shard as player %d", accountID, plr.AccountID) return SendFail(peer) } @@ -288,7 +299,7 @@ func (server *LoginServer) ShardSelect(peer *protocol.CNPeer, pkt protocol.Packe server.redisHndlr.QueueLogin(key, redis.LoginMetadata{ FEKey: peer.FE_key, PlayerID: int32(selection.IPC_UID), - AccountID: peer.AccountID, + AccountID: accountID, }) // craft response @@ -303,11 +314,15 @@ func (server *LoginServer) ShardSelect(peer *protocol.CNPeer, pkt protocol.Packe return peer.Send(protocol.P_LS2CL_REP_SHARD_SELECT_SUCC, resp) } -func (server *LoginServer) FinishTutorial(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *LoginServer) FinishTutorial(peer *protocol.CNPeer, account interface{}, pkt protocol.Packet) error { var charPkt protocol.SP_CL2LS_REQ_SAVE_CHAR_TUTOR pkt.Decode(&charPkt) - if err := server.dbHndlr.FinishTutorial(int(charPkt.IPC_UID), peer.AccountID); err != nil { + if account == nil { + return SendFail(peer) + } + + if err := server.dbHndlr.FinishTutorial(int(charPkt.IPC_UID), account.(*db.Account).AccountID); err != nil { return SendFail(peer) } diff --git a/login/loginserver.go b/login/loginserver.go index 4cd8321..5f6a374 100644 --- a/login/loginserver.go +++ b/login/loginserver.go @@ -1,126 +1,52 @@ package login import ( - "fmt" - "log" - "net" - "sync" - - "github.com/CPunch/gopenfusion/config" "github.com/CPunch/gopenfusion/internal/db" "github.com/CPunch/gopenfusion/internal/protocol" - "github.com/CPunch/gopenfusion/internal/protocol/pool" "github.com/CPunch/gopenfusion/internal/redis" + "github.com/CPunch/gopenfusion/internal/service" ) -type PacketHandler func(peer *protocol.CNPeer, pkt protocol.Packet) error - -func stubbedPacket(_ *protocol.CNPeer, _ protocol.Packet) error { /* stubbed */ return nil } - type LoginServer struct { - listener net.Listener - port int - dbHndlr *db.DBHandler - redisHndlr *redis.RedisHandler - eRecv chan *protocol.Event - peers map[*protocol.CNPeer]bool - packetHandlers map[uint32]PacketHandler - peerLock sync.Mutex + service *service.Service + dbHndlr *db.DBHandler + redisHndlr *redis.RedisHandler } func NewLoginServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port int) (*LoginServer, error) { - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + srvc, err := service.NewService("LOGIN", port) if err != nil { return nil, err } server := &LoginServer{ - listener: listener, - port: port, + service: srvc, dbHndlr: dbHndlr, redisHndlr: redisHndlr, - peers: make(map[*protocol.CNPeer]bool), - eRecv: make(chan *protocol.Event), } - server.packetHandlers = map[uint32]PacketHandler{ - protocol.P_CL2LS_REQ_LOGIN: server.Login, - protocol.P_CL2LS_REQ_CHECK_CHAR_NAME: server.CheckCharacterName, - protocol.P_CL2LS_REQ_SAVE_CHAR_NAME: server.SaveCharacterName, - protocol.P_CL2LS_REQ_CHAR_CREATE: server.CharacterCreate, - protocol.P_CL2LS_REQ_CHAR_SELECT: server.ShardSelect, - protocol.P_CL2LS_REQ_CHAR_DELETE: server.CharacterDelete, - protocol.P_CL2LS_REQ_SHARD_SELECT: stubbedPacket, - protocol.P_CL2LS_REQ_SHARD_LIST_INFO: stubbedPacket, - protocol.P_CL2LS_CHECK_NAME_LIST: stubbedPacket, - protocol.P_CL2LS_REQ_SAVE_CHAR_TUTOR: server.FinishTutorial, - protocol.P_CL2LS_REQ_PC_EXIT_DUPLICATE: stubbedPacket, - protocol.P_CL2LS_REP_LIVE_CHECK: stubbedPacket, - protocol.P_CL2LS_REQ_CHANGE_CHAR_NAME: stubbedPacket, - protocol.P_CL2LS_REQ_SERVER_SELECT: stubbedPacket, + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_LOGIN, server.Login) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_CHECK_CHAR_NAME, server.CheckCharacterName) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_SAVE_CHAR_NAME, server.SaveCharacterName) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_CHAR_CREATE, server.CharacterCreate) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_CHAR_SELECT, server.ShardSelect) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_CHAR_DELETE, server.CharacterDelete) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_SHARD_SELECT, service.StubbedPacket) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_SHARD_LIST_INFO, service.StubbedPacket) + srvc.AddPacketHandler(protocol.P_CL2LS_CHECK_NAME_LIST, service.StubbedPacket) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_SAVE_CHAR_TUTOR, server.FinishTutorial) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_PC_EXIT_DUPLICATE, service.StubbedPacket) + srvc.AddPacketHandler(protocol.P_CL2LS_REP_LIVE_CHECK, service.StubbedPacket) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_CHANGE_CHAR_NAME, service.StubbedPacket) + srvc.AddPacketHandler(protocol.P_CL2LS_REQ_SERVER_SELECT, service.StubbedPacket) + + srvc.OnConnect = func(peer *protocol.CNPeer) interface{} { + return nil } return server, nil } func (server *LoginServer) Start() { - log.Printf("Login service hosted on %s:%d\n", config.GetAnnounceIP(), server.port) - - go server.handleEvents() - for { - conn, err := server.listener.Accept() - if err != nil { - log.Println("Connection error: ", err) - return - } - - client := protocol.NewCNPeer(server.eRecv, conn) - server.connect(client) - go client.Handler() - } -} - -func (server *LoginServer) handleEvents() { - for event := range server.eRecv { - switch event.Type { - case protocol.EVENT_CLIENT_DISCONNECT: - server.disconnect(event.Peer) - case protocol.EVENT_CLIENT_PACKET: - if err := server.handlePacket(event.Peer, event.PktID, protocol.NewPacket(event.Pkt)); err != nil { - log.Printf("Error handling packet: %v", err) - event.Peer.Kill() - } - - // the packet is given to us by the event, so we'll need to make sure to return it to the pool - pool.Put(event.Pkt) - } - } -} - -func (server *LoginServer) handlePacket(peer *protocol.CNPeer, typeID uint32, pkt protocol.Packet) error { - if hndlr, ok := server.packetHandlers[typeID]; ok { - if err := hndlr(peer, pkt); err != nil { - return err - } - } else { - log.Printf("[WARN] unknown packet ID: %x\n", typeID) - } - - return nil -} - -func (server *LoginServer) disconnect(peer *protocol.CNPeer) { - server.peerLock.Lock() - defer server.peerLock.Unlock() - - log.Printf("Peer %p disconnected from LOGIN\n", peer) - delete(server.peers, peer) -} - -func (server *LoginServer) connect(peer *protocol.CNPeer) { - server.peerLock.Lock() - defer server.peerLock.Unlock() - - log.Printf("New peer %p connected to LOGIN\n", peer) - server.peers[peer] = true + server.service.Start() } diff --git a/shard/chat.go b/shard/chat.go index dc05b16..15fe52c 100644 --- a/shard/chat.go +++ b/shard/chat.go @@ -1,16 +1,20 @@ package shard -import "github.com/CPunch/gopenfusion/internal/protocol" +import ( + "fmt" -func (server *ShardServer) freeChat(peer *protocol.CNPeer, pkt protocol.Packet) error { + "github.com/CPunch/gopenfusion/internal/entity" + "github.com/CPunch/gopenfusion/internal/protocol" +) + +func (server *ShardServer) freeChat(peer *protocol.CNPeer, _plr interface{}, pkt protocol.Packet) error { var chat protocol.SP_CL2FE_REQ_SEND_FREECHAT_MESSAGE pkt.Decode(&chat) - // sanity check - plr, err := server.getPlayer(peer) - if err != nil { - return err + if _plr == nil { + return fmt.Errorf("freeChat: _plr is nil") } + plr := _plr.(*entity.Player) // spread message return server.sendAllPacket(plr, protocol.P_FE2CL_REP_SEND_FREECHAT_MESSAGE_SUCC, protocol.SP_FE2CL_REP_SEND_FREECHAT_MESSAGE_SUCC{ @@ -20,15 +24,14 @@ func (server *ShardServer) freeChat(peer *protocol.CNPeer, pkt protocol.Packet) }) } -func (server *ShardServer) menuChat(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *ShardServer) menuChat(peer *protocol.CNPeer, _plr interface{}, pkt protocol.Packet) error { var chat protocol.SP_CL2FE_REQ_SEND_MENUCHAT_MESSAGE pkt.Decode(&chat) - // sanity check - plr, err := server.getPlayer(peer) - if err != nil { - return err + if _plr == nil { + return fmt.Errorf("menuChat: _plr is nil") } + plr := _plr.(*entity.Player) // spread message return server.sendAllPacket(plr, protocol.P_FE2CL_REP_SEND_MENUCHAT_MESSAGE_SUCC, protocol.SP_FE2CL_REP_SEND_MENUCHAT_MESSAGE_SUCC{ @@ -38,15 +41,14 @@ func (server *ShardServer) menuChat(peer *protocol.CNPeer, pkt protocol.Packet) }) } -func (server *ShardServer) emoteChat(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *ShardServer) emoteChat(peer *protocol.CNPeer, _plr interface{}, pkt protocol.Packet) error { var chat protocol.SP_CL2FE_REQ_PC_AVATAR_EMOTES_CHAT pkt.Decode(&chat) - // sanity check - plr, err := server.getPlayer(peer) - if err != nil { - return err + if _plr == nil { + return fmt.Errorf("emoteChat: _plr is nil") } + plr := _plr.(*entity.Player) // spread message return server.sendAllPacket(plr, protocol.P_FE2CL_REP_PC_AVATAR_EMOTES_CHAT, protocol.SP_FE2CL_REP_PC_AVATAR_EMOTES_CHAT{ diff --git a/shard/join.go b/shard/join.go index b3f6e5e..75cf4f1 100644 --- a/shard/join.go +++ b/shard/join.go @@ -10,27 +10,29 @@ import ( ) func (server *ShardServer) attachPlayer(peer *protocol.CNPeer, meta redis.LoginMetadata) (*entity.Player, error) { - // resending a shard enter packet? - old, _ := server.getPlayer(peer) - if old != nil { - return nil, fmt.Errorf("resent enter packet") - } - - // attach player - plr, err := server.dbHndlr.GetPlayer(int(meta.PlayerID)) + dbPlr, err := server.dbHndlr.GetPlayer(int(meta.PlayerID)) if err != nil { return nil, err } - plr.Peer = peer + plr := entity.NewPlayer(peer, dbPlr) - server.setPlayer(peer, plr) + // once we create the player, it's memory address is owned by the + // server.Start() goroutine. the only functions allowed to access + // it are the packet handlers as no other goroutines will be + // concurrently accessing it. + server.service.SetPeerData(peer, plr) return plr, nil } -func (server *ShardServer) RequestEnter(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *ShardServer) RequestEnter(peer *protocol.CNPeer, _plr interface{}, pkt protocol.Packet) error { var enter protocol.SP_CL2FE_REQ_PC_ENTER pkt.Decode(&enter) + // resending a shard enter packet? + if _plr != nil { + return fmt.Errorf("resent enter packet") + } + loginData, err := server.redisHndlr.GetLogin(enter.IEnterSerialKey) if err != nil { // the error codes for P_FE2CL_REP_PC_ENTER_FAIL aren't referenced in the client :( @@ -52,29 +54,35 @@ func (server *ShardServer) RequestEnter(peer *protocol.CNPeer, pkt protocol.Pack // setup peer peer.E_key = protocol.CreateNewKey(resp.UiSvrTime, uint64(resp.IID+1), uint64(resp.PCLoadData2CL.IFusionMatter+1)) peer.FE_key = loginData.FEKey - peer.PlayerID = loginData.PlayerID - peer.AccountID = loginData.AccountID peer.SetActiveKey(protocol.USE_FE) log.Printf("Player %d (AccountID %d) entered\n", resp.IID, loginData.AccountID) - if err := peer.Send(protocol.P_FE2CL_REP_PC_ENTER_SUCC, resp); err != nil { return err } - // we send the chunk updates (PC_NEW, NPC_NEW, etc.) after the enter packet - server.updatePlayerPosition(plr, int(plr.X), int(plr.Y), int(plr.Z), int(plr.Angle)) return nil } -func (server *ShardServer) LoadingComplete(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *ShardServer) LoadingComplete(peer *protocol.CNPeer, _plr interface{}, pkt protocol.Packet) error { var loadComplete protocol.SP_CL2FE_REQ_PC_LOADING_COMPLETE pkt.Decode(&loadComplete) - plr, err := server.getPlayer(peer) + // was the peer attached to a player? + if _plr == nil { + return fmt.Errorf("loadingComplete: plr is nil") + } + plr := _plr.(*entity.Player) + + err := peer.Send(protocol.P_FE2CL_REP_PC_LOADING_COMPLETE_SUCC, protocol.SP_FE2CL_REP_PC_LOADING_COMPLETE_SUCC{IPC_ID: int32(plr.PlayerID)}) if err != nil { return err } - return peer.Send(protocol.P_FE2CL_REP_PC_LOADING_COMPLETE_SUCC, protocol.SP_FE2CL_REP_PC_LOADING_COMPLETE_SUCC{IPC_ID: int32(plr.PlayerID)}) + // we send the chunk updates (PC_NEW, NPC_NEW, etc.) after the enter packet + chunkPos := entity.MakeChunkPosition(plr.X, plr.Y) + viewableChunks := server.getViewableChunks(chunkPos) + plr.SetChunkPos(chunkPos) + server.addEntityToChunks(plr, viewableChunks) + return nil } diff --git a/shard/movement.go b/shard/movement.go index a395983..898df47 100644 --- a/shard/movement.go +++ b/shard/movement.go @@ -1,6 +1,8 @@ package shard import ( + "fmt" + "github.com/CPunch/gopenfusion/internal/entity" "github.com/CPunch/gopenfusion/internal/protocol" ) @@ -13,15 +15,14 @@ func (server *ShardServer) updatePlayerPosition(plr *entity.Player, X, Y, Z, Ang server.updateEntityChunk(plr, plr.GetChunkPos(), entity.MakeChunkPosition(X, Y)) } -func (server *ShardServer) playerMove(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *ShardServer) playerMove(peer *protocol.CNPeer, _plr interface{}, pkt protocol.Packet) error { var move protocol.SP_CL2FE_REQ_PC_MOVE pkt.Decode(&move) - // sanity check - plr, err := server.getPlayer(peer) - if err != nil { - return err + if _plr == nil { + return fmt.Errorf("playerMove: _plr is nil") } + plr := _plr.(*entity.Player) // update chunking server.updatePlayerPosition(plr, int(move.IX), int(move.IY), int(move.IZ), int(move.IAngle)) @@ -42,15 +43,14 @@ func (server *ShardServer) playerMove(peer *protocol.CNPeer, pkt protocol.Packet }) } -func (server *ShardServer) playerStop(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *ShardServer) playerStop(peer *protocol.CNPeer, _plr interface{}, pkt protocol.Packet) error { var stop protocol.SP_CL2FE_REQ_PC_STOP pkt.Decode(&stop) - // sanity check - plr, err := server.getPlayer(peer) - if err != nil { - return err + if _plr == nil { + return fmt.Errorf("playerStop: _plr is nil") } + plr := _plr.(*entity.Player) // update chunking server.updatePlayerPosition(plr, int(stop.IX), int(stop.IY), int(stop.IZ), plr.Angle) @@ -65,15 +65,14 @@ func (server *ShardServer) playerStop(peer *protocol.CNPeer, pkt protocol.Packet }) } -func (server *ShardServer) playerJump(peer *protocol.CNPeer, pkt protocol.Packet) error { +func (server *ShardServer) playerJump(peer *protocol.CNPeer, _plr interface{}, pkt protocol.Packet) error { var jump protocol.SP_CL2FE_REQ_PC_JUMP pkt.Decode(&jump) - // sanity check - plr, err := server.getPlayer(peer) - if err != nil { - return err + if _plr == nil { + return fmt.Errorf("playerJump: _plr is nil") } + plr := _plr.(*entity.Player) // update chunking server.updatePlayerPosition(plr, int(jump.IX), int(jump.IY), int(jump.IZ), plr.Angle) diff --git a/shard/shardserver.go b/shard/shardserver.go index 0a03ec7..6744881 100644 --- a/shard/shardserver.go +++ b/shard/shardserver.go @@ -1,60 +1,47 @@ package shard import ( - "fmt" - "log" - "net" - "sync" - "github.com/CPunch/gopenfusion/config" "github.com/CPunch/gopenfusion/internal/db" "github.com/CPunch/gopenfusion/internal/entity" "github.com/CPunch/gopenfusion/internal/protocol" - "github.com/CPunch/gopenfusion/internal/protocol/pool" "github.com/CPunch/gopenfusion/internal/redis" + "github.com/CPunch/gopenfusion/internal/service" ) type PacketHandler func(peer *protocol.CNPeer, pkt protocol.Packet) error type ShardServer struct { - listener net.Listener - port int - dbHndlr *db.DBHandler - redisHndlr *redis.RedisHandler - eRecv chan *protocol.Event - packetHandlers map[uint32]PacketHandler - peers map[*protocol.CNPeer]*entity.Player - chunks map[entity.ChunkPosition]*entity.Chunk - peerLock sync.Mutex + service *service.Service + dbHndlr *db.DBHandler + redisHndlr *redis.RedisHandler + chunks map[entity.ChunkPosition]*entity.Chunk } func NewShardServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port int) (*ShardServer, error) { - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + srvc, err := service.NewService("SHARD", port) if err != nil { return nil, err } server := &ShardServer{ - listener: listener, - port: port, - dbHndlr: dbHndlr, - redisHndlr: redisHndlr, - packetHandlers: make(map[uint32]PacketHandler), - peers: make(map[*protocol.CNPeer]*entity.Player), - chunks: make(map[entity.ChunkPosition]*entity.Chunk), - eRecv: make(chan *protocol.Event), + service: srvc, + dbHndlr: dbHndlr, + redisHndlr: redisHndlr, + chunks: make(map[entity.ChunkPosition]*entity.Chunk), } - server.packetHandlers = map[uint32]PacketHandler{ - protocol.P_CL2FE_REQ_PC_ENTER: server.RequestEnter, - protocol.P_CL2FE_REQ_PC_LOADING_COMPLETE: server.LoadingComplete, - protocol.P_CL2FE_REQ_PC_MOVE: server.playerMove, - protocol.P_CL2FE_REQ_PC_STOP: server.playerStop, - protocol.P_CL2FE_REQ_PC_JUMP: server.playerJump, - protocol.P_CL2FE_REQ_SEND_FREECHAT_MESSAGE: server.freeChat, - protocol.P_CL2FE_REQ_SEND_MENUCHAT_MESSAGE: server.menuChat, - protocol.P_CL2FE_REQ_PC_AVATAR_EMOTES_CHAT: server.emoteChat, - } + srvc.AddPacketHandler(protocol.P_CL2FE_REQ_PC_ENTER, server.RequestEnter) + srvc.AddPacketHandler(protocol.P_CL2FE_REQ_PC_LOADING_COMPLETE, server.LoadingComplete) + srvc.AddPacketHandler(protocol.P_CL2FE_REQ_PC_MOVE, server.playerMove) + srvc.AddPacketHandler(protocol.P_CL2FE_REQ_PC_STOP, server.playerStop) + srvc.AddPacketHandler(protocol.P_CL2FE_REQ_PC_JUMP, server.playerJump) + srvc.AddPacketHandler(protocol.P_CL2FE_REQ_SEND_FREECHAT_MESSAGE, server.freeChat) + srvc.AddPacketHandler(protocol.P_CL2FE_REQ_SEND_MENUCHAT_MESSAGE, server.menuChat) + srvc.AddPacketHandler(protocol.P_CL2FE_REQ_PC_AVATAR_EMOTES_CHAT, server.emoteChat) + + srvc.OnConnect = server.onConnect + srvc.OnDisconnect = server.onDisconnect redisHndlr.RegisterShard(redis.ShardMetadata{ IP: config.GetAnnounceIP(), @@ -64,97 +51,18 @@ func NewShardServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port return server, nil } -func (server *ShardServer) handleEvents() { - for event := range server.eRecv { - switch event.Type { - case protocol.EVENT_CLIENT_DISCONNECT: - server.disconnect(event.Peer) - case protocol.EVENT_CLIENT_PACKET: - if err := server.handlePacket(event.Peer, event.PktID, protocol.NewPacket(event.Pkt)); err != nil { - event.Peer.Kill() - } - - // the packet is given to us by the event, so we'll need to make sure to return it to the pool - pool.Put(event.Pkt) - } - } -} - func (server *ShardServer) Start() { server.LoadNPCs() + server.service.Start() +} - log.Printf("Shard service hosted on %s:%d\n", config.GetAnnounceIP(), server.port) - go server.handleEvents() - for { - conn, err := server.listener.Accept() - if err != nil { - log.Println("Connection error: ", err) - return - } - - client := protocol.NewCNPeer(server.eRecv, conn) - server.connect(client) - go client.Handler() +func (server *ShardServer) onDisconnect(peer *protocol.CNPeer, _plr interface{}) { + // remove from chunks + if _plr != nil { + server.removeEntity(_plr.(*entity.Player)) } } -func (server *ShardServer) GetPort() int { - return server.port -} - -func (server *ShardServer) handlePacket(peer *protocol.CNPeer, typeID uint32, pkt protocol.Packet) error { - if hndlr, ok := server.packetHandlers[typeID]; ok { - if err := hndlr(peer, pkt); err != nil { - return err - } - } else { - log.Printf("[WARN] unknown packet ID: %x\n", typeID) - } - +func (server *ShardServer) onConnect(peer *protocol.CNPeer) interface{} { return nil } - -func (server *ShardServer) disconnect(peer *protocol.CNPeer) { - server.peerLock.Lock() - defer server.peerLock.Unlock() - - // remove from chunk(s) - plr, ok := server.peers[peer] - if ok { - log.Printf("Player %d (AccountID %d) disconnected\n", plr.PlayerID, plr.AccountID) - server.removeEntity(plr) - } - - log.Printf("Peer %p disconnected from SHARD\n", peer) - delete(server.peers, peer) -} - -func (server *ShardServer) connect(peer *protocol.CNPeer) { - server.peerLock.Lock() - defer server.peerLock.Unlock() - - log.Printf("New peer %p connected to SHARD\n", peer) - server.peers[peer] = nil -} - -func (server *ShardServer) getPlayer(peer *protocol.CNPeer) (*entity.Player, error) { - plr, ok := server.peers[peer] - if !ok { - return nil, fmt.Errorf("player not found") - } - - return plr, nil -} - -func (server *ShardServer) setPlayer(peer *protocol.CNPeer, plr *entity.Player) { - server.peers[peer] = plr -} - -// If f returns false the iteration is stopped. -func (server *ShardServer) rangePeers(f func(peer *protocol.CNPeer, plr *entity.Player) bool) { - for peer, plr := range server.peers { - if f(peer, plr) { - return - } - } -}