mirror of
https://github.com/CPunch/gopenfusion.git
synced 2025-12-28 03:50:03 +00:00
Switched to redis/postgres, major refactoring
- loginMetadata is passed to shards through redis now - shards announce they're alive via redis.AnnounceShard() which just populates a hashset keyed 'shards' - login servers grab the 'shards' hashset and randomly picks a shard to pass the player to (for now) - ./service shard && ./service login - Many new environment variables, check config/config.go for more info. or for a tl;dr just read the Dockerfile for the required ones - Shard and login services now run in different processes ! (and containers?? wooaaah)
This commit is contained in:
@@ -2,24 +2,25 @@ package db
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
|
||||
"github.com/CPunch/gopenfusion/core/protocol"
|
||||
"github.com/blockloop/scan"
|
||||
"github.com/georgysavva/scany/v2/sqlscan"
|
||||
)
|
||||
|
||||
type Account struct {
|
||||
AccountID int
|
||||
Login string
|
||||
Password string
|
||||
Selected int
|
||||
AccountLevel int
|
||||
Created int
|
||||
LastLogin int
|
||||
BannedUntil int
|
||||
BannedSince int
|
||||
BanReason string
|
||||
AccountID int `db:"accountid"`
|
||||
Login string `db:"login"`
|
||||
Password string `db:"password"`
|
||||
Selected int `db:"selected"`
|
||||
AccountLevel int `db:"accountlevel"`
|
||||
Created int `db:"created"`
|
||||
LastLogin int `db:"lastlogin"`
|
||||
BannedUntil int `db:"banneduntil"`
|
||||
BannedSince int `db:"bannedsince"`
|
||||
BanReason string `db:"banreason"`
|
||||
}
|
||||
|
||||
func (db *DBHandler) NewAccount(Login, Password string) (*Account, error) {
|
||||
@@ -28,13 +29,14 @@ func (db *DBHandler) NewAccount(Login, Password string) (*Account, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
row, err := db.Query("INSERT INTO Accounts (Login, Password, AccountLevel) VALUES(?, ?, ?) RETURNING *", Login, hash, protocol.CN_ACCOUNT_LEVEL__USER)
|
||||
row, err := db.Query("INSERT INTO Accounts (Login, Password, AccountLevel) VALUES($1, $2, $3) RETURNING *", Login, hash, protocol.CN_ACCOUNT_LEVEL__USER)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var account Account
|
||||
if err := scan.Row(&account, row); err != nil {
|
||||
row.Next()
|
||||
if err := sqlscan.ScanRow(&account, row); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -47,13 +49,15 @@ var (
|
||||
)
|
||||
|
||||
func (db *DBHandler) TryLogin(Login, Password string) (*Account, error) {
|
||||
row, err := db.Query("SELECT * FROM Accounts WHERE Login=?", Login)
|
||||
row, err := db.Query("SELECT * FROM Accounts WHERE Login=$1", Login)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var account Account
|
||||
if err := scan.Row(&account, row); err != nil {
|
||||
row.Next()
|
||||
if err := sqlscan.ScanRow(&account, row); err != nil {
|
||||
log.Printf("Error scanning row: %v", err)
|
||||
return nil, LoginErrorInvalidID
|
||||
}
|
||||
|
||||
|
||||
@@ -7,17 +7,17 @@ import (
|
||||
)
|
||||
|
||||
type Inventory struct {
|
||||
PlayerID int
|
||||
Slot int
|
||||
ID int
|
||||
Type int
|
||||
Opt int
|
||||
TimeLimit int
|
||||
PlayerID int `db:"playerid"`
|
||||
Slot int `db:"slot"`
|
||||
ID int `db:"id"`
|
||||
Type int `db:"type"`
|
||||
Opt int `db:"opt"`
|
||||
TimeLimit int `db:"timelimit"`
|
||||
}
|
||||
|
||||
// start && end are both inclusive
|
||||
func (db *DBHandler) GetPlayerInventorySlots(PlayerID int, start int, end int) ([]protocol.SItemBase, error) {
|
||||
rows, err := db.Query("SELECT Slot, Type, ID, Opt, TimeLimit FROM Inventory WHERE Slot BETWEEN ? AND ? AND PlayerID = ?", start, end, PlayerID)
|
||||
rows, err := db.Query("SELECT Slot, Type, ID, Opt, TimeLimit FROM Inventory WHERE Slot BETWEEN $1 AND $2 AND PlayerID = $3", start, end, PlayerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -42,7 +42,7 @@ func (db *DBHandler) GetPlayerInventorySlots(PlayerID int, start int, end int) (
|
||||
func (db *DBHandler) SetPlayerInventorySlots(PlayerID int, start int, items []protocol.SItemBase) error {
|
||||
return db.Transaction(func(tx *sql.Tx) error {
|
||||
// delete inventory slots
|
||||
_, err := db.Exec("DELETE FROM Inventory WHERE Slot BETWEEN ? AND ? AND PlayerID = ?", start, start+len(items)-1, PlayerID)
|
||||
_, err := db.Exec("DELETE FROM Inventory WHERE Slot BETWEEN $1 AND $2 AND PlayerID = $3", start, start+len(items)-1, PlayerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -50,7 +50,7 @@ func (db *DBHandler) SetPlayerInventorySlots(PlayerID int, start int, items []pr
|
||||
// insert inventory
|
||||
for i, item := range items {
|
||||
if item.IID != 0 {
|
||||
_, err := db.Exec("INSERT INTO Inventory (PlayerID, Slot, ID, Type, Opt, TimeLimit) VALUES (?, ?, ?, ?, ?, ?)", PlayerID, start+i, item.IID, item.IType, item.IOpt, item.ITimeLimit)
|
||||
_, err := db.Exec("INSERT INTO Inventory (PlayerID, Slot, ID, Type, Opt, TimeLimit) VALUES ($1, $2, $3, $4, $5, $6)", PlayerID, start+i, item.IID, item.IType, item.IOpt, item.ITimeLimit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,29 +1,31 @@
|
||||
-- this file has been lifted from https://github.com/OpenFusionProject/OpenFusion/blob/master/sql/tables.sql
|
||||
-- this file has been lifted from https://github.com/OpenFusionProject/OpenFusion/bytea/master/sql/tables.sql
|
||||
-- all credit to original contributors!
|
||||
|
||||
CREATE EXTENSION IF NOT EXISTS citext;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS Accounts (
|
||||
AccountID INTEGER NOT NULL,
|
||||
Login TEXT NOT NULL UNIQUE COLLATE NOCASE,
|
||||
AccountID SERIAL NOT NULL,
|
||||
Login CITEXT NOT NULL UNIQUE,
|
||||
Password TEXT NOT NULL,
|
||||
Selected INTEGER DEFAULT 1 NOT NULL,
|
||||
AccountLevel INTEGER NOT NULL,
|
||||
Created INTEGER DEFAULT (strftime('%s', 'now')) NOT NULL,
|
||||
LastLogin INTEGER DEFAULT (strftime('%s', 'now')) NOT NULL,
|
||||
Created INTEGER DEFAULT (extract(EPOCH FROM current_time)) NOT NULL,
|
||||
LastLogin INTEGER DEFAULT (extract(EPOCH FROM current_time)) NOT NULL,
|
||||
BannedUntil INTEGER DEFAULT 0 NOT NULL,
|
||||
BannedSince INTEGER DEFAULT 0 NOT NULL,
|
||||
BanReason TEXT DEFAULT '' NOT NULL,
|
||||
PRIMARY KEY(AccountID AUTOINCREMENT)
|
||||
PRIMARY KEY(AccountID)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS Players (
|
||||
PlayerID INTEGER NOT NULL,
|
||||
PlayerID SERIAL NOT NULL,
|
||||
AccountID INTEGER NOT NULL,
|
||||
FirstName TEXT NOT NULL COLLATE NOCASE,
|
||||
LastName TEXT NOT NULL COLLATE NOCASE,
|
||||
FirstName CITEXT NOT NULL,
|
||||
LastName CITEXT NOT NULL,
|
||||
NameCheck INTEGER NOT NULL,
|
||||
Slot INTEGER NOT NULL,
|
||||
Created INTEGER DEFAULT (strftime('%s', 'now')) NOT NULL,
|
||||
LastLogin INTEGER DEFAULT (strftime('%s', 'now')) NOT NULL,
|
||||
Created INTEGER DEFAULT (extract(EPOCH FROM current_time)) NOT NULL,
|
||||
LastLogin INTEGER DEFAULT (extract(EPOCH FROM current_time)) NOT NULL,
|
||||
Level INTEGER DEFAULT 1 NOT NULL,
|
||||
Nano1 INTEGER DEFAULT 0 NOT NULL,
|
||||
Nano2 INTEGER DEFAULT 0 NOT NULL,
|
||||
@@ -43,10 +45,10 @@ CREATE TABLE IF NOT EXISTS Players (
|
||||
Mentor INTEGER DEFAULT 5 NOT NULL,
|
||||
CurrentMissionID INTEGER DEFAULT 0 NOT NULL,
|
||||
WarpLocationFlag INTEGER DEFAULT 0 NOT NULL,
|
||||
SkywayLocationFlag BLOB NOT NULL,
|
||||
FirstUseFlag BLOB NOT NULL,
|
||||
Quests BLOB NOT NULL,
|
||||
PRIMARY KEY(PlayerID AUTOINCREMENT),
|
||||
SkywayLocationFlag bytea NOT NULL,
|
||||
FirstUseFlag bytea NOT NULL,
|
||||
Quests bytea NOT NULL,
|
||||
PRIMARY KEY(PlayerID),
|
||||
FOREIGN KEY(AccountID) REFERENCES Accounts(AccountID) ON DELETE CASCADE,
|
||||
UNIQUE (AccountID, Slot),
|
||||
UNIQUE (FirstName, LastName)
|
||||
@@ -123,8 +125,8 @@ CREATE TABLE IF NOT EXISTS EmailData (
|
||||
ReadFlag INTEGER NOT NULL,
|
||||
ItemFlag INTEGER NOT NULL,
|
||||
SenderID INTEGER NOT NULL,
|
||||
SenderFirstName TEXT NOT NULL COLLATE NOCASE,
|
||||
SenderLastName TEXT NOT NULL COLLATE NOCASE,
|
||||
SenderFirstName CITEXT NOT NULL,
|
||||
SenderLastName CITEXT NOT NULL,
|
||||
SubjectLine TEXT NOT NULL,
|
||||
MsgBody TEXT NOT NULL,
|
||||
Taros INTEGER NOT NULL,
|
||||
|
||||
@@ -19,19 +19,19 @@ func (db *DBHandler) NewPlayer(AccountID int, FirstName, LastName string, slot i
|
||||
var PlayerID int
|
||||
if err := db.Transaction(func(tx *sql.Tx) error {
|
||||
// create player
|
||||
row, err := tx.Query(
|
||||
"INSERT INTO Players (AccountID, Slot, FirstName, LastName, XCoordinate, YCoordinate, ZCoordinate, Angle, HP, NameCheck, Quests, SkywayLocationFlag, FirstUseFlag) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING PlayerID",
|
||||
rows, err := tx.Query(
|
||||
"INSERT INTO Players (AccountID, Slot, FirstName, LastName, XCoordinate, YCoordinate, ZCoordinate, Angle, HP, NameCheck, Quests, SkywayLocationFlag, FirstUseFlag) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING PlayerID",
|
||||
AccountID, slot, FirstName, LastName, config.SPAWN_X, config.SPAWN_Y, config.SPAWN_Z, 0, config.GetMaxHP(1), nameCheck, QuestFlag, SkywayLocationFlag, FirstUseFlag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := scan.Row(&PlayerID, row); err != nil {
|
||||
if err := scan.Row(&PlayerID, rows); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create appearance
|
||||
if _, err := tx.Exec("INSERT INTO Appearances (PlayerID) VALUES (?)", PlayerID); err != nil {
|
||||
if _, err := tx.Exec("INSERT INTO Appearances (PlayerID) VALUES ($1)", PlayerID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -47,13 +47,13 @@ func (db *DBHandler) NewPlayer(AccountID int, FirstName, LastName string, slot i
|
||||
func (db *DBHandler) FinishPlayer(character *protocol.SP_CL2LS_REQ_CHAR_CREATE, AccountId int) error {
|
||||
return db.Transaction(func(tx *sql.Tx) error {
|
||||
// update AppearanceFlag
|
||||
_, err := tx.Exec("UPDATE Players SET AppearanceFlag = 1 WHERE PlayerID = ? AND AccountID = ? AND AppearanceFlag = 0", character.PCStyle.IPC_UID, AccountId)
|
||||
_, err := tx.Exec("UPDATE Players SET AppearanceFlag = 1 WHERE PlayerID = $1 AND AccountID = $2 AND AppearanceFlag = 0", character.PCStyle.IPC_UID, AccountId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update Appearance
|
||||
_, err = tx.Exec("UPDATE Appearances SET Body = ?, EyeColor = ?, FaceStyle = ?, Gender = ?, HairColor = ?, HairStyle = ?, Height = ?, SkinColor = ? WHERE PlayerID = ?",
|
||||
_, err = tx.Exec("UPDATE Appearances SET Body = $1, EyeColor = $2, FaceStyle = $3, Gender = $4, HairColor = $5, HairStyle = $6, Height = $7, SkinColor = $8 WHERE PlayerID = $9",
|
||||
character.PCStyle.IBody,
|
||||
character.PCStyle.IEyeColor,
|
||||
character.PCStyle.IFaceStyle,
|
||||
@@ -70,7 +70,7 @@ func (db *DBHandler) FinishPlayer(character *protocol.SP_CL2LS_REQ_CHAR_CREATE,
|
||||
// update Inventory
|
||||
items := [3]int16{character.SOn_Item.IEquipUBID, character.SOn_Item.IEquipLBID, character.SOn_Item.IEquipFootID}
|
||||
for i := 0; i < len(items); i++ {
|
||||
_, err = tx.Exec("INSERT INTO Inventory (PlayerID, Slot, ID, Type, Opt) VALUES (?, ?, ?, ?, 1)", character.PCStyle.IPC_UID, i, items[i], i+1)
|
||||
_, err = tx.Exec("INSERT INTO Inventory (PlayerID, Slot, ID, Type, Opt) VALUES ($1, $2, $3, $4, 1)", character.PCStyle.IPC_UID, i, items[i], i+1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -81,7 +81,7 @@ func (db *DBHandler) FinishPlayer(character *protocol.SP_CL2LS_REQ_CHAR_CREATE,
|
||||
}
|
||||
|
||||
func (db *DBHandler) FinishTutorial(PlayerID, AccountID int) error {
|
||||
_, err := db.Exec("UPDATE Players SET TutorialFlag = 1 WHERE PlayerID = ? AND AccountID = ? AND TutorialFlag = 0", PlayerID, AccountID)
|
||||
_, err := db.Exec("UPDATE Players SET TutorialFlag = 1 WHERE PlayerID = $1 AND AccountID = $2 AND TutorialFlag = 0", PlayerID, AccountID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -92,16 +92,14 @@ func (db *DBHandler) FinishTutorial(PlayerID, AccountID int) error {
|
||||
|
||||
// returns the deleted Slot number
|
||||
func (db *DBHandler) DeletePlayer(PlayerID, AccountID int) (int, error) {
|
||||
row, err := db.Query("DELETE FROM Players WHERE AccountID = ? AND PlayerID = ? RETURNING Slot")
|
||||
row, err := db.Query("DELETE FROM Players WHERE AccountID = $1 AND PlayerID = $2 RETURNING Slot")
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
var slot int
|
||||
for row.Next() {
|
||||
if err := row.Scan(&slot); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
if err := row.Scan(&slot); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return slot, nil
|
||||
@@ -165,7 +163,7 @@ func (db *DBHandler) readPlayer(rows *sql.Rows) (*core.Player, error) {
|
||||
}
|
||||
|
||||
func (db *DBHandler) GetPlayer(PlayerID int) (*core.Player, error) {
|
||||
rows, err := db.Query(QUERY_PLAYERS+"WHERE p.PlayerID = ?", PlayerID)
|
||||
rows, err := db.Query(QUERY_PLAYERS+"WHERE p.PlayerID = $1", PlayerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -182,7 +180,7 @@ func (db *DBHandler) GetPlayer(PlayerID int) (*core.Player, error) {
|
||||
}
|
||||
|
||||
func (db *DBHandler) GetPlayers(AccountID int) ([]core.Player, error) {
|
||||
rows, err := db.Query(QUERY_PLAYERS+"WHERE p.AccountID = ?", AccountID)
|
||||
rows, err := db.Query(QUERY_PLAYERS+"WHERE p.AccountID = $1", AccountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -10,7 +10,8 @@ import (
|
||||
_ "embed"
|
||||
"fmt"
|
||||
|
||||
_ "github.com/glebarez/go-sqlite"
|
||||
"github.com/CPunch/gopenfusion/config"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
type DBHandler struct {
|
||||
@@ -20,10 +21,10 @@ type DBHandler struct {
|
||||
//go:embed migrations/new.sql
|
||||
var createDBQuery string
|
||||
|
||||
func OpenLiteDB(dbPath string) (*DBHandler, error) {
|
||||
sqliteFmt := fmt.Sprintf("%s", dbPath)
|
||||
func OpenPostgresDB(dbAddr string) (*DBHandler, error) {
|
||||
fmt := fmt.Sprintf("postgresql://%s:%s@%s/%s?sslmode=disable", config.GetDBUser(), config.GetDBPass(), dbAddr, config.GetDBName())
|
||||
|
||||
db, err := sql.Open("sqlite", sqliteFmt)
|
||||
db, err := sql.Open("postgres", fmt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
112
core/redis/redis.go
Normal file
112
core/redis/redis.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package redis
|
||||
|
||||
/*
|
||||
used for state management between shard and login servers
|
||||
*/
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/CPunch/gopenfusion/config"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type RedisHandler struct {
|
||||
client *redis.Client
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
type LoginMetadata struct {
|
||||
FEKey []byte `json:",omitempty"`
|
||||
PlayerID int32 `json:",omitempty"`
|
||||
}
|
||||
|
||||
type ShardMetadata struct {
|
||||
IP string
|
||||
Port int
|
||||
}
|
||||
|
||||
const (
|
||||
SHARD_SET = "shards"
|
||||
)
|
||||
|
||||
func OpenRedis(addr string) (*RedisHandler, error) {
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: addr,
|
||||
CredentialsProvider: config.GetRedisCredentials(),
|
||||
})
|
||||
|
||||
_, err := client.Ping(context.Background()).Result()
|
||||
return &RedisHandler{client: client, ctx: context.Background()}, err
|
||||
}
|
||||
|
||||
func (r *RedisHandler) Close() error {
|
||||
return r.client.Close()
|
||||
}
|
||||
|
||||
// we store login queues into redis with the name "loginMetadata_<serialKey>"
|
||||
// set to expire after config.LOGIN_TIMEOUT duration. this way we can easily
|
||||
// have a shared pool of active serial keys & player login data which any
|
||||
// shard can pull from
|
||||
|
||||
func makeLoginMetadataKey(serialKey int64) string {
|
||||
return fmt.Sprintf("loginMetadata_%s", strconv.Itoa(int(serialKey)))
|
||||
}
|
||||
|
||||
func (r *RedisHandler) QueueLogin(serialKey int64, data LoginMetadata) error {
|
||||
value, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// add to table
|
||||
return r.client.Set(r.ctx, makeLoginMetadataKey(serialKey), value, config.LOGIN_TIMEOUT).Err()
|
||||
}
|
||||
|
||||
func (r *RedisHandler) GetLogin(serialKey int64) (LoginMetadata, error) {
|
||||
value, err := r.client.Get(r.ctx, makeLoginMetadataKey(serialKey)).Result()
|
||||
if err != nil {
|
||||
return LoginMetadata{}, err
|
||||
}
|
||||
|
||||
var data LoginMetadata
|
||||
if err := json.Unmarshal([]byte(value), &data); err != nil {
|
||||
return LoginMetadata{}, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (r *RedisHandler) RemoveLogin(serialKey int64) error {
|
||||
return r.client.Del(r.ctx, makeLoginMetadataKey(serialKey)).Err()
|
||||
}
|
||||
|
||||
func (r *RedisHandler) RegisterShard(shard ShardMetadata) error {
|
||||
value, err := json.Marshal(shard)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return r.client.SAdd(r.ctx, SHARD_SET, value).Err()
|
||||
}
|
||||
|
||||
func (r *RedisHandler) GetShards() []ShardMetadata {
|
||||
shardData := r.client.SMembers(r.ctx, SHARD_SET).Val()
|
||||
|
||||
// unmarshal all shards
|
||||
shards := make([]ShardMetadata, 0, len(shardData))
|
||||
for _, data := range shardData {
|
||||
var shard ShardMetadata
|
||||
if err := json.Unmarshal([]byte(data), &shard); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
shards = append(shards, shard)
|
||||
}
|
||||
|
||||
return shards
|
||||
}
|
||||
Reference in New Issue
Block a user