Compare commits

...

4 Commits

Author SHA1 Message Date
8f00a0c492 movement.go: added support for basic movement packets
you should be able to view other players and jump around together,
although while testing locally one of the clients would always trigger
the "Some irregularities have been found with your connection to the
server, so your game will be closed" speed check for some reason ???

really not sure, might just be my machine

chunking uhhh works ? kind of, not tested for more than a few seconds
before one of the clients disconnects
2023-06-25 04:27:42 -05:00
f6ab7a9b5d started chunking 2023-06-25 03:33:17 -05:00
f0b9bc6ed6 god forgive me for this commit 2023-06-25 01:51:21 -05:00
dfc00bcb52 shardServer: simplified peer map logic 2023-06-24 22:36:04 -05:00
14 changed files with 566 additions and 124 deletions

View File

@ -33,6 +33,7 @@ var (
SHARD_PORT = 23001 SHARD_PORT = 23001
LOGIN_TIMEOUT = time.Second * 30 LOGIN_TIMEOUT = time.Second * 30
VIEW_DISTANCE = 25600
) )
func getEnv(key string, fallback string) string { func getEnv(key string, fallback string) string {

View File

@ -4,7 +4,7 @@ import (
"database/sql" "database/sql"
"github.com/CPunch/gopenfusion/config" "github.com/CPunch/gopenfusion/config"
"github.com/CPunch/gopenfusion/core" "github.com/CPunch/gopenfusion/core/entity"
"github.com/CPunch/gopenfusion/core/protocol" "github.com/CPunch/gopenfusion/core/protocol"
"github.com/blockloop/scan" "github.com/blockloop/scan"
) )
@ -121,8 +121,8 @@ const (
INNER JOIN Accounts as acc ON p.AccountID = acc.AccountID ` INNER JOIN Accounts as acc ON p.AccountID = acc.AccountID `
) )
func (db *DBHandler) readPlayer(rows *sql.Rows) (*core.Player, error) { func (db *DBHandler) readPlayer(rows *sql.Rows) (*entity.Player, error) {
plr := core.Player{ActiveNanoSlotNum: -1} plr := entity.Player{ActiveNanoSlotNum: 0}
if err := rows.Scan( if err := rows.Scan(
&plr.PlayerID, &plr.AccountID, &plr.Slot, &plr.PCStyle.SzFirstName, &plr.PCStyle.SzLastName, &plr.PlayerID, &plr.AccountID, &plr.Slot, &plr.PCStyle.SzFirstName, &plr.PCStyle.SzLastName,
@ -162,13 +162,13 @@ func (db *DBHandler) readPlayer(rows *sql.Rows) (*core.Player, error) {
return &plr, nil return &plr, nil
} }
func (db *DBHandler) GetPlayer(PlayerID int) (*core.Player, error) { func (db *DBHandler) GetPlayer(PlayerID int) (*entity.Player, error) {
rows, err := db.Query(QUERY_PLAYERS+"WHERE p.PlayerID = $1", PlayerID) rows, err := db.Query(QUERY_PLAYERS+"WHERE p.PlayerID = $1", PlayerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var plr *core.Player var plr *entity.Player
for rows.Next() { for rows.Next() {
plr, err = db.readPlayer(rows) plr, err = db.readPlayer(rows)
if err != nil { if err != nil {
@ -179,13 +179,13 @@ func (db *DBHandler) GetPlayer(PlayerID int) (*core.Player, error) {
return plr, nil return plr, nil
} }
func (db *DBHandler) GetPlayers(AccountID int) ([]core.Player, error) { func (db *DBHandler) GetPlayers(AccountID int) ([]entity.Player, error) {
rows, err := db.Query(QUERY_PLAYERS+"WHERE p.AccountID = $1", AccountID) rows, err := db.Query(QUERY_PLAYERS+"WHERE p.AccountID = $1", AccountID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var plrs []core.Player var plrs []entity.Player
for rows.Next() { for rows.Next() {
plr, err := db.readPlayer(rows) plr, err := db.readPlayer(rows)
if err != nil { if err != nil {

86
core/entity/chunk.go Normal file
View File

@ -0,0 +1,86 @@
package entity
import (
"log"
"sync"
)
type Chunk struct {
Position ChunkPosition
Entities map[Entity]struct{}
lock sync.Mutex
}
func NewChunk(position ChunkPosition) *Chunk {
return &Chunk{
Position: position,
Entities: make(map[Entity]struct{}),
}
}
func (c *Chunk) AddEntity(entity Entity) {
c.Entities[entity] = struct{}{}
}
func (c *Chunk) RemoveEntity(entity Entity) {
delete(c.Entities, entity)
}
// send packet to all peers in this chunk and kill each peer if error
func (c *Chunk) SendPacket(typeID uint32, pkt ...interface{}) {
c.SendPacketExclude(nil, typeID, pkt...)
}
func (c *Chunk) SendPacketExclude(exclude Entity, typeID uint32, pkt ...interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
for entity := range c.Entities {
// only send to players, and exclude the player that sent the packet
if entity.GetKind() != ENTITY_KIND_PLAYER || entity == exclude {
continue
}
plr, ok := entity.(*Player)
if !ok {
log.Panic("Chunk.SendPacket: entity kind was player, but is not a *Player")
}
peer := plr.Peer
if err := peer.Send(typeID, pkt...); err != nil {
log.Printf("Error sending packet to peer %p: %v", peer, err)
peer.Kill()
}
}
}
func (c *Chunk) GetAdjacentPositions() []ChunkPosition {
return []ChunkPosition{
{c.Position.X - 1, c.Position.Y - 1},
{c.Position.X - 1, c.Position.Y},
{c.Position.X - 1, c.Position.Y + 1},
{c.Position.X, c.Position.Y - 1},
{c.Position.X, c.Position.Y},
{c.Position.X, c.Position.Y + 1},
{c.Position.X + 1, c.Position.Y - 1},
{c.Position.X + 1, c.Position.Y},
{c.Position.X + 1, c.Position.Y + 1},
}
}
// https://stackoverflow.com/a/45428032 lol
func ChunkSliceDifference(a, b []*Chunk) []*Chunk {
m := make(map[*Chunk]struct{})
for _, item := range b {
m[item] = struct{}{}
}
var diff []*Chunk
for _, item := range a {
if _, ok := m[item]; !ok {
diff = append(diff, item)
}
}
return diff
}

View File

@ -0,0 +1,15 @@
package entity
import "github.com/CPunch/gopenfusion/config"
type ChunkPosition struct {
X int
Y int
}
func MakeChunkPosition(x, y int) ChunkPosition {
return ChunkPosition{
X: x / (config.VIEW_DISTANCE / 3),
Y: y / (config.VIEW_DISTANCE / 3),
}
}

25
core/entity/entity.go Normal file
View File

@ -0,0 +1,25 @@
package entity
import "github.com/CPunch/gopenfusion/core/protocol"
type EntityKind int
const (
ENTITY_KIND_PLAYER EntityKind = iota
ENTITY_KIND_NPC
)
type Entity interface {
GetKind() EntityKind
GetChunk() ChunkPosition
GetPosition() (x int, y int, z int)
GetAngle() int
SetChunk(chunk ChunkPosition)
SetPosition(x, y, z int)
SetAngle(angle int)
DisappearFromViewOf(peer *protocol.CNPeer)
EnterIntoViewOf(peer *protocol.CNPeer)
}

View File

@ -1,4 +1,4 @@
package core package entity
import ( import (
"github.com/CPunch/gopenfusion/config" "github.com/CPunch/gopenfusion/config"
@ -6,6 +6,8 @@ import (
) )
type Player struct { type Player struct {
Peer *protocol.CNPeer
Chunk ChunkPosition
PlayerID int PlayerID int
AccountID int AccountID int
AccountLevel int AccountLevel int
@ -33,6 +35,51 @@ type Player struct {
ActiveNanoSlotNum int ActiveNanoSlotNum int
Fatigue int Fatigue int
CurrentMissionID int CurrentMissionID int
IPCState int8
}
// ==================== Entity interface ====================
func (plr *Player) GetKind() EntityKind {
return ENTITY_KIND_PLAYER
}
func (plr *Player) GetChunk() ChunkPosition {
return plr.Chunk
}
func (plr *Player) GetPosition() (x int, y int, z int) {
return plr.X, plr.Y, plr.Z
}
func (plr *Player) GetAngle() int {
return plr.Angle
}
func (plr *Player) SetChunk(chunk ChunkPosition) {
plr.Chunk = chunk
}
func (plr *Player) SetPosition(x, y, z int) {
plr.X = x
plr.Y = y
plr.Z = z
}
func (plr *Player) SetAngle(angle int) {
plr.Angle = angle
}
func (plr *Player) DisappearFromViewOf(peer *protocol.CNPeer) {
peer.Send(protocol.P_FE2CL_PC_EXIT, protocol.SP_FE2CL_PC_EXIT{
IID: int32(plr.PlayerID),
})
}
func (plr *Player) EnterIntoViewOf(peer *protocol.CNPeer) {
peer.Send(protocol.P_FE2CL_PC_NEW, protocol.SP_FE2CL_PC_NEW{
PCAppearanceData: plr.GetAppearanceData(),
})
} }
func (plr *Player) ToPCLoadData2CL() protocol.SPCLoadData2CL { func (plr *Player) ToPCLoadData2CL() protocol.SPCLoadData2CL {
@ -62,3 +109,19 @@ func (plr *Player) ToPCLoadData2CL() protocol.SPCLoadData2CL {
IFatigue: 50, IFatigue: 50,
} }
} }
func (plr *Player) GetAppearanceData() protocol.SPCAppearanceData {
return protocol.SPCAppearanceData{
IID: int32(plr.PlayerID),
IHP: int32(plr.HP),
ILv: int16(plr.Level),
IX: int32(plr.X),
IY: int32(plr.Y),
IZ: int32(plr.Z),
IAngle: int32(plr.Angle),
PCStyle: plr.PCStyle,
IPCState: plr.IPCState,
ItemEquip: plr.Equip,
Nano: plr.Nanos[plr.ActiveNanoSlotNum],
}
}

View File

@ -15,27 +15,23 @@ const (
USE_FE USE_FE
) )
type PeerHandler interface {
HandlePacket(peer *CNPeer, typeID uint32, pkt Packet) error
Disconnect(peer *CNPeer)
}
// CNPeer is a simple wrapper for net.Conn connections to send/recv packets over the Fusionfall packet protocol. // CNPeer is a simple wrapper for net.Conn connections to send/recv packets over the Fusionfall packet protocol.
type CNPeer struct { type CNPeer struct {
conn net.Conn conn net.Conn
handler PeerHandler eRecv chan *Event
SzID string SzID string
E_key []byte E_key []byte
FE_key []byte FE_key []byte
AccountID int AccountID int
PlayerID int32
whichKey int whichKey int
alive bool alive bool
} }
func NewCNPeer(handler PeerHandler, conn net.Conn) *CNPeer { func NewCNPeer(eRecv chan *Event, conn net.Conn) *CNPeer {
return &CNPeer{ return &CNPeer{
conn: conn, conn: conn,
handler: handler, eRecv: eRecv,
SzID: "", SzID: "",
E_key: []byte(DEFAULT_KEY), E_key: []byte(DEFAULT_KEY),
FE_key: nil, FE_key: nil,
@ -92,13 +88,14 @@ func (peer *CNPeer) SetActiveKey(whichKey int) {
} }
func (peer *CNPeer) Kill() { func (peer *CNPeer) Kill() {
log.Printf("Killing peer %p", peer)
if !peer.alive { if !peer.alive {
return return
} }
peer.alive = false peer.alive = false
peer.conn.Close() peer.conn.Close()
peer.handler.Disconnect(peer) peer.eRecv <- &Event{Type: EVENT_CLIENT_DISCONNECT, Peer: peer}
} }
// meant to be invoked as a goroutine // meant to be invoked as a goroutine
@ -122,7 +119,6 @@ func (peer *CNPeer) Handler() {
// grab buffer && read packet body // grab buffer && read packet body
if err := func() error { if err := func() error {
buf := pool.Get() buf := pool.Get()
defer pool.Put(buf)
if _, err := buf.ReadFrom(io.LimitReader(peer.conn, int64(sz))); err != nil { if _, err := buf.ReadFrom(io.LimitReader(peer.conn, int64(sz))); err != nil {
return fmt.Errorf("failed to read packet body! %v", err) return fmt.Errorf("failed to read packet body! %v", err)
} }
@ -131,18 +127,15 @@ func (peer *CNPeer) Handler() {
DecryptData(buf.Bytes(), peer.E_key) DecryptData(buf.Bytes(), peer.E_key)
pkt := NewPacket(buf) pkt := NewPacket(buf)
// create packet && read typeID // create packet && read pktID
var typeID uint32 var pktID uint32
if err := pkt.Decode(&typeID); err != nil { if err := pkt.Decode(&pktID); err != nil {
return fmt.Errorf("failed to read packet type! %v", err) return fmt.Errorf("failed to read packet type! %v", err)
} }
// dispatch packet // dispatch packet
log.Printf("Got packet ID: %x, with a sizeof: %d\n", typeID, sz) log.Printf("Got packet ID: %x, with a sizeof: %d\n", pktID, sz)
if err := peer.handler.HandlePacket(peer, typeID, pkt); err != nil { peer.eRecv <- &Event{Type: EVENT_CLIENT_PACKET, Peer: peer, Pkt: buf, PktID: pktID}
return err
}
return nil return nil
}(); err != nil { }(); err != nil {
log.Printf("[FATAL] %v", err) log.Printf("[FATAL] %v", err)

15
core/protocol/event.go Normal file
View File

@ -0,0 +1,15 @@
package protocol
import "bytes"
const (
EVENT_CLIENT_DISCONNECT = iota
EVENT_CLIENT_PACKET
)
type Event struct {
Type int
Peer *CNPeer
Pkt *bytes.Buffer
PktID uint32
}

View File

@ -217,6 +217,7 @@ func (server *LoginServer) CharacterCreate(peer *protocol.CNPeer, pkt protocol.P
pkt.Decode(&charPkt) pkt.Decode(&charPkt)
if !validateCharacterCreation(&charPkt) { if !validateCharacterCreation(&charPkt) {
log.Printf("Invalid character creation packet: %+v", charPkt)
return SendFail(peer) return SendFail(peer)
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/CPunch/gopenfusion/config" "github.com/CPunch/gopenfusion/config"
"github.com/CPunch/gopenfusion/core/db" "github.com/CPunch/gopenfusion/core/db"
"github.com/CPunch/gopenfusion/core/protocol" "github.com/CPunch/gopenfusion/core/protocol"
"github.com/CPunch/gopenfusion/core/protocol/pool"
"github.com/CPunch/gopenfusion/core/redis" "github.com/CPunch/gopenfusion/core/redis"
) )
@ -21,9 +22,10 @@ type LoginServer struct {
port int port int
dbHndlr *db.DBHandler dbHndlr *db.DBHandler
redisHndlr *redis.RedisHandler redisHndlr *redis.RedisHandler
packetHandlers map[uint32]PacketHandler eRecv chan *protocol.Event
peers map[*protocol.CNPeer]bool peers map[*protocol.CNPeer]bool
peersLock sync.Mutex packetHandlers map[uint32]PacketHandler
peerLock sync.Mutex
} }
func NewLoginServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port int) (*LoginServer, error) { func NewLoginServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port int) (*LoginServer, error) {
@ -38,6 +40,7 @@ func NewLoginServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port
dbHndlr: dbHndlr, dbHndlr: dbHndlr,
redisHndlr: redisHndlr, redisHndlr: redisHndlr,
peers: make(map[*protocol.CNPeer]bool), peers: make(map[*protocol.CNPeer]bool),
eRecv: make(chan *protocol.Event),
} }
server.packetHandlers = map[uint32]PacketHandler{ server.packetHandlers = map[uint32]PacketHandler{
@ -63,6 +66,7 @@ func NewLoginServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port
func (server *LoginServer) Start() { func (server *LoginServer) Start() {
log.Printf("Login service hosted on %s:%d\n", config.GetAnnounceIP(), server.port) log.Printf("Login service hosted on %s:%d\n", config.GetAnnounceIP(), server.port)
go server.handleEvents()
for { for {
conn, err := server.listener.Accept() conn, err := server.listener.Accept()
if err != nil { if err != nil {
@ -70,13 +74,32 @@ func (server *LoginServer) Start() {
return return
} }
client := protocol.NewCNPeer(server, conn) client := protocol.NewCNPeer(server.eRecv, conn)
server.Connect(client) server.connect(client)
go client.Handler() go client.Handler()
} }
} }
func (server *LoginServer) HandlePacket(peer *protocol.CNPeer, typeID uint32, pkt protocol.Packet) error { func (server *LoginServer) handleEvents() {
for {
select {
case event := <-server.eRecv:
switch event.Type {
case protocol.EVENT_CLIENT_DISCONNECT:
server.disconnect(event.Peer)
case protocol.EVENT_CLIENT_PACKET:
defer pool.Put(event.Pkt)
if err := server.handlePacket(event.Peer, event.PktID, protocol.NewPacket(event.Pkt)); err != nil {
log.Printf("Error handling packet: %v", err)
event.Peer.Kill()
}
}
}
}
}
func (server *LoginServer) handlePacket(peer *protocol.CNPeer, typeID uint32, pkt protocol.Packet) error {
if hndlr, ok := server.packetHandlers[typeID]; ok { if hndlr, ok := server.packetHandlers[typeID]; ok {
if err := hndlr(peer, pkt); err != nil { if err := hndlr(peer, pkt); err != nil {
return err return err
@ -88,16 +111,18 @@ func (server *LoginServer) HandlePacket(peer *protocol.CNPeer, typeID uint32, pk
return nil return nil
} }
func (server *LoginServer) Disconnect(peer *protocol.CNPeer) { func (server *LoginServer) disconnect(peer *protocol.CNPeer) {
server.peersLock.Lock() server.peerLock.Lock()
defer server.peerLock.Unlock()
log.Printf("Peer %p disconnected from LOGIN\n", peer) log.Printf("Peer %p disconnected from LOGIN\n", peer)
delete(server.peers, peer) delete(server.peers, peer)
server.peersLock.Unlock()
} }
func (server *LoginServer) Connect(peer *protocol.CNPeer) { func (server *LoginServer) connect(peer *protocol.CNPeer) {
server.peersLock.Lock() server.peerLock.Lock()
defer server.peerLock.Unlock()
log.Printf("New peer %p connected to LOGIN\n", peer) log.Printf("New peer %p connected to LOGIN\n", peer)
server.peers[peer] = true server.peers[peer] = true
server.peersLock.Unlock()
} }

99
shard/chunks.go Normal file
View File

@ -0,0 +1,99 @@
package shard
import (
"github.com/CPunch/gopenfusion/core/entity"
)
func (server *ShardServer) getChunk(pos entity.ChunkPosition) *entity.Chunk {
chunk, ok := server.chunks[pos]
if !ok {
chunk = entity.NewChunk(pos)
server.chunks[pos] = chunk
}
return chunk
}
func (server *ShardServer) getViewableChunks(pos entity.ChunkPosition) []*entity.Chunk {
chunks := make([]*entity.Chunk, 0, 9)
for _, pos := range server.getChunk(pos).GetAdjacentPositions() {
chunks = append(chunks, server.getChunk(pos))
}
return chunks
}
// sends a packet to all peers in the given chunks, excluding the given peer
func (server *ShardServer) sendOthersPacket(plr *entity.Player, typeID uint32, pkt ...interface{}) error {
chunks := server.getViewableChunks(plr.Chunk)
for _, chunk := range chunks {
chunk.SendPacketExclude(plr, typeID, pkt...)
}
return nil
}
func (server *ShardServer) removeEntityFromChunks(chunks []*entity.Chunk, this entity.Entity) {
for _, chunk := range chunks {
for e, _ := range chunk.Entities {
if e == this {
continue
}
// notify other players we're leaving
if e.GetKind() == entity.ENTITY_KIND_PLAYER {
otherPlr := e.(*entity.Player)
this.DisappearFromViewOf(otherPlr.Peer)
}
// notify us they're leaving
if this.GetKind() == entity.ENTITY_KIND_PLAYER {
thisPlr := this.(*entity.Player)
e.DisappearFromViewOf(thisPlr.Peer)
}
}
}
}
func (server *ShardServer) addEntityToChunks(chunks []*entity.Chunk, this entity.Entity) {
for _, chunk := range chunks {
for e, _ := range chunk.Entities {
if e == this {
continue
}
// notify other players we're entering
if e.GetKind() == entity.ENTITY_KIND_PLAYER {
otherPlr := e.(*entity.Player)
this.EnterIntoViewOf(otherPlr.Peer)
}
// notify us they're entering
if this.GetKind() == entity.ENTITY_KIND_PLAYER {
thisPlr := this.(*entity.Player)
e.EnterIntoViewOf(thisPlr.Peer)
}
}
}
}
func (server *ShardServer) updateEntityChunk(e entity.Entity, from entity.ChunkPosition, to entity.ChunkPosition) {
// no change needed
if from == to {
return
}
oldViewables := server.getViewableChunks(from)
newViewables := server.getViewableChunks(to)
// compute differences
toExit := entity.ChunkSliceDifference(oldViewables, newViewables)
toEnter := entity.ChunkSliceDifference(newViewables, oldViewables)
// update chunks
server.removeEntityFromChunks(toExit, e)
server.addEntityToChunks(toEnter, e)
server.getChunk(from).RemoveEntity(e)
server.getChunk(to).AddEntity(e)
e.SetChunk(to)
}

View File

@ -5,10 +5,29 @@ import (
"log" "log"
"time" "time"
"github.com/CPunch/gopenfusion/core" "github.com/CPunch/gopenfusion/core/entity"
"github.com/CPunch/gopenfusion/core/protocol" "github.com/CPunch/gopenfusion/core/protocol"
"github.com/CPunch/gopenfusion/core/redis"
) )
func (server *ShardServer) attachPlayer(peer *protocol.CNPeer, meta redis.LoginMetadata) (*entity.Player, error) {
// resending a shard enter packet?
old, err := server.getPlayer(peer)
if old != nil {
return nil, fmt.Errorf("resent enter packet!")
}
// attach player
plr, err := server.dbHndlr.GetPlayer(int(meta.PlayerID))
if err != nil {
return nil, err
}
plr.Peer = peer
server.setPlayer(peer, plr)
return plr, nil
}
func (server *ShardServer) RequestEnter(peer *protocol.CNPeer, pkt protocol.Packet) error { func (server *ShardServer) RequestEnter(peer *protocol.CNPeer, pkt protocol.Packet) error {
var enter protocol.SP_CL2FE_REQ_PC_ENTER var enter protocol.SP_CL2FE_REQ_PC_ENTER
pkt.Decode(&enter) pkt.Decode(&enter)
@ -20,36 +39,27 @@ func (server *ShardServer) RequestEnter(peer *protocol.CNPeer, pkt protocol.Pack
return err return err
} }
// attach player plr, err := server.attachPlayer(peer, loginData)
var resp *protocol.SP_FE2CL_REP_PC_ENTER_SUCC if err != nil {
if err := server.UpdatePlayer(peer, func(old *core.Player) (*core.Player, error) {
if old != nil { // resending a shard enter packet?
return nil, fmt.Errorf("resent enter packet!")
}
plr, err := server.dbHndlr.GetPlayer(int(loginData.PlayerID))
if err != nil {
return nil, err
}
resp = &protocol.SP_FE2CL_REP_PC_ENTER_SUCC{
IID: int32(plr.PlayerID),
PCLoadData2CL: plr.ToPCLoadData2CL(),
UiSvrTime: uint64(time.Now().Unix()),
}
return plr, nil
}); err != nil {
peer.Send(protocol.P_FE2CL_REP_PC_ENTER_FAIL, protocol.SP_FE2CL_REP_PC_ENTER_FAIL{})
return err return err
} }
// setup key resp := &protocol.SP_FE2CL_REP_PC_ENTER_SUCC{
IID: int32(plr.PlayerID),
PCLoadData2CL: plr.ToPCLoadData2CL(),
UiSvrTime: uint64(time.Now().Unix()),
}
// setup peer
peer.E_key = protocol.CreateNewKey(resp.UiSvrTime, uint64(resp.IID+1), uint64(resp.PCLoadData2CL.IFusionMatter+1)) peer.E_key = protocol.CreateNewKey(resp.UiSvrTime, uint64(resp.IID+1), uint64(resp.PCLoadData2CL.IFusionMatter+1))
peer.FE_key = loginData.FEKey peer.FE_key = loginData.FEKey
peer.PlayerID = loginData.PlayerID
peer.AccountID = loginData.AccountID
peer.SetActiveKey(protocol.USE_FE) peer.SetActiveKey(protocol.USE_FE)
log.Printf("Player %d (AccountID %d) entered\n", resp.IID, loginData.AccountID) log.Printf("Player %d (AccountID %d) entered\n", resp.IID, loginData.AccountID)
server.updatePlayerPosition(plr, int(plr.X), int(plr.Y), int(plr.Z), int(plr.Angle))
return peer.Send(protocol.P_FE2CL_REP_PC_ENTER_SUCC, resp) return peer.Send(protocol.P_FE2CL_REP_PC_ENTER_SUCC, resp)
} }
@ -57,9 +67,9 @@ func (server *ShardServer) LoadingComplete(peer *protocol.CNPeer, pkt protocol.P
var loadComplete protocol.SP_CL2FE_REQ_PC_LOADING_COMPLETE var loadComplete protocol.SP_CL2FE_REQ_PC_LOADING_COMPLETE
pkt.Decode(&loadComplete) pkt.Decode(&loadComplete)
plr := server.LoadPlayer(peer) plr, err := server.getPlayer(peer)
if plr == nil { if err != nil {
return fmt.Errorf("peer has no player attached!") return err
} }
return peer.Send(protocol.P_FE2CL_REP_PC_LOADING_COMPLETE_SUCC, protocol.SP_FE2CL_REP_PC_LOADING_COMPLETE_SUCC{IPC_ID: int32(plr.PlayerID)}) return peer.Send(protocol.P_FE2CL_REP_PC_LOADING_COMPLETE_SUCC, protocol.SP_FE2CL_REP_PC_LOADING_COMPLETE_SUCC{IPC_ID: int32(plr.PlayerID)})

View File

@ -1 +1,104 @@
package shard package shard
import (
"time"
"github.com/CPunch/gopenfusion/core/entity"
"github.com/CPunch/gopenfusion/core/protocol"
)
func (server *ShardServer) updatePlayerPosition(plr *entity.Player, X, Y, Z, Angle int) error {
plr.X = X
plr.Y = Y
plr.Z = Z
plr.Angle = Angle
server.updateEntityChunk(plr, plr.GetChunk(), entity.MakeChunkPosition(X, Y))
return nil
}
func (server *ShardServer) playerMove(peer *protocol.CNPeer, pkt protocol.Packet) error {
var move protocol.SP_CL2FE_REQ_PC_MOVE
pkt.Decode(&move)
// sanity check
plr, err := server.getPlayer(peer)
if err != nil {
return err
}
// update chunking
if err := server.updatePlayerPosition(plr, int(move.IX), int(move.IY), int(move.IZ), int(move.IAngle)); err != nil {
return err
}
return server.sendOthersPacket(plr, protocol.P_FE2CL_PC_MOVE, protocol.SP_FE2CL_PC_MOVE{
ICliTime: uint64(time.Now().Unix()),
IX: move.IX,
IY: move.IY,
IZ: move.IZ,
FVX: move.FVX,
FVY: move.FVY,
FVZ: move.FVZ,
IAngle: move.IAngle,
CKeyValue: move.CKeyValue,
ISpeed: move.ISpeed,
IID: int32(plr.PlayerID),
ISvrTime: uint64(time.Now().Unix()),
})
}
func (server *ShardServer) playerStop(peer *protocol.CNPeer, pkt protocol.Packet) error {
var stop protocol.SP_CL2FE_REQ_PC_STOP
pkt.Decode(&stop)
// sanity check
plr, err := server.getPlayer(peer)
if err != nil {
return err
}
// update chunking
if err := server.updatePlayerPosition(plr, int(stop.IX), int(stop.IY), int(stop.IZ), plr.Angle); err != nil {
return err
}
return server.sendOthersPacket(plr, protocol.P_FE2CL_PC_STOP, protocol.SP_FE2CL_PC_STOP{
ICliTime: uint64(time.Now().Unix()),
IX: stop.IX,
IY: stop.IY,
IZ: stop.IZ,
IID: int32(plr.PlayerID),
ISvrTime: uint64(time.Now().Unix()),
})
}
func (server *ShardServer) playerJump(peer *protocol.CNPeer, pkt protocol.Packet) error {
var jump protocol.SP_CL2FE_REQ_PC_JUMP
pkt.Decode(&jump)
// sanity check
plr, err := server.getPlayer(peer)
if err != nil {
return err
}
// update chunking
if err := server.updatePlayerPosition(plr, int(jump.IX), int(jump.IY), int(jump.IZ), plr.Angle); err != nil {
return err
}
return server.sendOthersPacket(plr, protocol.P_FE2CL_PC_JUMP, protocol.SP_FE2CL_PC_JUMP{
ICliTime: uint64(time.Now().Unix()),
IX: jump.IX,
IY: jump.IY,
IZ: jump.IZ,
IVX: jump.IVX,
IVY: jump.IVY,
IVZ: jump.IVZ,
IAngle: jump.IAngle,
CKeyValue: jump.CKeyValue,
ISpeed: jump.ISpeed,
IID: int32(plr.PlayerID),
ISvrTime: uint64(time.Now().Unix()),
})
}

View File

@ -7,9 +7,10 @@ import (
"sync" "sync"
"github.com/CPunch/gopenfusion/config" "github.com/CPunch/gopenfusion/config"
"github.com/CPunch/gopenfusion/core"
"github.com/CPunch/gopenfusion/core/db" "github.com/CPunch/gopenfusion/core/db"
"github.com/CPunch/gopenfusion/core/entity"
"github.com/CPunch/gopenfusion/core/protocol" "github.com/CPunch/gopenfusion/core/protocol"
"github.com/CPunch/gopenfusion/core/protocol/pool"
"github.com/CPunch/gopenfusion/core/redis" "github.com/CPunch/gopenfusion/core/redis"
) )
@ -18,14 +19,15 @@ type PacketHandler func(peer *protocol.CNPeer, pkt protocol.Packet) error
func stubbedPacket(_ *protocol.CNPeer, _ protocol.Packet) error { /* stubbed */ return nil } func stubbedPacket(_ *protocol.CNPeer, _ protocol.Packet) error { /* stubbed */ return nil }
type ShardServer struct { type ShardServer struct {
listener net.Listener listener net.Listener
port int port int
dbHndlr *db.DBHandler dbHndlr *db.DBHandler
redisHndlr *redis.RedisHandler redisHndlr *redis.RedisHandler
packetHandlers map[uint32]PacketHandler eRecv chan *protocol.Event
loginMetadataQueue sync.Map // [int64]*LoginMetadata w/ int64 = serialKey packetHandlers map[uint32]PacketHandler
peersLock sync.Mutex peers map[*protocol.CNPeer]*entity.Player
peers sync.Map // [*protocol.CNPeer]core.Player chunks map[entity.ChunkPosition]*entity.Chunk
peerLock sync.Mutex
} }
func NewShardServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port int) (*ShardServer, error) { func NewShardServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port int) (*ShardServer, error) {
@ -40,11 +42,17 @@ func NewShardServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port
dbHndlr: dbHndlr, dbHndlr: dbHndlr,
redisHndlr: redisHndlr, redisHndlr: redisHndlr,
packetHandlers: make(map[uint32]PacketHandler), packetHandlers: make(map[uint32]PacketHandler),
peers: make(map[*protocol.CNPeer]*entity.Player),
chunks: make(map[entity.ChunkPosition]*entity.Chunk),
eRecv: make(chan *protocol.Event),
} }
server.packetHandlers = map[uint32]PacketHandler{ server.packetHandlers = map[uint32]PacketHandler{
protocol.P_CL2FE_REQ_PC_ENTER: server.RequestEnter, protocol.P_CL2FE_REQ_PC_ENTER: server.RequestEnter,
protocol.P_CL2FE_REQ_PC_LOADING_COMPLETE: server.LoadingComplete, protocol.P_CL2FE_REQ_PC_LOADING_COMPLETE: server.LoadingComplete,
protocol.P_CL2FE_REQ_PC_MOVE: server.playerMove,
protocol.P_CL2FE_REQ_PC_STOP: server.playerStop,
protocol.P_CL2FE_REQ_PC_JUMP: server.playerJump,
} }
redisHndlr.RegisterShard(redis.ShardMetadata{ redisHndlr.RegisterShard(redis.ShardMetadata{
@ -55,13 +63,28 @@ func NewShardServer(dbHndlr *db.DBHandler, redisHndlr *redis.RedisHandler, port
return server, nil return server, nil
} }
func (server *ShardServer) RegisterPacketHandler(typeID uint32, hndlr PacketHandler) { func (server *ShardServer) handleEvents() {
server.packetHandlers[typeID] = hndlr for {
select {
case event := <-server.eRecv:
switch event.Type {
case protocol.EVENT_CLIENT_DISCONNECT:
server.disconnect(event.Peer)
case protocol.EVENT_CLIENT_PACKET:
defer pool.Put(event.Pkt)
log.Printf("Received packet %x from %p\n", event.PktID, event.Peer)
if err := server.handlePacket(event.Peer, event.PktID, protocol.NewPacket(event.Pkt)); err != nil {
event.Peer.Kill()
}
}
}
}
} }
func (server *ShardServer) Start() { func (server *ShardServer) Start() {
log.Printf("Shard service hosted on %s:%d\n", config.GetAnnounceIP(), server.port) log.Printf("Shard service hosted on %s:%d\n", config.GetAnnounceIP(), server.port)
go server.handleEvents()
for { for {
conn, err := server.listener.Accept() conn, err := server.listener.Accept()
if err != nil { if err != nil {
@ -69,8 +92,8 @@ func (server *ShardServer) Start() {
return return
} }
client := protocol.NewCNPeer(server, conn) client := protocol.NewCNPeer(server.eRecv, conn)
server.Connect(client) server.connect(client)
go client.Handler() go client.Handler()
} }
} }
@ -79,7 +102,7 @@ func (server *ShardServer) GetPort() int {
return server.port return server.port
} }
func (server *ShardServer) HandlePacket(peer *protocol.CNPeer, typeID uint32, pkt protocol.Packet) error { func (server *ShardServer) handlePacket(peer *protocol.CNPeer, typeID uint32, pkt protocol.Packet) error {
if hndlr, ok := server.packetHandlers[typeID]; ok { if hndlr, ok := server.packetHandlers[typeID]; ok {
if err := hndlr(peer, pkt); err != nil { if err := hndlr(peer, pkt); err != nil {
return err return err
@ -91,65 +114,48 @@ func (server *ShardServer) HandlePacket(peer *protocol.CNPeer, typeID uint32, pk
return nil return nil
} }
func (server *ShardServer) Disconnect(peer *protocol.CNPeer) { func (server *ShardServer) disconnect(peer *protocol.CNPeer) {
server.peerLock.Lock()
defer server.peerLock.Unlock()
// remove from chunk(s)
plr, ok := server.peers[peer]
if ok {
log.Printf("Player %d (AccountID %d) disconnected\n", plr.PlayerID, plr.AccountID)
server.removeEntityFromChunks(server.getViewableChunks(plr.Chunk), plr)
server.getChunk(plr.Chunk).RemoveEntity(plr)
}
log.Printf("Peer %p disconnected from SHARD\n", peer) log.Printf("Peer %p disconnected from SHARD\n", peer)
server.peers.Delete(peer) delete(server.peers, peer)
} }
func (server *ShardServer) Connect(peer *protocol.CNPeer) { func (server *ShardServer) connect(peer *protocol.CNPeer) {
server.peerLock.Lock()
defer server.peerLock.Unlock()
log.Printf("New peer %p connected to SHARD\n", peer) log.Printf("New peer %p connected to SHARD\n", peer)
server.peers.Store(peer, nil) server.peers[peer] = nil
} }
func (server *ShardServer) LoadPlayer(peer *protocol.CNPeer) *core.Player { func (server *ShardServer) getPlayer(peer *protocol.CNPeer) (*entity.Player, error) {
val, ok := server.peers.Load(peer) plr, ok := server.peers[peer]
if !ok { if !ok {
return nil return nil, fmt.Errorf("Player not found")
} }
plr, ok := val.(*core.Player) return plr, nil
if !ok {
return nil
}
return plr
} }
// UpdatePlayer locks the peers map, and calls the provided callback. The returned new pointer will be stored, however if an error returns it will be passed back. func (server *ShardServer) setPlayer(peer *protocol.CNPeer, plr *entity.Player) {
// Since it is UNSAFE to write to the returned pointer from LoadPlayer, this wrapper is for the cases that state in the player struct needs to be updated. server.peers[peer] = plr
// TODO: maybe LoadPlayer should return a player by value instead?
// The pointers new and old may be the same if you are just updating struct fields.
func (server *ShardServer) UpdatePlayer(peer *protocol.CNPeer, f func(old *core.Player) (new *core.Player, err error)) error {
server.peersLock.Lock()
defer server.peersLock.Unlock()
// on fail, the player should not be stored
new, err := f(server.LoadPlayer(peer))
if err != nil {
return err
}
server.storePlayer(peer, new)
return nil
} }
func (server *ShardServer) storePlayer(peer *protocol.CNPeer, player *core.Player) { // If f returns false the iteration is stopped.
server.peers.Store(peer, player) func (server *ShardServer) rangePeers(f func(peer *protocol.CNPeer, plr *entity.Player) bool) {
} for peer, plr := range server.peers {
if f(peer, plr) {
// Simple wrapper for server.peers.Range, if f returns false the iteration is stopped. return
func (server *ShardServer) RangePeers(f func(peer *protocol.CNPeer, player *core.Player) bool) {
server.peers.Range(func(key, value any) bool { // simple wrapper to cast the datatypes
peer, ok := key.(*protocol.CNPeer)
if !ok { // this should never happen
panic(fmt.Errorf("ShardServer.peers has an invalid key: peers[%#v] = %#v", key, value))
} }
}
player, ok := value.(*core.Player)
if !ok { // this should also never happen
panic(fmt.Errorf("ShardServer.peers has an invalid value: peers[%#v] = %#v", key, value))
}
return f(peer, player)
})
} }