1
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

38 lines
1.0 KiB

package pipeline
import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/centrifugal/centrifuge"
)
type LocalSubscribersDataOutput struct {
// TODO: refactor to depend on interface (avoid Centrifuge dependency here).
node *centrifuge.Node
}
func NewLocalSubscribersDataOutput(node *centrifuge.Node) *LocalSubscribersDataOutput {
return &LocalSubscribersDataOutput{node: node}
}
const DataOutputTypeLocalSubscribers = "localSubscribers"
func (out *LocalSubscribersDataOutput) Type() string {
return DataOutputTypeLocalSubscribers
}
func (out *LocalSubscribersDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error) {
channelID := vars.Channel
channel := orgchannel.PrependOrgID(vars.OrgID, channelID)
pub := &centrifuge.Publication{
Data: data,
}
err := out.node.Hub().BroadcastPublication(channel, pub, centrifuge.StreamPosition{})
if err != nil {
return nil, fmt.Errorf("error publishing %s: %w", string(data), err)
}
return nil, nil
}