You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

262 lines
4.6 KiB
Go

1 year ago
package front
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis"
"github.com/gorilla/websocket"
"log"
"net/http"
"sync"
"time"
"worktest/pkg/controller"
"worktest/pkg/redistab"
_ "worktest/pkg/redistab"
)
// RedisRecWebsocket is a gin handler that upgrades the request to a
// WebSocket connection and then blocks until ListenRedisRec, started in its
// own goroutine, marks the wait group as done. The connection is closed when
// the handler returns.
func RedisRecWebsocket(c *gin.Context) {
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}
	// Extra header sent along with the upgrade response.
	responseHeader := http.Header{"Set-Cookie": {"sessionID=1234"}}

	conn, err := upgrader.Upgrade(c.Writer, c.Request, responseHeader)
	if err != nil {
		controller.Error(c, err.Error())
		return
	}
	defer conn.Close()

	// Keep the handler (and therefore the connection) alive for as long as
	// the listener goroutine is running.
	var listener sync.WaitGroup
	listener.Add(1)
	go ListenRedisRec(conn, &listener)

	fmt.Println("等待ing")
	listener.Wait()
	fmt.Println("等待结束")
}
// RedisWebsocket is a gin handler that upgrades the request to a WebSocket
// connection and then blocks until ListenRedis, started in its own
// goroutine, marks the wait group as done. The connection is closed when the
// handler returns.
func RedisWebsocket(c *gin.Context) {
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}
	// Extra header sent along with the upgrade response.
	responseHeader := http.Header{"Set-Cookie": {"sessionID=1234"}}

	conn, err := upgrader.Upgrade(c.Writer, c.Request, responseHeader)
	if err != nil {
		controller.Error(c, err.Error())
		return
	}
	defer conn.Close()

	// Keep the handler (and therefore the connection) alive for as long as
	// the listener goroutine is running.
	var listener sync.WaitGroup
	listener.Add(1)
	go ListenRedis(conn, &listener)

	fmt.Println("等待ing")
	listener.Wait()
	fmt.Println("等待结束")
}
// ping sends a WebSocket ping control frame on ws every 10 seconds. Each
// ping is given a 10-second write deadline. When a ping cannot be written the
// error is logged, done is closed to notify the listener, and ping returns.
//
// done is closed rather than sent on so that the notification never blocks:
// if the listener has already returned, a blocking send would leave this
// goroutine stuck forever. A receive on a closed channel succeeds
// immediately, so existing `<-done` receivers keep working.
func ping(ws *websocket.Conn, done chan struct{}) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	// Wait for each tick; without this the loop would spin and flood the
	// peer with ping frames.
	for range ticker.C {
		deadline := time.Now().Add(10 * time.Second)
		if err := ws.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
			log.Println("ping:", err)
			close(done)
			return
		}
	}
}
// ListenRedis subscribes to the Redis channel named by
// redistab.GetChannelKey and forwards every received message to the
// WebSocket connection cn as a text frame. A ping goroutine is started as a
// liveness check. The function returns when the ping goroutine reports a
// failure, when the subscription channel is closed, or when a write to cn
// fails. wg.Done is called on return.
func ListenRedis(cn *websocket.Conn, wg *sync.WaitGroup) {
	defer wg.Done()

	// Subscribe to the Redis channel.
	pubsub := redistab.RedisClient.Subscribe(redistab.GetChannelKey())
	defer pubsub.Close()
	ch := pubsub.Channel()

	// Buffered with capacity 1 so the ping goroutine can always deliver its
	// signal, even if this function has already returned; with an unbuffered
	// channel the sender would block forever and leak.
	stdoutDone := make(chan struct{}, 1)
	go ping(cn, stdoutDone) // liveness check

	for {
		select {
		case <-stdoutDone:
			fmt.Println("前端终止连接")
			return
		case msg, ok := <-ch:
			if !ok {
				// Subscription channel closed: msg is nil, and calling
				// msg.String() on it would dereference a nil pointer.
				return
			}
			fmt.Println("接受数据", msg)
			if err := cn.WriteMessage(websocket.TextMessage, []byte(msg.String())); err != nil {
				log.Println("websocket write:", err)
				return
			}
		}
	}
}
// ListenRedis3 is an experimental writer loop: it continuously writes the
// text frame "1111111" to cn from the calling goroutine, while a second
// goroutine continuously writes "222222" and a third sends a ping control
// frame every 30 seconds. It returns (calling wg.Done) when the main write
// fails.
//
// gorilla/websocket permits only one concurrent writer of data messages, so
// the two WriteMessage loops are serialized with a mutex. WriteControl may be
// called concurrently with other methods and therefore is not locked.
func ListenRedis3(cn *websocket.Conn, wg *sync.WaitGroup) {
	defer wg.Done()

	// done is closed when this function returns so the helper goroutines
	// stop instead of running forever.
	done := make(chan struct{})
	defer close(done)

	var writeMu sync.Mutex
	writeText := func(payload string) error {
		writeMu.Lock()
		defer writeMu.Unlock()
		return cn.WriteMessage(websocket.TextMessage, []byte(payload))
	}

	// Periodic ping.
	go func() {
		fmt.Println("123123")
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				deadline := time.Now().Add(10 * time.Second)
				if err := cn.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
					log.Println("ping:", err)
					return
				}
			}
		}
	}()

	// Secondary writer.
	go func() {
		fmt.Println("123123")
		for {
			select {
			case <-done:
				return
			default:
			}
			fmt.Println("ping")
			if err := writeText("222222"); err != nil {
				return
			}
		}
	}()

	// Primary writer.
	for {
		fmt.Println("123123")
		if err := writeText("1111111"); err != nil {
			log.Println(" server write err:", err)
			break
		}
	}
}
// ListenRedis2 is an echo loop over the WebSocket connection cn: it reads a
// message, logs it, and writes it back with the same message type. Two
// specific client payloads are answered with a fixed reply instead of being
// echoed verbatim. The loop ends on the first read or write error, and
// wg.Done is called on return.
func ListenRedis2(cn *websocket.Conn, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		msgType, payload, readErr := cn.ReadMessage()
		if readErr != nil {
			log.Println("server read:", readErr)
			return
		}
		log.Printf("server recv msg: %s", payload)

		// Default to echoing the payload; known greetings get a canned reply.
		reply := payload
		switch string(payload) {
		case "woshi client1":
			reply = []byte("client1 去服务端了一趟")
		case "woshi client2":
			reply = []byte("client2 去服务端了一趟")
		}

		if writeErr := cn.WriteMessage(msgType, reply); writeErr != nil {
			log.Println(" server write err:", writeErr)
			return
		}
	}
}
// ListenRedisRec subscribes to the Redis channel named by
// redistab.GetChannelKey and relays each published message to the WebSocket
// connection cn as a text frame, using the blocking pubsub.Receive call.
// Subscription confirmations and pongs are skipped. The function returns on
// a receive error, on an unexpected message type, or on a write error, and
// calls wg.Done on return.
func ListenRedisRec(cn *websocket.Conn, wg *sync.WaitGroup) {
	defer wg.Done()

	// Subscribe to the Redis channel.
	pubsub := redistab.RedisClient.Subscribe(redistab.GetChannelKey())
	defer pubsub.Close()

	for {
		// Receive blocks until the next pub/sub event arrives.
		received, recvErr := pubsub.Receive()
		if recvErr != nil {
			return
		}

		switch event := received.(type) {
		case *redis.Subscription, *redis.Pong:
			// Housekeeping events carry no payload to forward.
			continue
		case *redis.Message:
			text := []byte(event.String())
			if writeErr := cn.WriteMessage(websocket.TextMessage, text); writeErr != nil {
				return
			}
		default:
			return
		}
	}
}