构建一个插件流数据来源
在Grafana,你可以设置你的仪表盘在一定时间间隔自动刷新,无论您使用什么数据源。不幸的是,这意味着您的查询请求发送的所有数据,无论数据已经改变了。将流添加到插件有助于减少查询所以仪表板是新数据可用时才更新。
在你开始之前
这个指导假设您已经熟悉如何建立一个数据源插件
Grafana使用RxJS不断发送数据从数据源到面板可视化。
注意:了解更多关于RxJs,参考RxJS文档。
流添加到您的数据源
为您的数据源启用流插件新数据可用时更新仪表板。
例如,一个插件可以连接到一个websocket流数据来源,或订阅消息总线,并更新可用的可视化,每当一个新消息。
步骤1:编辑plugin.json
文件
使流的数据源plugin.json
文件。
{“流”:真正的}
步骤2:改变的签名查询
方法
修改的签名查询
方法返回一个可观测的
从rxjs
包中。确保你删除异步
关键字。
从“rxjs”进口{可见};
查询(选项:DataQueryRequest < MyQuery >):可观察到的< DataQueryResponse > {/ /……}
步骤3:创建一个可观测的
实例为每个查询
创建一个可观测的
为每个查询实例,然后结合使用合并
函数的rxjs
包中。
从“rxjs”进口{可观测、合并};
const可见= options.targets.map((目标)= >{返回新的可见< DataQueryResponse >((订户)= > {/ /……});});返回合并(……可见);
步骤4:创建一个CircularDataFrame
实例
在订阅
功能,创建一个CircularDataFrame
实例。
从‘@grafana /数据导入{CircularDataFrame};
const帧= new CircularDataFrame({附加:“尾巴”,容量:1000});框架。refId = query.refId;框架。addField ({name:‘时间’,类型:FieldType。});框架。addField ({name:“价值”,类型:FieldType。});
循环数据帧有能力有限。当一个圆形的数据帧到达它的容量,最古老的数据点。
第五步:发送更新后的数据帧
使用subscriber.next ()
发送更新后的数据帧时接收新的更新。
从‘@grafana /数据导入{LoadingState};
const intervalId = setInterval(() = >{框架。添加({时间:Date.now(),价值:math . random ()});subscriber.next({ data: [frame], key: query.refId, state: LoadingState.Streaming, }); }, 500); return () => { clearInterval(intervalId); };
注意:在实践中,你会来电话
subscriber.next
一旦你收到新websocket数据或消息总线。在上面的示例中,数据被收到每500毫秒。
示例代码进行最后查询
方法
查询(选项:DataQueryRequest < MyQuery >):可观察到的< DataQueryResponse > {= options.targets const流。地图(目标= > {const查询=违约(目标,defaultQuery);返回新可见< DataQueryResponse >(用户= > {const帧= new CircularDataFrame({附加:“尾巴”,容量:1000});框架。refId = query.refId;框架。addField ({name:‘时间’,类型:FieldType。});框架。addField ({name:“价值”,类型:FieldType。});const intervalId = setInterval(() = >{框架。添加({时间:Date.now(),价值:math . random ()}); subscriber.next({ data: [frame], key: query.refId, state: LoadingState.Streaming, }); }, 100); return () => { clearInterval(intervalId); }; }); }); return merge(...streams); }
这个例子的一个限制是,面板可视化清除每次你更新仪表板。如果你有对历史数据的访问,您可以添加它,或回填,第一次调用前的数据帧subscriber.next ()
。
对于流媒体插件的另一个例子,请参考流websocket例子在GitHub上。