package front import ( "fmt" "github.com/gin-gonic/gin" "github.com/go-redis/redis" "github.com/gorilla/websocket" "log" "net/http" "sync" "time" "worktest/pkg/controller" "worktest/pkg/redistab" _ "worktest/pkg/redistab" ) func RedisRecWebsocket(c *gin.Context) { var wg sync.WaitGroup var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } cn, err := upgrader.Upgrade(c.Writer, c.Request, http.Header{"Set-Cookie": {"sessionID=1234"}}) if err != nil { controller.Error(c, err.Error()) return } defer cn.Close() wg.Add(1) go ListenRedisRec(cn,&wg) fmt.Println("等待ing") wg.Wait() fmt.Println("等待结束") } func RedisWebsocket(c *gin.Context) { var wg sync.WaitGroup var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } cn, err := upgrader.Upgrade(c.Writer, c.Request, http.Header{"Set-Cookie": {"sessionID=1234"}}) if err != nil { controller.Error(c, err.Error()) return } defer cn.Close() wg.Add(1) go ListenRedis(cn,&wg) fmt.Println("等待ing") wg.Wait() fmt.Println("等待结束") } func ping(ws *websocket.Conn, done chan struct{}) { ticker := time.NewTicker(10*time.Second) defer ticker.Stop() for { if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { done<- struct{}{} log.Println("ping:", err) return } } } func ListenRedis(cn *websocket.Conn, wg *sync.WaitGroup) { defer wg.Done() //redis 订阅数据 pubsub := redistab.RedisClient.Subscribe( redistab.GetChannelKey()) // 订阅 mychannel ch := pubsub.Channel() defer pubsub.Close() stdoutDone := make(chan struct{}) go ping(cn, stdoutDone) //存活检测 for { /* fmt.Println("7-----") var msg *redis.Message msg, ok := <-ch fmt.Println("1--------------",msg,ok) if !ok { continue } */ //1 pubsub.Receive()//receive 监听数据 var msg *redis.Message select { case <-stdoutDone: fmt.Println("前端终止连接") return case msg = <-ch: fmt.Println("接受数据",msg) } err := cn.WriteMessage(websocket.TextMessage, []byte(msg.String())) if err != nil{ return } } } func ListenRedis3(cn *websocket.Conn, wg *sync.WaitGroup) { defer wg.Done() go func(cn *websocket.Conn) { for { fmt.Println("123123") /* ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { fmt.Println("ping") // 发送 Ping 消息 err := cn.WriteMessage(websocket.PingMessage, []byte{}) if err != nil { // 处理错误 return } <-ticker.C } */ ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: if err := cn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10 * time.Second)); err != nil { log.Println("ping:", err) } } } } }(cn) go func(cn *websocket.Conn) { for { fmt.Println("123123") for { fmt.Println("ping") // 发送 Ping 消息 err := cn.WriteMessage(websocket.TextMessage, []byte("222222")) //err := cn.WriteMessage(websocket.PingMessage, []byte{}) if err != nil { // 处理错误 return } } } }(cn) for { fmt.Println("123123") err := cn.WriteMessage(websocket.TextMessage, []byte("1111111")) if err != nil { log.Println(" server write err:", err) break } } } func ListenRedis2(cn *websocket.Conn, wg *sync.WaitGroup) { defer wg.Done() for { mt, message, err := cn.ReadMessage() if err != nil { log.Println("server read:", err) break } log.Printf("server recv msg: %s", message) msg := string(message) if msg == "woshi client1" { message = []byte("client1 去服务端了一趟") } else if msg == "woshi client2" { message = []byte("client2 去服务端了一趟") } err = cn.WriteMessage(mt, message) if err != nil { log.Println(" server write err:", err) break } } } func ListenRedisRec(cn *websocket.Conn, wg *sync.WaitGroup) { defer wg.Done() //redis 订阅数据 pubsub := redistab.RedisClient.Subscribe( redistab.GetChannelKey()) // 订阅 mychannel defer pubsub.Close() //stdoutDone := make(chan struct{}) //go ping(cn, stdoutDone) //存活检测 for { var message *redis.Message msg,err := pubsub.Receive()//receive 监听数据 if err != nil{ return } switch info := msg.(type) { case *redis.Subscription: continue case *redis.Pong: continue case *redis.Message: message = info default: return } err = cn.WriteMessage(websocket.TextMessage, []byte(message.String())) if err != nil{ return } } }