frp/server/control.go
OoyonghongoO 84d34c8298 UPLOAD
2024-10-05 10:25:11 +08:00

612 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"context"
"fmt"
"net"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/fatedier/frp/pkg/auth"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
pkgerr "github.com/fatedier/frp/pkg/errors"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/transport"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/wait"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/metrics"
"github.com/fatedier/frp/server/proxy"
"github.com/fatedier/golib/control/shutdown"
"github.com/samber/lo"
"github.com/fatedier/frp/extend/api"
)
type ControlManager struct {
// controls indexed by run id
ctlsByRunID map[string]*Control
mu sync.RWMutex
}
func NewControlManager() *ControlManager {
return &ControlManager{
ctlsByRunID: make(map[string]*Control),
}
}
func (cm *ControlManager) Add(runID string, ctl *Control) (old *Control) {
cm.mu.Lock()
defer cm.mu.Unlock()
var ok bool
old, ok = cm.ctlsByRunID[runID]
if ok {
old.Replaced(ctl)
}
cm.ctlsByRunID[runID] = ctl
return
}
// we should make sure if it's the same control to prevent delete a new one
func (cm *ControlManager) Del(runID string, ctl *Control) {
cm.mu.Lock()
defer cm.mu.Unlock()
if c, ok := cm.ctlsByRunID[runID]; ok && c == ctl {
delete(cm.ctlsByRunID, runID)
}
}
func (cm *ControlManager) GetByID(runID string) (ctl *Control, ok bool) {
cm.mu.RLock()
defer cm.mu.RUnlock()
ctl, ok = cm.ctlsByRunID[runID]
return
}
func (cm *ControlManager) SearchByID(runId string) (ctl *Control, ok bool) {
cm.mu.RLock()
defer cm.mu.RUnlock()
for k, v := range cm.ctlsByRunID {
if strings.IndexAny(k, runId+"-") > -1 {
if v == nil {
return
}
ctl, ok = cm.ctlsByRunID[k]
}
}
return
}
func (cm *ControlManager) Close() error {
cm.mu.Lock()
defer cm.mu.Unlock()
for _, ctl := range cm.ctlsByRunID {
ctl.Close()
}
cm.ctlsByRunID = make(map[string]*Control)
return nil
}
type Control struct {
// all resource managers and controllers
rc *controller.ResourceController
// proxy manager
pxyManager *proxy.Manager
// plugin manager
pluginManager *plugin.Manager
// verifies authentication based on selected method
authVerifier auth.Verifier
// other components can use this to communicate with client
msgTransporter transport.MessageTransporter
// msgDispatcher is a wrapper for control connection.
// It provides a channel for sending messages, and you can register handlers to process messages based on their respective types.
msgDispatcher *msg.Dispatcher
// login message
loginMsg *msg.Login
// control connection
conn net.Conn
// work connections
workConnCh chan net.Conn
// proxies in one client
proxies map[string]proxy.Proxy
// pool count
poolCount int
// ports used, for limitations
portsUsedNum int
// last time got the Ping message
lastPing atomic.Value
// A new run id will be generated when a new client login.
// If run id got from login message has same run id, it means it's the same client, so we can
// replace old controller instantly.
runID string
// control statusAPI
status string
readerShutdown *shutdown.Shutdown
writerShutdown *shutdown.Shutdown
managerShutdown *shutdown.Shutdown
allShutdown *shutdown.Shutdown
mu sync.RWMutex
inLimit uint64
outLimit uint64
// Server configuration information
serverCfg *v1.ServerConfig
xl *xlog.Logger
ctx context.Context
doneCh chan struct{}
}
// TODO(fatedier): Referencing the implementation of frpc, encapsulate the input parameters as SessionContext.
func NewControl(
ctx context.Context,
rc *controller.ResourceController,
pxyManager *proxy.Manager,
pluginManager *plugin.Manager,
authVerifier auth.Verifier,
ctlConn net.Conn,
ctlConnEncrypted bool,
loginMsg *msg.Login,
serverCfg *v1.ServerConfig,
) (*Control, error) {
poolCount := loginMsg.PoolCount
if poolCount > int(serverCfg.Transport.MaxPoolCount) {
poolCount = int(serverCfg.Transport.MaxPoolCount)
}
ctl := &Control{
rc: rc,
pxyManager: pxyManager,
pluginManager: pluginManager,
authVerifier: authVerifier,
conn: ctlConn,
loginMsg: loginMsg,
workConnCh: make(chan net.Conn, poolCount+10),
proxies: make(map[string]proxy.Proxy),
poolCount: poolCount,
portsUsedNum: 0,
runID: loginMsg.RunID,
serverCfg: serverCfg,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
doneCh: make(chan struct{}),
}
ctl.lastPing.Store(time.Now())
if ctlConnEncrypted {
cryptoRW, err := netpkg.NewCryptoReadWriter(ctl.conn, []byte(ctl.serverCfg.Auth.Token))
if err != nil {
return nil, err
}
ctl.msgDispatcher = msg.NewDispatcher(cryptoRW)
} else {
ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
}
ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
return ctl, nil
}
// Start send a login success message to client and start working.
func (ctl *Control) Start() {
loginRespMsg := &msg.LoginResp{
Version: version.Full(),
RunID: ctl.runID,
Error: "",
}
_ = msg.WriteMsg(ctl.conn, loginRespMsg)
go func() {
for i := 0; i < ctl.poolCount; i++ {
// ignore error here, that means that this control is closed
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
}
}()
go ctl.worker()
}
func (ctl *Control) Close() error {
ctl.conn.Close()
return nil
}
func (ctl *Control) Replaced(newCtl *Control) {
xl := ctl.xl
xl.Infof("由客户端更改 [%s]", newCtl.runID)
ctl.runID = ""
ctl.conn.Close()
}
func (ctl *Control) RegisterWorkConn(conn net.Conn) error {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
xl.Errorf("[HayFrp] 致命错误Panic爆了: %v", err)
xl.Errorf(string(debug.Stack()))
}
}()
select {
case ctl.workConnCh <- conn:
xl.Debugf("新活动链接已注册")
return nil
default:
xl.Debugf("[HayFrp] 活动连接池已满,正在丢弃")
return fmt.Errorf("[HayFrp] 活动连接池已满,正在丢弃")
}
}
// When frps get one user connection, we get one work connection from the pool and return it.
// If no workConn available in the pool, send message to frpc to get one or more
// and wait until it is available.
// return an error if wait timeout
func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
xl.Errorf("[HayFrp] 致命错误Panic爆了: %v", err)
xl.Errorf(string(debug.Stack()))
}
}()
var ok bool
// get a work connection from the pool
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
err = pkgerr.ErrCtlClosed
return
}
xl.Debugf("[HayFrp] 从池中获取工作连接")
default:
// no work connections available in the poll, send message to frpc to get more
if err := ctl.msgDispatcher.Send(&msg.ReqWorkConn{}); err != nil {
return nil, fmt.Errorf("control is already closed")
}
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
err = pkgerr.ErrCtlClosed
xl.Warnf("[HayFrp] 没有可用的工作连接, %v", err)
return
}
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
err = fmt.Errorf("[HayFrp] 尝试获取工作连接超时")
xl.Warnf("%v", err)
return
}
}
// When we get a work connection from pool, replace it with a new one.
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
return
}
func (ctl *Control) heartbeatWorker() {
if ctl.serverCfg.Transport.HeartbeatTimeout <= 0 {
return
}
xl := ctl.xl
go wait.Until(func() {
if time.Since(ctl.lastPing.Load().(time.Time)) > time.Duration(ctl.serverCfg.Transport.HeartbeatTimeout)*time.Second {
xl.Warnf("[HayFrp] 心跳超时")
ctl.conn.Close()
return
}
}, time.Second, ctl.doneCh)
}
// block until Control closed
func (ctl *Control) WaitClosed() {
<-ctl.doneCh
}
func (ctl *Control) worker() {
xl := ctl.xl
go ctl.heartbeatWorker()
go ctl.msgDispatcher.Run()
<-ctl.msgDispatcher.Done()
ctl.conn.Close()
ctl.mu.Lock()
defer ctl.mu.Unlock()
close(ctl.workConnCh)
for workConn := range ctl.workConnCh {
workConn.Close()
}
for _, pxy := range ctl.proxies {
pxy.Close()
ctl.pxyManager.Del(pxy.GetName())
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConfigurer().GetBaseConfig().Type)
notifyContent := &plugin.CloseProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
CloseProxy: msg.CloseProxy{
ProxyName: pxy.GetName(),
},
}
go func() {
_ = ctl.pluginManager.CloseProxy(notifyContent)
}()
}
metrics.Server.CloseClient()
xl.Infof("[HayFrp] 客户端退出成功")
close(ctl.doneCh)
}
func (ctl *Control) registerMsgHandlers() {
ctl.msgDispatcher.RegisterHandler(&msg.NewProxy{}, ctl.handleNewProxy)
ctl.msgDispatcher.RegisterHandler(&msg.Ping{}, ctl.handlePing)
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleVisitor{}, msg.AsyncHandler(ctl.handleNatHoleVisitor))
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleClient{}, msg.AsyncHandler(ctl.handleNatHoleClient))
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleReport{}, msg.AsyncHandler(ctl.handleNatHoleReport))
ctl.msgDispatcher.RegisterHandler(&msg.CloseProxy{}, ctl.handleCloseProxy)
}
func (ctl *Control) handleNewProxy(m msg.Message) {
xl := ctl.xl
inMsg := m.(*msg.NewProxy)
content := &plugin.NewProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
NewProxy: *inMsg,
}
var remoteAddr string
retContent, err := ctl.pluginManager.NewProxy(content)
if err == nil {
inMsg = &retContent.NewProxy
remoteAddr, err = ctl.RegisterProxy(inMsg)
}
// register proxy in this control
resp := &msg.NewProxyResp{
ProxyName: inMsg.ProxyName,
}
if err != nil {
xl.Warnf("[HayFrp] 新的隧道 [%s] 类型 [%s] 发生错误: %v", inMsg.ProxyName, inMsg.ProxyType, err)
resp.Error = util.GenerateResponseErrorString(fmt.Sprintf("new proxy [%s] error", inMsg.ProxyName),
err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient))
} else {
resp.RemoteAddr = remoteAddr
xl.Infof("[HayFrp] 新的隧道 [%s] 类型 [%s] 成功", inMsg.ProxyName, inMsg.ProxyType)
metrics.Server.NewProxy(inMsg.ProxyName, inMsg.ProxyType)
}
_ = ctl.msgDispatcher.Send(resp)
}
func (ctl *Control) handlePing(m msg.Message) {
xl := ctl.xl
inMsg := m.(*msg.Ping)
content := &plugin.PingContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
Ping: *inMsg,
}
retContent, err := ctl.pluginManager.Ping(content)
if err == nil {
inMsg = &retContent.Ping
err = ctl.authVerifier.VerifyPing(inMsg)
}
if err != nil {
xl.Warnf("[HayFrp] 收到无效Ping: %v", err)
_ = ctl.msgDispatcher.Send(&msg.Pong{
Error: util.GenerateResponseErrorString("[HayFrp] 无效Ping", err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient)),
})
return
}
ctl.lastPing.Store(time.Now())
xl.Debugf("[HayFrp] 收到心跳")
_ = ctl.msgDispatcher.Send(&msg.Pong{})
}
func (ctl *Control) handleNatHoleVisitor(m msg.Message) {
inMsg := m.(*msg.NatHoleVisitor)
ctl.rc.NatHoleController.HandleVisitor(inMsg, ctl.msgTransporter, ctl.loginMsg.User)
}
func (ctl *Control) handleNatHoleClient(m msg.Message) {
inMsg := m.(*msg.NatHoleClient)
ctl.rc.NatHoleController.HandleClient(inMsg, ctl.msgTransporter)
}
func (ctl *Control) handleNatHoleReport(m msg.Message) {
inMsg := m.(*msg.NatHoleReport)
ctl.rc.NatHoleController.HandleReport(inMsg)
}
func (ctl *Control) handleCloseProxy(m msg.Message) {
xl := ctl.xl
inMsg := m.(*msg.CloseProxy)
_ = ctl.CloseProxy(inMsg)
xl.Infof("[HayFrp] 关闭隧道 [%s] 成功.", inMsg.ProxyName)
}
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
var pxyConf v1.ProxyConfigurer
s, err := api.NewService(ctl.serverCfg.ApiBaseUrl)
if err != nil {
return remoteAddr, err
}
// Load configures from NewProxy message and validate.
pxyConf, err = config.NewProxyConfigurerFromMsg(pxyMsg, ctl.serverCfg)
if err != nil {
return
}
if ctl.serverCfg.EnableApi {
nowTime := time.Now().Unix()
ok, err := s.CheckProxy(ctl.loginMsg.User, pxyMsg, nowTime, ctl.serverCfg.ApiToken)
if err != nil {
return remoteAddr, err
}
if !ok {
return remoteAddr, fmt.Errorf("[HayFrp] 未知隧道配置文件")
}
}
// User info
userInfo := plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.runID,
}
// NewProxy will return an interface Proxy.
// In fact, it creates different proxies based on the proxy type. We just call run() here.
pxy, err := proxy.NewProxy(ctl.ctx, &proxy.Options{
UserInfo: userInfo,
LoginMsg: ctl.loginMsg,
PoolCount: ctl.poolCount,
ResourceController: ctl.rc,
GetWorkConnFn: ctl.GetWorkConn,
Configurer: pxyConf,
ServerCfg: ctl.serverCfg,
})
if err != nil {
return remoteAddr, err
}
// Check ports used number in each client
if ctl.serverCfg.MaxPortsPerClient > 0 {
ctl.mu.Lock()
if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(ctl.serverCfg.MaxPortsPerClient) {
ctl.mu.Unlock()
err = fmt.Errorf("[HayFrp] 超过最大端口连接数")
return
}
ctl.portsUsedNum += pxy.GetUsedPortsNum()
ctl.mu.Unlock()
defer func() {
if err != nil {
ctl.mu.Lock()
ctl.portsUsedNum -= pxy.GetUsedPortsNum()
ctl.mu.Unlock()
}
}()
}
if ctl.pxyManager.Exist(pxyMsg.ProxyName) {
err = fmt.Errorf("[HayFrp] 端口 [%s] 已被使用", pxyMsg.ProxyName)
return
}
remoteAddr, err = pxy.Run()
if err != nil {
return
}
defer func() {
if err != nil {
pxy.Close()
}
}()
err = ctl.pxyManager.Add(pxyMsg.ProxyName, pxy)
if err != nil {
return
}
ctl.mu.Lock()
ctl.proxies[pxy.GetName()] = pxy
ctl.mu.Unlock()
return
}
func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.mu.Lock()
pxy, ok := ctl.proxies[closeMsg.ProxyName]
if !ok {
ctl.mu.Unlock()
return
}
if ctl.serverCfg.MaxPortsPerClient > 0 {
ctl.portsUsedNum -= pxy.GetUsedPortsNum()
}
pxy.Close()
ctl.pxyManager.Del(pxy.GetName())
delete(ctl.proxies, closeMsg.ProxyName)
ctl.mu.Unlock()
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConfigurer().GetBaseConfig().Type)
notifyContent := &plugin.CloseProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
CloseProxy: msg.CloseProxy{
ProxyName: pxy.GetName(),
},
}
go func() {
_ = ctl.pluginManager.CloseProxy(notifyContent)
}()
return
}