moved internal/protocol/cnpeer to cnpeer

also started a util package
This commit is contained in:
2023-12-01 15:29:19 -06:00
parent 0ed19ad6c5
commit e355af19ab
12 changed files with 93 additions and 77 deletions

View File

@@ -1,6 +1,6 @@
package entity
import "github.com/CPunch/gopenfusion/internal/protocol"
import "github.com/CPunch/gopenfusion/cnpeer"
type EntityKind int
@@ -20,6 +20,6 @@ type Entity interface {
SetPosition(x, y, z int)
SetAngle(angle int)
DisappearFromViewOf(peer *protocol.CNPeer)
EnterIntoViewOf(peer *protocol.CNPeer)
DisappearFromViewOf(peer *cnpeer.CNPeer)
EnterIntoViewOf(peer *cnpeer.CNPeer)
}

View File

@@ -3,6 +3,7 @@ package entity
import (
"sync/atomic"
"github.com/CPunch/gopenfusion/cnpeer"
"github.com/CPunch/gopenfusion/internal/protocol"
)
@@ -62,13 +63,13 @@ func (npc *NPC) SetAngle(angle int) {
npc.Angle = angle
}
func (npc *NPC) DisappearFromViewOf(peer *protocol.CNPeer) {
func (npc *NPC) DisappearFromViewOf(peer *cnpeer.CNPeer) {
peer.Send(protocol.P_FE2CL_NPC_EXIT, protocol.SP_FE2CL_NPC_EXIT{
INPC_ID: int32(npc.ID),
})
}
func (npc *NPC) EnterIntoViewOf(peer *protocol.CNPeer) {
func (npc *NPC) EnterIntoViewOf(peer *cnpeer.CNPeer) {
peer.Send(protocol.P_FE2CL_NPC_NEW, protocol.SP_FE2CL_NPC_NEW{
NPCAppearanceData: npc.GetAppearanceData(),
})

View File

@@ -1,17 +1,18 @@
package entity
import (
"github.com/CPunch/gopenfusion/cnpeer"
"github.com/CPunch/gopenfusion/internal/db"
"github.com/CPunch/gopenfusion/internal/protocol"
)
type Player struct {
db.Player
Peer *protocol.CNPeer
Peer *cnpeer.CNPeer
Chunk ChunkPosition
}
func NewPlayer(peer *protocol.CNPeer, player *db.Player) *Player {
func NewPlayer(peer *cnpeer.CNPeer, player *db.Player) *Player {
return &Player{
Player: *player,
Peer: peer,
@@ -51,13 +52,13 @@ func (plr *Player) SetAngle(angle int) {
plr.Angle = angle
}
func (plr *Player) DisappearFromViewOf(peer *protocol.CNPeer) {
func (plr *Player) DisappearFromViewOf(peer *cnpeer.CNPeer) {
peer.Send(protocol.P_FE2CL_PC_EXIT, protocol.SP_FE2CL_PC_EXIT{
IID: int32(plr.PlayerID),
})
}
func (plr *Player) EnterIntoViewOf(peer *protocol.CNPeer) {
func (plr *Player) EnterIntoViewOf(peer *cnpeer.CNPeer) {
peer.Send(protocol.P_FE2CL_PC_NEW, protocol.SP_FE2CL_PC_NEW{
PCAppearanceData: plr.GetAppearanceData(),
})

View File

@@ -1,168 +0,0 @@
package protocol
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"net"
"sync/atomic"
"time"
)
const (
USE_E = iota
USE_FE
)
type PacketEvent struct {
Type int
Pkt *bytes.Buffer
PktID uint32
}
// CNPeer is a simple wrapper for net.Conn connections to send/recv packets over the Fusionfall packet protocol.
type CNPeer struct {
uData interface{}
conn net.Conn
ctx context.Context
whichKey int
alive *atomic.Bool
// May not be set while Send() or Handler() are concurrently running.
E_key []byte
// May not be set while Send() or Handler() are concurrently running.
FE_key []byte
}
func GetTime() uint64 {
return uint64(time.Now().UnixMilli())
}
func NewCNPeer(ctx context.Context, conn net.Conn) *CNPeer {
p := &CNPeer{
conn: conn,
ctx: ctx,
whichKey: USE_E,
alive: &atomic.Bool{},
E_key: []byte(DEFAULT_KEY),
FE_key: nil,
}
return p
}
func (peer *CNPeer) SetUserData(uData interface{}) {
peer.uData = uData
}
func (peer *CNPeer) UserData() interface{} {
return peer.uData
}
func (peer *CNPeer) Send(typeID uint32, data ...interface{}) error {
// grab buffer from pool
buf := GetBuffer()
defer PutBuffer(buf)
// allocate space for packet size
buf.Write(make([]byte, 4))
// body start
pkt := NewPacket(buf)
// encode type id
if err := pkt.Encode(typeID); err != nil {
return err
}
// encode data
for _, trailer := range data {
if err := pkt.Encode(trailer); err != nil {
return err
}
}
// prepend the packet size
binary.LittleEndian.PutUint32(buf.Bytes()[:4], uint32(buf.Len()-4))
// encrypt body
var key []byte
switch peer.whichKey {
case USE_E:
key = peer.E_key
case USE_FE:
key = peer.FE_key
}
EncryptData(buf.Bytes()[4:], key)
// send full packet
// log.Printf("Sending %#v, sizeof: %d, buffer: %v", data, buf.Len(), buf.Bytes())
if _, err := peer.conn.Write(buf.Bytes()); err != nil {
return fmt.Errorf("failed to write packet body! %v", err)
}
return nil
}
func (peer *CNPeer) SetActiveKey(whichKey int) {
peer.whichKey = whichKey
}
func (peer *CNPeer) Kill() {
// de-bounce: only kill if alive
if !peer.alive.CompareAndSwap(true, false) {
return
}
peer.conn.Close()
}
// meant to be invoked as a goroutine
func (peer *CNPeer) Handler(eRecv chan<- *PacketEvent) error {
defer func() {
close(eRecv)
peer.Kill()
}()
peer.alive.Store(true)
for {
select {
case <-peer.ctx.Done():
return nil
default:
// read packet size, the goroutine spends most of it's time parked here
var sz uint32
if err := binary.Read(peer.conn, binary.LittleEndian, &sz); err != nil {
return err
}
// client should never send a packet size outside of this range
if sz > CN_PACKET_BUFFER_SIZE || sz < 4 {
return fmt.Errorf("invalid packet size: %d", sz)
}
// grab buffer && read packet body
buf := GetBuffer()
if _, err := buf.ReadFrom(io.LimitReader(peer.conn, int64(sz))); err != nil {
return fmt.Errorf("failed to read packet body: %v", err)
}
// decrypt
DecryptData(buf.Bytes(), peer.E_key)
pkt := NewPacket(buf)
// create packet && read pktID
var pktID uint32
if err := pkt.Decode(&pktID); err != nil {
return fmt.Errorf("failed to read packet type! %v", err)
}
// dispatch packet
// log.Printf("Got packet ID: %x, with a sizeof: %d\n", pktID, sz)
eRecv <- &PacketEvent{Pkt: buf, PktID: pktID}
}
}
}

View File

@@ -10,13 +10,14 @@ import (
"strconv"
"sync"
"github.com/CPunch/gopenfusion/cnpeer"
"github.com/CPunch/gopenfusion/config"
"github.com/CPunch/gopenfusion/internal/protocol"
)
type PacketHandler func(peer *protocol.CNPeer, pkt protocol.Packet) error
type PacketHandler func(peer *cnpeer.CNPeer, pkt protocol.Packet) error
func StubbedPacket(_ *protocol.CNPeer, _ protocol.Packet) error {
func StubbedPacket(_ *cnpeer.CNPeer, _ protocol.Packet) error {
return nil
}
@@ -28,18 +29,18 @@ type Service struct {
started chan struct{}
stopped chan struct{}
packetHandlers map[uint32]PacketHandler
peers map[chan *protocol.PacketEvent]*protocol.CNPeer
peers map[chan *cnpeer.PacketEvent]*cnpeer.CNPeer
stateLock sync.Mutex
// OnDisconnect is called when a peer disconnects from the service.
// uData is the stored value of the key/value pair in the peer map.
// It may not be set while the service is running. (eg. srvc.Start() has been called)
OnDisconnect func(peer *protocol.CNPeer)
OnDisconnect func(peer *cnpeer.CNPeer)
// OnConnect is called when a peer connects to the service.
// return value is used as the value in the peer map.
// It may not be set while the service is running. (eg. srvc.Start() has been called)
OnConnect func(peer *protocol.CNPeer)
OnConnect func(peer *cnpeer.CNPeer)
}
func RandomPort() (int, error) {
@@ -69,7 +70,7 @@ func NewService(ctx context.Context, name string, port int) *Service {
func (srvc *Service) Reset(ctx context.Context) {
srvc.ctx = ctx
srvc.packetHandlers = make(map[uint32]PacketHandler)
srvc.peers = make(map[chan *protocol.PacketEvent]*protocol.CNPeer)
srvc.peers = make(map[chan *cnpeer.PacketEvent]*cnpeer.CNPeer)
srvc.started = make(chan struct{})
srvc.stopped = make(chan struct{})
}
@@ -80,8 +81,8 @@ func (srvc *Service) AddPacketHandler(pktID uint32, handler PacketHandler) {
}
type newPeerConnection struct {
peer *protocol.CNPeer
channel chan *protocol.PacketEvent
peer *cnpeer.CNPeer
channel chan *cnpeer.PacketEvent
}
func (srvc *Service) Start() error {
@@ -112,22 +113,22 @@ func (srvc *Service) Start() error {
}
// create a new peer and pass it to the event loop
peer := protocol.NewCNPeer(srvc.ctx, conn)
eRecv := make(chan *protocol.PacketEvent)
peer := cnpeer.NewCNPeer(srvc.ctx, conn)
eRecv := make(chan *cnpeer.PacketEvent)
peerConnections <- newPeerConnection{channel: eRecv, peer: peer}
go peer.Handler(eRecv)
}
}
func (srvc *Service) getPeer(channel chan *protocol.PacketEvent) *protocol.CNPeer {
func (srvc *Service) getPeer(channel chan *cnpeer.PacketEvent) *cnpeer.CNPeer {
return srvc.peers[channel]
}
func (srvc *Service) setPeer(channel chan *protocol.PacketEvent, peer *protocol.CNPeer) {
func (srvc *Service) setPeer(channel chan *cnpeer.PacketEvent, peer *cnpeer.CNPeer) {
srvc.peers[channel] = peer
}
func (srvc *Service) removePeer(channel chan *protocol.PacketEvent) {
func (srvc *Service) removePeer(channel chan *cnpeer.PacketEvent) {
delete(srvc.peers, channel)
}
@@ -147,7 +148,7 @@ func (srvc *Service) Stopped() <-chan struct{} {
// if f returns false, the iteration is stopped.
// NOTE: the peer map is not locked while iterating, if you're calling this
// outside of the service's event loop, you'll need to lock the peer map yourself.
func (srvc *Service) RangePeers(f func(peer *protocol.CNPeer) bool) {
func (srvc *Service) RangePeers(f func(peer *cnpeer.CNPeer) bool) {
for _, peer := range srvc.peers {
if !f(peer) {
break
@@ -167,7 +168,7 @@ func (srvc *Service) Unlock() {
func (srvc *Service) stop() {
// OnDisconnect handler might need to do something important
srvc.RangePeers(func(peer *protocol.CNPeer) bool {
srvc.RangePeers(func(peer *cnpeer.CNPeer) bool {
peer.Kill()
if srvc.OnDisconnect != nil {
srvc.OnDisconnect(peer)
@@ -196,7 +197,7 @@ func (srvc *Service) handleEvents(peerPipe <-chan newPeerConnection) {
Chan: reflect.ValueOf(peerPipe),
})
addPoll := func(channel chan *protocol.PacketEvent) {
addPoll := func(channel chan *cnpeer.PacketEvent) {
poll = append(poll, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(channel),
@@ -221,7 +222,7 @@ func (srvc *Service) handleEvents(peerPipe <-chan newPeerConnection) {
addPoll(evnt.channel)
srvc.connect(evnt.channel, evnt.peer)
default: // peer event
channel := poll[chosen].Chan.Interface().(chan *protocol.PacketEvent)
channel := poll[chosen].Chan.Interface().(chan *cnpeer.PacketEvent)
peer := srvc.getPeer(channel)
if peer == nil {
log.Printf("Unknown peer event: %v", value)
@@ -229,7 +230,7 @@ func (srvc *Service) handleEvents(peerPipe <-chan newPeerConnection) {
continue
}
evnt, ok := value.Interface().(*protocol.PacketEvent)
evnt, ok := value.Interface().(*cnpeer.PacketEvent)
if !recvOK || !ok || evnt == nil {
// peer disconnected, remove it from our poll queue
removePoll(chosen)
@@ -250,7 +251,7 @@ func (srvc *Service) handleEvents(peerPipe <-chan newPeerConnection) {
}
}
func (srvc *Service) handlePacket(peer *protocol.CNPeer, typeID uint32, pkt protocol.Packet) error {
func (srvc *Service) handlePacket(peer *cnpeer.CNPeer, typeID uint32, pkt protocol.Packet) error {
if hndlr, ok := srvc.packetHandlers[typeID]; ok {
// fmt.Printf("Handling packet %x\n", typeID)
if err := hndlr(peer, pkt); err != nil {
@@ -263,7 +264,7 @@ func (srvc *Service) handlePacket(peer *protocol.CNPeer, typeID uint32, pkt prot
return nil
}
func (srvc *Service) disconnect(channel chan *protocol.PacketEvent, peer *protocol.CNPeer) {
func (srvc *Service) disconnect(channel chan *cnpeer.PacketEvent, peer *cnpeer.CNPeer) {
log.Printf("Peer %p disconnected from %s\n", peer, srvc.Name)
if srvc.OnDisconnect != nil {
srvc.OnDisconnect(peer)
@@ -272,7 +273,7 @@ func (srvc *Service) disconnect(channel chan *protocol.PacketEvent, peer *protoc
srvc.removePeer(channel)
}
func (srvc *Service) connect(channel chan *protocol.PacketEvent, peer *protocol.CNPeer) {
func (srvc *Service) connect(channel chan *cnpeer.PacketEvent, peer *cnpeer.CNPeer) {
log.Printf("New peer %p connected to %s\n", peer, srvc.Name)
if srvc.OnConnect != nil {
srvc.OnConnect(peer)

View File

@@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/CPunch/gopenfusion/cnpeer"
"github.com/CPunch/gopenfusion/internal/protocol"
"github.com/CPunch/gopenfusion/internal/service"
"github.com/matryer/is"
@@ -67,7 +68,7 @@ func TestService(t *testing.T) {
// our dummy packet handler
wg.Add(maxDummyPeers)
srvc.AddPacketHandler(0x1234, func(peer *protocol.CNPeer, pkt protocol.Packet) error {
srvc.AddPacketHandler(0x1234, func(peer *cnpeer.CNPeer, pkt protocol.Packet) error {
log.Printf("Received packet %#v", pkt)
wg.Done()
return nil
@@ -75,12 +76,12 @@ func TestService(t *testing.T) {
// wait for all dummy peers to connect and disconnect
wg.Add(maxDummyPeers)
srvc.OnConnect = func(peer *protocol.CNPeer) {
srvc.OnConnect = func(peer *cnpeer.CNPeer) {
wg.Done()
}
wg.Add(maxDummyPeers)
srvc.OnDisconnect = func(peer *protocol.CNPeer) {
srvc.OnDisconnect = func(peer *cnpeer.CNPeer) {
wg.Done()
}
@@ -95,7 +96,7 @@ func TestService(t *testing.T) {
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", srvcPort))
is.NoErr(err) // net.Dial error
peer := protocol.NewCNPeer(ctx, conn)
peer := cnpeer.NewCNPeer(ctx, conn)
go func() {
defer peer.Kill()
@@ -106,7 +107,7 @@ func TestService(t *testing.T) {
}()
// we wait until Handler gracefully exits (peer was killed)
peer.Handler(make(chan *protocol.PacketEvent))
peer.Handler(make(chan *cnpeer.PacketEvent))
wg.Done()
}()
}