From 83f66f492470a19632d56ceec37b8e843c0a44af Mon Sep 17 00:00:00 2001 From: gy <997485446@qq.com> Date: Tue, 15 Aug 2023 11:22:48 +0800 Subject: [PATCH] 123 --- pkg/controller/front/link.go | 261 +++++++++++++++++++++++++++++++++ pkg/redistab/redis_key_test.go | 28 ++++ 2 files changed, 289 insertions(+) create mode 100644 pkg/controller/front/link.go create mode 100644 pkg/redistab/redis_key_test.go diff --git a/pkg/controller/front/link.go b/pkg/controller/front/link.go new file mode 100644 index 0000000..9a7c5ad --- /dev/null +++ b/pkg/controller/front/link.go @@ -0,0 +1,261 @@ +package front + +import ( + "fmt" + "github.com/gin-gonic/gin" + "github.com/go-redis/redis" + "github.com/gorilla/websocket" + "log" + "net/http" + "sync" + "time" + "worktest/pkg/controller" + "worktest/pkg/redistab" + _ "worktest/pkg/redistab" +) + + + +func RedisRecWebsocket(c *gin.Context) { + var wg sync.WaitGroup + var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + cn, err := upgrader.Upgrade(c.Writer, c.Request, http.Header{"Set-Cookie": {"sessionID=1234"}}) + + if err != nil { + controller.Error(c, err.Error()) + return + } + + defer cn.Close() + wg.Add(1) + + go ListenRedisRec(cn,&wg) + + fmt.Println("等待ing") + wg.Wait() + fmt.Println("等待结束") +} + +func RedisWebsocket(c *gin.Context) { + var wg sync.WaitGroup + var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + cn, err := upgrader.Upgrade(c.Writer, c.Request, http.Header{"Set-Cookie": {"sessionID=1234"}}) + + if err != nil { + controller.Error(c, err.Error()) + return + } + + defer cn.Close() + wg.Add(1) + + go ListenRedis(cn,&wg) + + fmt.Println("等待ing") + wg.Wait() + fmt.Println("等待结束") +} + +func ping(ws *websocket.Conn, done chan struct{}) { + ticker := time.NewTicker(10*time.Second) + defer ticker.Stop() + for { + if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { + done<- struct{}{} + log.Println("ping:", err) + return + } + } +} + +func ListenRedis(cn *websocket.Conn, wg *sync.WaitGroup) { + + defer wg.Done() + + + //redis 订阅数据 + pubsub := redistab.RedisClient.Subscribe( redistab.GetChannelKey()) // 订阅 mychannel + ch := pubsub.Channel() + defer pubsub.Close() + + + stdoutDone := make(chan struct{}) + go ping(cn, stdoutDone) //存活检测 + + + for { + + /* + fmt.Println("7-----") + var msg *redis.Message + msg, ok := <-ch + fmt.Println("1--------------",msg,ok) + if !ok { + continue + } + */ + + //1 pubsub.Receive()//receive 监听数据 + + var msg *redis.Message + select { + case <-stdoutDone: + fmt.Println("前端终止连接") + return + case msg = <-ch: + fmt.Println("接受数据",msg) + } + + err := cn.WriteMessage(websocket.TextMessage, []byte(msg.String())) + if err != nil{ + return + } + } + +} + +func ListenRedis3(cn *websocket.Conn, wg *sync.WaitGroup) { + + defer wg.Done() + + + go func(cn *websocket.Conn) { + for { + fmt.Println("123123") + /* + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + + fmt.Println("ping") + // 发送 Ping 消息 + err := cn.WriteMessage(websocket.PingMessage, []byte{}) + if err != nil { + // 处理错误 + return + } + + <-ticker.C + } + */ + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if err := cn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10 * time.Second)); err != nil { + log.Println("ping:", err) + } + } + } + } + }(cn) + + go func(cn *websocket.Conn) { + for { + fmt.Println("123123") + for { + + fmt.Println("ping") + // 发送 Ping 消息 + err := cn.WriteMessage(websocket.TextMessage, []byte("222222")) + //err := cn.WriteMessage(websocket.PingMessage, []byte{}) + if err != nil { + // 处理错误 + return + } + + } + } + }(cn) + + + for { + fmt.Println("123123") + err := cn.WriteMessage(websocket.TextMessage, []byte("1111111")) + if err != nil { + log.Println(" server write err:", err) + break + } + } + +} + +func ListenRedis2(cn *websocket.Conn, wg *sync.WaitGroup) { + + defer wg.Done() + + for { + + mt, message, err := cn.ReadMessage() + if err != nil { + log.Println("server read:", err) + break + } + + log.Printf("server recv msg: %s", message) + msg := string(message) + if msg == "woshi client1" { + message = []byte("client1 去服务端了一趟") + + } else if msg == "woshi client2" { + message = []byte("client2 去服务端了一趟") + } + + err = cn.WriteMessage(mt, message) + if err != nil { + log.Println(" server write err:", err) + break + } + } + +} + + + +func ListenRedisRec(cn *websocket.Conn, wg *sync.WaitGroup) { + + defer wg.Done() + + //redis 订阅数据 + pubsub := redistab.RedisClient.Subscribe( redistab.GetChannelKey()) // 订阅 mychannel + defer pubsub.Close() + + //stdoutDone := make(chan struct{}) + //go ping(cn, stdoutDone) //存活检测 + + for { + + var message *redis.Message + msg,err := pubsub.Receive()//receive 监听数据 + if err != nil{ + return + } + + switch info := msg.(type) { + case *redis.Subscription: + continue + case *redis.Pong: + continue + case *redis.Message: + message = info + default: + return + } + + + err = cn.WriteMessage(websocket.TextMessage, []byte(message.String())) + + if err != nil{ + return + } + } + +} diff --git a/pkg/redistab/redis_key_test.go b/pkg/redistab/redis_key_test.go new file mode 100644 index 0000000..18d2831 --- /dev/null +++ b/pkg/redistab/redis_key_test.go @@ -0,0 +1,28 @@ +package redistab + +import ( +"fmt" + "testing" + "time" +) + +func TestGetProductBitMap(t *testing.T) { + // 创建一个带有缓冲区的channel + ch := make(chan int, 3) + + // 启动多个监听者 + for i := 1; i <= 3; i++ { + go func(i int) { + // 从channel中读取数据,并将其打印到控制台 + fmt.Printf("监听器%d收到消息:%d\n", i, <-ch) + }(i) + } + + // 向channel发送消息 + for i := 1; i <= 5; i++ { + ch <- i + } + + // 等待所有监听者处理完所有消息 + time.Sleep(3*time.Second) +}