ribbonè´è½½å衡详解
æå¡ç«¯è´è½½åè¡¡ï¼å¨å®¢æ·ç«¯åæå¡ç«¯ä¸é´ä½¿ç¨ä»£çï¼lvs å nginxã
硬件è´è½½åè¡¡ç设å¤ææ¯è½¯ä»¶è´è½½åè¡¡ç软件模åé½ä¼ç»´æ¤ä¸ä¸ªä¸æå¯ç¨çæå¡ç«¯æ¸ åï¼éè¿å¿è·³æ£æµæ¥åé¤æ éçæå¡ç«¯èç¹ä»¥ä¿è¯æ¸ åä¸é½æ¯å¯ä»¥æ£å¸¸è®¿é®çæå¡ç«¯èç¹ãå½å®¢æ·ç«¯åé请æ±å°è´è½½å衡设å¤çæ¶åï¼è¯¥è®¾å¤ææç§ç®æ³ï¼æ¯å¦çº¿æ§è½®è¯¢ãææéè´è½½ãææµéè´è½½çï¼ä»ç»´æ¤çå¯ç¨æå¡ç«¯æ¸ åä¸ååºä¸å°æå¡ç«¯ç«¯å°åï¼ç¶åè¿è¡è½¬åã
客æ·ç«¯è´è½½åè¡¡ï¼æ ¹æ®èªå·±çæ åµåè´è½½ãRibbonã
客æ·ç«¯è´è½½åè¡¡åæå¡ç«¯è´è½½åè¡¡æ大çåºå«å¨äº æå¡ç«¯å°åå表çåå¨ä½ç½®ï¼ä»¥åè´è½½ç®æ³å¨åªéã
2ãSpring Cloudçè´è½½åè¡¡æºå¶çå®ç°
Spring Cloud Ribbonæ¯ä¸ä¸ªåºäºHTTPåTCPç客æ·ç«¯è´è½½åè¡¡å·¥å ·ï¼å®åºäºNetflix Ribbonå®ç°ãéè¿Spring Cloudçå°è£ ï¼å¯ä»¥è®©æ们轻æ¾å°å°é¢åæå¡çREST模ç请æ±èªå¨è½¬æ¢æ客æ·ç«¯è´è½½åè¡¡çæå¡è°ç¨ãRibbonå®ç°å®¢æ·ç«¯çè´è½½åè¡¡ï¼è´è½½åè¡¡å¨æä¾å¾å¤å¯¹.netflix.client.conf.CommonClientConfigKeyã
<clientName>.<nameSpace>.NFLoadBalancerClassName=xx
<clientName>.<nameSpace>.NFLoadBalancerRuleClassName=xx
<clientName>.<nameSpace>.NFLoadBalancerPingClassName=xx
<clientName>.<nameSpace>.NIWSServerListClassName=xx
<clientName>.<nameSpace>.NIWSServerListFilterClassName=xx
com.netflix.client.config.IClientConfigï¼Ribbonç客æ·ç«¯é ç½®ï¼é»è®¤éç¨com.netflix.client.config.DefaultClientConfigImplå®ç°ã
com.netflix.loadbalancer.IRuleï¼Ribbonçè´è½½åè¡¡çç¥ï¼é»è®¤éç¨com.netflix.loadbalancer.ZoneAvoidanceRuleå®ç°ï¼è¯¥çç¥è½å¤å¨å¤åºåç¯å¢ä¸éåºæä½³åºåçå®ä¾è¿è¡è®¿é®ã
com.netflix.loadbalancer.IPingï¼Ribbonçå®ä¾æ£æ¥çç¥ï¼é»è®¤éç¨com.netflix.loadbalancer.NoOpPingå®ç°ï¼è¯¥æ£æ¥çç¥æ¯ä¸ä¸ªç¹æ®çå®ç°ï¼å®é ä¸å®å¹¶ä¸ä¼æ£æ¥å®ä¾æ¯å¦å¯ç¨ï¼èæ¯å§ç»è¿åtrueï¼é»è®¤è®¤ä¸ºæææå¡å®ä¾é½æ¯å¯ç¨çã
com.netflix.loadbalancer.ServerListï¼æå¡å®ä¾æ¸ åçç»´æ¤æºå¶ï¼é»è®¤éç¨com.netflix.loadbalancer.ConfigurationBasedServerListå®ç°ã
com.netflix.loadbalancer.ServerListFilterï¼æå¡å®ä¾æ¸ åè¿æ»¤æºå¶ï¼é»è®¤éorg.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilterï¼è¯¥çç¥è½å¤ä¼å è¿æ»¤åºä¸è¯·æ±æ¹å¤äºååºåçæå¡å®ä¾ã
com.netflix.loadbalancer.ILoadBalancerï¼è´è½½åè¡¡å¨ï¼é»è®¤éç¨com.netflix.loadbalancer.ZoneAwareLoadBalancerå®ç°ï¼å®å ·å¤äºåºåæç¥çè½åã
ä¸é¢çé ç½®æ¯å¨é¡¹ç®ä¸æ²¡æå¼å ¥spring Cloud Eurekaï¼å¦æå¼å ¥äºEurekaåRibbonä¾èµæ¶ï¼èªå¨åé ç½®ä¼æä¸äºä¸åã
éè¿èªå¨åé ç½®çå®ç°ï¼å¯ä»¥è½»æ¾çå®ç°å®¢æ·ç«¯çè´è½½åè¡¡ãåæ¶ï¼é对ä¸äºä¸ªæ§åéæ±ï¼æ们å¯ä»¥æ¹ä¾¿çæ¿æ¢ä¸é¢çè¿äºé»è®¤å®ç°ï¼åªéè¦å¨springbootåºç¨ä¸å建对åºçå®ç°å®ä¾å°±è½è¦çè¿äºé»è®¤çé ç½®å®ç°ã
@Configuration
public class MyRibbonConfiguration {
@Bean
public IRule ribbonRule(){
return new RandomRule();
}
}
è¿æ ·å°±ä¼ä½¿ç¨P使ç¨äºRandomRuleå®ä¾æ¿ä»£äºé»è®¤çcom.netflix.loadbalancer.ZoneAvoidanceRuleã
ä¹å¯ä»¥ä½¿ç¨@RibbonClient注解å®ç°æ´ç»ç²åº¦ç客æ·ç«¯é ç½®
对äºRibbonçåæ°é常æäºç§æ¹å¼ï¼å ¨å±é 置以åæå®å®¢æ·ç«¯é ç½®
å ¨å±é ç½®çæ¹å¼å¾ç®å
åªéè¦ä½¿ç¨ribbon.<key>=<value>æ ¼å¼è¿è¡é ç½®å³å¯ãå ¶ä¸ï¼<key>代表äºRibbon客æ·ç«¯é ç½®çåæ°åï¼<value>å代表äºå¯¹åºåæ°çå¼ãæ¯å¦ï¼æ们å¯ä»¥æ³ä¸é¢è¿æ ·é ç½®Ribbonçè¶ æ¶æ¶é´
ribbon.ConnectTimeout=
ribbon.ServerListRefreshInterval= ribbonè·åæå¡å®æ¶æ¶é´
å ¨å±é ç½®å¯ä»¥ä½ä¸ºé»è®¤å¼è¿è¡è®¾ç½®ï¼å½æå®å®¢æ·ç«¯é ç½®äºç¸åºçkeyçå¼æ¶ï¼å°è¦çå ¨å±é ç½®çå 容
æå®å®¢æ·ç«¯çé ç½®æ¹å¼
<client>.ribbon.<key>=<value>çæ ¼å¼è¿è¡é ç½®.<client>表示æå¡åï¼æ¯å¦æ²¡ææå¡æ²»çæ¡æ¶çæ¶åï¼å¦Eurekaï¼ï¼æ们éè¦æå®å®ä¾æ¸ åï¼å¯ä»¥æå®æå¡åæ¥å详ç»çé ç½®ï¼
user-service.ribbon.listOfServers=localhost:,码框localhost:,localhost:
对äºRibbonåæ°çkey以åvalueç±»åçå®ä¹ï¼å¯ä»¥éè¿æ¥çcom.netflix.client.config.CommonClientConfigKeyç±»ã
å½å¨spring Cloudçåºç¨åæ¶å¼å ¥Spring cloud RibbonåSpring Cloud Eurekaä¾èµæ¶ï¼ä¼è§¦åEurekaä¸å®ç°ç对Ribbonçèªå¨åé ç½®ãè¿æ¶çserverListçç»´æ¤æºå¶å®ç°å°è¢«com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerListçå®ä¾æè¦çï¼è¯¥å®ç°ä¼è®²æå¡æ¸ åå表交ç»Eurekaçæå¡æ²»çæºå¶æ¥è¿è¡ç»´æ¤ãIPingçå®ç°å°è¢«com.netflix.niws.loadbalancer.NIWSDiscoveryPingçå®ä¾æè¦çï¼è¯¥å®ä¾ä¹å°å®ä¾æ¥å£çä»»å¡äº¤ç»äºæå¡æ²»çæ¡æ¶æ¥è¿è¡ç»´æ¤ãé»è®¤æ åµä¸ï¼ç¨äºè·åå®ä¾è¯·æ±çServerListæ¥å£å®ç°å°éç¨Spring Cloud Eurekaä¸å°è£ çorg.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerListï¼å ¶ç®çæ¯ä¸ºäºè®©å®ä¾ç»´æ¤çç¥æ´å éç¨ï¼æ以å°ä½¿ç¨ç©çå æ°æ®æ¥è¿è¡è´è½½åè¡¡ï¼èä¸æ¯ä½¿ç¨åççAWS AMIå æ°æ®ãå¨ä¸Spring cloud Eurekaç»å使ç¨çæ¶åï¼ä¸éè¦åå»æå®ç±»ä¼¼çuser-service.ribbon.listOfServersçåæ°æ¥æå®å ·ä½çæå¡å®ä¾æ¸ åï¼å 为Eurekaå°ä¼ä¸ºæ们维æ¤æææå¡çå®ä¾æ¸ åï¼è对äºRibbonçåæ°é ç½®ï¼æ们ä¾ç¶å¯ä»¥éç¨ä¹åç两ç§é ç½®æ¹å¼æ¥å®ç°ã
æ¤å¤ï¼ç±äºspring Cloud Ribboné»è®¤å®ç°äºåºå亲åçç¥ï¼æ以ï¼å¯ä»¥éè¿Eurekaå®ä¾çå æ°æ®é ç½®æ¥å®ç°åºååçå®ä¾é ç½®æ¹æ¡ãæ¯å¦å¯ä»¥å°ä¸åæºæ¿çå®ä¾é ç½®æä¸åçåºåå¼ï¼ä½ä¸ºè·¨åºåç容å¨æºå¶å®ç°ãèå®ç°ä¹é常ç®åï¼åªéè¦æå¡å®ä¾çå æ°æ®ä¸å¢å zoneåæ°æ¥æå®èªå·±æå¨çåºåï¼æ¯å¦ï¼
eureka.instance.metadataMap.zone=shanghai
å¨Spring Cloud Ribbonä¸Spring Cloud Eurekaç»åçå·¥ç¨ä¸ï¼æ们å¯ä»¥éè¿åæ°ç¦ç¨Eureka对Ribbonæå¡å®ä¾çç»´æ¤å®ç°ãè¿æ¶åéè¦èªå·±å»ç»´æ¤æå¡å®ä¾å表äºã
ribbon.eureka.enabled=false.
ç±äºSpring Cloud Eurekaå®ç°çæå¡æ²»çæºå¶å¼ºè°äºcapåççapæºå¶ï¼å³å¯ç¨æ§åå¯é æ§ï¼ï¼ä¸zookeeperè¿ç±»å¼ºè°cpï¼ä¸è´æ§ï¼å¯é æ§ï¼æå¡è´¨éæ¡æ¶æ大çåºå«å°±æ¯ï¼Eureka为äºå®ç°æ´é«çæå¡å¯ç¨æ§ï¼çºç²äºä¸å®çä¸è´æ§ï¼å¨æ端æ åµä¸å®æ¿æ¥åæ éå®ä¾ä¹ä¸è¦ä¸¢å¼"å¥åº·"å®ä¾ã
æ¯å¦è¯´ï¼å½æå¡æ³¨åä¸å¿çç½ç»åçæ éæå¼æ¶åï¼ç±äºææçæå¡å®ä¾æ æ³ç»´æ¤ç»çº¦å¿è·³ï¼å¨å¼ºè°apçæå¡æ²»çä¸å°ä¼ææææå¡å®ä¾åé¤æï¼èEurekaåä¼å ä¸ºè¶ è¿%çå®ä¾ä¸¢å¤±å¿è·³è触åä¿æ¤æºå¶ï¼æ³¨åä¸å¿å°ä¼ä¿çæ¤æ¶çææèç¹ï¼ä»¥å®ç°æå¡é´ä¾ç¶å¯ä»¥è¿è¡äºç¸è°ç¨çåºæ¯ï¼å³ä½¿å ¶ä¸æé¨åæ éèç¹ï¼ä½è¿æ ·åå¯ä»¥ç»§ç»ä¿é大å¤æ°æå¡çæ£å¸¸æ¶è´¹ã
å¨Camdençæ¬ï¼æ´åäºspring retryæ¥å¢å¼ºRestTemplateçéè¯è½åï¼å¯¹äºæ们å¼åè æ¥è¯´ï¼åªéè¦ç®åé ç½®ï¼å³å¯å®æéè¯çç¥ã
spring.cloud.loadbalancer.retry.enabled=true
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=
user-service.ribbon.ConnectTimeout=
user-service.ribbon.ReadTimeout=
user-service.ribbon.OkToRetryOnAllOperations=true
user-service.ribbon.MaxAutoRetriesNextServer=2
user-service.ribbon.maxAutoRetries=1
spring.cloud.loadbalancer.retry.enabled:该åæ°ç¨æ¥å¼å¯éè¯æºå¶ï¼å®é»è®¤æ¯å ³éçã
hystrix.command.default.execution.isolation.thread.timeoutInMillisecondsï¼æè·¯å¨çè¶ æ¶æ¶é´éè¦å¤§äºRibbonçè¶ æ¶æ¶é´ï¼ä¸ç¶ä¸ä¼è§¦åéè¯ã
user-service.ribbon.ConnectTimeoutï¼è¯·æ±è¿æ¥è¶ æ¶æ¶é´ã
user-service.ribbon.ReadTimeoutï¼è¯·æ±å¤ççè¶ æ¶æ¶é´
user-service.ribbon.OkToRetryOnAllOperationsï¼å¯¹æææä½è¯·æ±é½è¿è¡éè¯ã
user-service.ribbon.MaxAutoRetriesNextServerï¼åæ¢å®ä¾çéè¯æ¬¡æ°ã
user-service.ribbon.maxAutoRetriesï¼å¯¹å½åå®ä¾çéè¯æ¬¡æ°ã
æ ¹æ®ä»¥ä¸é ç½®ï¼å½è®¿é®å°æ é请æ±çæ¶åï¼å®ä¼åå°è¯è®¿é®ä¸æ¬¡å½åå®ä¾ï¼æ¬¡æ°ç±maxAutoRetriesé ç½®ï¼ï¼å¦æä¸è¡ï¼å°±æ¢ä¸ä¸ªå®ä¾è¿è¡è®¿é®ï¼å¦æè¿æ¯ä¸è¡ï¼åæ¢ä¸ä¸ªå®ä¾è®¿é®ï¼æ´æ¢æ¬¡æ°ç±MaxAutoRetriesNextServeré ç½®ï¼ï¼å¦æä¾ç¶ä¸è¡ï¼è¿å失败
项ç®å¯å¨çæ¶åä¼èªå¨ç为æ们å è½½LoadBalancerAutoConfigurationèªå¨é 置类ï¼è¯¥èªå¨é 置类åå§åæ¡ä»¶æ¯è¦æ±classpathå¿ é¡»è¦æRestTemplateè¿ä¸ªç±»ï¼å¿ é¡»è¦æLoadBalancerClientå®ç°ç±»ã
LoadBalancerAutoConfiguration为æ们干äºäºä»¶äºï¼ç¬¬ä¸ä»¶æ¯å建äºLoadBalancerInterceptoræ¦æªå¨beanï¼ç¨äºå®ç°å¯¹å®¢æ·ç«¯å起请æ±æ¶è¿è¡æ¦æªï¼ä»¥å®ç°å®¢æ·ç«¯è´è½½åè¡¡ãå建äºä¸ä¸ª
RestTemplateCustomizerçbeanï¼ç¨äºç»RestTemplateå¢å LoadBalancerInterceptoræ¦æªå¨ã
æ¯æ¬¡è¯·æ±çæ¶åé½ä¼æ§è¡org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptorçinterceptæ¹æ³ï¼èLoadBalancerInterceptorå ·æLoadBalancerClientï¼å®¢æ·ç«¯è´è½½å®¢æ·ç«¯ï¼å®ä¾çä¸ä¸ªå¼ç¨ï¼
å¨æ¦æªå¨ä¸éè¿æ¹æ³è·åæå¡åç请æ±urlï¼æ¯å¦/p/1bddb5dc
Spring cloudç³»åå Ribbonçåè½æ¦è¿°ã主è¦ç»ä»¶åå±æ§æ件é ç½®
/p/faffa
æ¬äººæéäºç¬è®°ä¸è®°å½çåèæç«
ææ¡£ï¼_ribbon è´è½½åè¡¡.note
é¾æ¥ï¼/noteshare?id=efc3efbbefd8ed0b9&sub=B0E6DFEEBDAF
一文搞懂大数据批量处理框架Spring Batch的完美解析方案是什么。
如今微服务架构讨论的代码如火如荼。但在企业架构里除了大量的架源架源OLTP交易外,还存在海量的码框批处理交易。在诸如银行的代码量学回踩黄金线公式源码金融机构中,每天有3-4万笔的架源架源批处理作业需要处理。针对OLTP,码框业界有大量的代码开源框架、优秀的架源架源架构设计给予支撑;但批处理领域的框架确凤毛麟角。是码框时候和我们一起来了解下批处理的世界哪些优秀的框架和设计了,今天我将以SpringBatch为例,代码和大家一起探秘批处理的架源架源世界。
初识批处理典型场景探秘领域模型及关键架构实现作业健壮性与扩展性批处理框架的码框不足与增强批处理典型业务场景
对账是典型的批处理业务处理场景,各个金融机构的代码往来业务和跨主机系统的业务都会涉及到对账的过程,如大小额支付、银联交易、人行往来、现金管理、POS业务、ATM业务、证券公司资金账户、证券公司与证券结算公司。
下面是某行网银的部分日终跑批实例场景需求。
涉及到的需求点包括:
批量的每个单元都需要错误处理和回退;每个单元在不同平台中运行;需要有分支选择;每个单元需要监控和获取单元处理日志;提供多种触发规则,按日期,日历,周期触发;
除此之外典型的批处理适用于如下的业务场景:
定期提交批处理任务(日终处理)并行批处理:并行处理任务企业消息驱动处理大规模的并行处理手动或定时重启按顺序处理依赖的任务(可扩展为工作流驱动的批处理)部分处理:忽略记录(例如在回滚时)完整的批处理事务
与OLTP类型交易不同,批处理作业两个典型特征是批量执行与自动执行(需要无人值守):前者能够处理大批量数据的导入、导出和业务逻辑计算;后者无需人工干预,能够自动化执行批量任务。
在关注其基本功能之外,还需要关注如下的几点:
健壮性:不会因为无效数据或错误数据导致程序崩溃;可靠性:通过跟踪、监控、日志及相关的处理策略(重试、跳过、重启)实现批作业的可靠执行;扩展性:通过并发或者并行技术实现应用的纵向和横向扩展,满足海量数据处理的性能需求;
苦于业界真的缺少比较好的批处理框架,SpringBatch是业界目前为数不多的优秀批处理框架(Java语言开发),SpringSource和Accenture(埃森哲)共同贡献了智慧。
Accenture在批处理架构上有着丰富的工业级别的经验,贡献了之前专用的批处理体系框架(这些框架历经数十年研发和使用,为SpringBatch提供了大量的参考经验)。
SpringSource则有着深刻的技术认知和Spring框架编程模型,同时借鉴了JCL(JobControlLanguage)和COBOL的语言特性。年JSR-将批处理纳入规范体系,并被包含在了JEE7之中。这意味着,所有的JEE7应用服务器都会有批处理的能力,目前第一个实现此规范的应用服务器是Glassfish4。当然也可以在JavaSE中使用。
但最为关键的一点是:JSR-规范大量借鉴了SpringBatch框架的设计思路,从上图中的核心模型和概念中可以看出究竟,核心的概念模型完全一致。
通过SpringBatch框架可以构建出轻量级的健壮的并行处理应用,支持事务、并发、流程、监控、纵向和横向扩展,提供统一的接口管理和任务管理。
框架提供了诸如以下的核心能力,让大家更关注在业务处理上。更是提供了如下的丰富能力:
明确分离批处理的执行环境和应用将通用核心的服务以接口形式提供提供“开箱即用”的简单的默认的核心执行接口提供Spring框架中配置、自定义、和扩展服务所有默认实现的核心服务能够容易的被扩展与替换,不会影响基础层提供一个简单的部署模式,使用Maven进行编译批处理关键领域模型及关键架构
先来个HelloWorld示例,一个典型的批处理作业。
典型的一个作业分为3部分:作业读、作业处理、作业写,也是典型的三步式架构。整个批处理框架基本上围绕Read、Process、Writer来处理。除此之外,框架提供了作业调度器、作业仓库(用以存放Job的元数据信息,支持内存、京东邮件源码DB两种模式)。
完整的领域概念模型参加下图:
JobLauncher(作业调度器)是SpringBatch框架基础设施层提供的运行Job的能力。通过给定的Job名称和作JobParameters,可以通过JobLauncher执行Job。
通过JobLauncher可以在Java程序中调用批处理任务,也可以在通过命令行或者其它框架(如定时调度框架Quartz)中调用批处理任务。
JobRepository来存储Job执行期的元数据(这里的元数据是指JobInstance、JobExecution、JobParameters、StepExecution、ExecutionContext等数据),并提供两种默认实现。
一种是存放在内存中;另一种将元数据存放在数据库中。通过将元数据存放在数据库中,可以随时监控批处理Job的执行状态。Job执行结果是成功还是失败,并且使得在Job失败的情况下重新启动Job成为可能。Step表示作业中的一个完整步骤,一个Job可以有一个或者多个Step组成。
批处理框架运行期的模型也非常简单:
JobInstance(作业实例)是一个运行期的概念,Job每执行一次都会涉及到一个JobInstance。
JobInstance来源可能有两种:一种是根据设置的JobParameters从JobRepository(作业仓库)中获取一个;如果根据JobParameters从JobRepository没有获取JobInstance,则新创建一个新的JobInstance。
JobExecution表示Job执行的句柄,一次Job的执行可能成功也可能失败。只有Job执行成功后,对应的JobInstance才会被完成。因此在Job执行失败的情况下,会有一个JobInstance对应多个JobExecution的场景发生。
总结下批处理的典型概念模型,其设计非常精简的十个概念,完整支撑了整个框架。
Job提供的核心能力包括作业的抽象与继承,类似面向对象中的概念。对于执行异常的作业,提供重启的能力。
框架在Job层面,同样提供了作业编排的概念,包括顺序、条件、并行作业编排。
在一个Job中配置多个Step。不同的Step间可以顺序执行,也可以按照不同的条件有选择的执行(条件通常使用Step的退出状态决定),通过next元素或者decision元素来定义跳转规则;
为了提高多个Step的执行效率,框架提供了Step并行执行的能力(使用split进行声明,通常该情况下需要Step之间没有任何的依赖关系,否则容易引起业务上的错误)。Step包含了一个实际运行的批处理任务中的所有必需的信息,其实现可以是非常简单的业务实现,也可以是非常复杂的业务处理,Step的复杂程度通常是业务决定的。
每个Step由ItemReader、ItemProcessor、ItemWriter组成,当然根据不同的业务需求,ItemProcessor可以做适当的精简。同时框架提供了大量的ItemReader、ItemWriter的实现,提供了对FlatFile、XML、Json、DataBase、Message等多种数据类型的支持。
框架还为Step提供了重启、事务、重启次数、并发数;以及提交间隔、异常跳过、重试、完成策略等能力。基于Step的灵活配置,可以完成常见的业务功能需求。其中三步走(Read、Processor、Writer)是音效网源码批处理中的经典抽象。
作为面向批的处理,在Step层提供了多次读、处理,一次提交的能力。
在Chunk的操作中,可以通过属性commit-interval设置read多少条记录后进行一次提交。通过设置commit-interval的间隔值,减少提交频次,降低资源使用率。Step的每一次提交作为一个完整的事务存在。默认采用Spring提供的声明式事务管理模式,事务编排非常方便。如下是一个声明事务的示例:
框架对于事务的支持能力包括:
Chunk支持事务管理,通过commit-interval设置每次提交的记录数;支持对每个Tasklet设置细粒度的事务配置:隔离界别、传播行为、超时;支持rollback和norollback,通过skippable-exception-classes和no-rollback-exception-classes进行支撑;支持JMSQueue的事务级别配置;
另外,在框架资深的模型抽象方面,SpringBatch也做了极为精简的抽象。
仅仅使用六张业务表存储了所有的元数据信息(包括Job、Step的实例,上下文,执行器信息,为后续的监控、重启、重试、状态恢复等提供了可能)。
BATCH_JOB_INSTANCE:作业实例表,用于存放Job的实例信息BATCH_JOB_EXECUTION_PARAMS:作业参数表,用于存放每个Job执行时候的参数信息,该参数实际对应Job实例的。BATCH_JOB_EXECUTION:作业执行器表,用于存放当前作业的执行信息,比如创建时间,执行开始时间,执行结束时间,执行的那个Job实例,执行状态等。BATCH_JOB_EXECUTION_CONTEXT:作业执行上下文表,用于存放作业执行器上下文的信息。BATCH_STEP_EXECUTION:作业步执行器表,用于存放每个Step执行器的信息,比如作业步开始执行时间,执行完成时间,执行状态,读写次数,跳过次数等信息。BATCH_STEP_EXECUTION_CONTEXT:作业步执行上下文表,用于存放每个作业步上下文的信息。实现作业的健壮性与扩展性
批处理要求Job必须有较强的健壮性,通常Job是批量处理数据、无人值守的,这要求在Job执行期间能够应对各种发生的异常、错误,并对Job执行进行有效的跟踪。
一个健壮的Job通常需要具备如下的几个特性:
1.容错性
在Job执行期间非致命的异常,Job执行框架应能够进行有效的容错处理,而不是让整个Job执行失败;通常只有致命的、导致业务不正确的异常才可以终止Job的执行。
2.可追踪性
Job执行期间任何发生错误的地方都需要进行有效的记录,方便后期对错误点进行有效的处理。例如在Job执行期间任何被忽略处理的记录行需要被有效的记录下来,应用程序维护人员可以针对被忽略的记录后续做有效的处理。
3.可重启性
Job执行期间如果因为异常导致失败,应该能够在失败的点重新启动Job;而不是从头开始重新执行Job。
框架提供了支持上面所有能力的特性,包括Skip(跳过记录处理)、Retry(重试给定的操作)、Restart(从错误点开始重新启动失败的Job):
Skip,在对数据处理期间,如果数据的某几条的格式不能满足要求,可以通过Skip跳过该行记录的处理,让Processor能够顺利的处理其余的记录行。Retry,将给定的操作进行多次重试,在某些情况下操作因为短暂的lede官方源码异常导致执行失败,如网络连接异常、并发处理异常等,可以通过重试的方式避免单次的失败,下次执行操作时候网络恢复正常,不再有并发的异常,这样通过重试的能力可以有效的避免这类短暂的异常。Restart,在Job执行失败后,可以通过重启功能来继续完成Job的执行。在重启时候,批处理框架允许在上次执行失败的点重新启动Job,而不是从头开始执行,这样可以大幅提高Job执行的效率。
对于扩展性,框架提供的扩展能力包括如下的四种模式:
MultithreadedStep多线程执行一个Step;ParallelStep通过多线程并行执行多个Step;RemoteChunking在远端节点上执行分布式Chunk操作;PartitioningStep对数据进行分区,并分开执行;
我们先来看第一种的实现MultithreadedStep:
批处理框架在Job执行时默认使用单个线程完成任务的执行,同时框架提供了线程池的支持(MultithreadedStep模式),可以在Step执行时候进行并行处理,这里的并行是指同一个Step使用线程池进行执行,同一个Step被并行的执行。使用tasklet的属性task-executor可以非常容易的将普通的Step变成多线程Step。
MultithreadedStep的实现示例:
需要注意的是SpringBatch框架提供的大部分的ItemReader、ItemWriter等操作都是线程不安全的。
可以通过扩展的方式显现线程安全的Step。
下面为大家展示一个扩展的实现:
需求:针对数据表的批量处理,实现线程安全的Step,并且支持重启能力,即在执行失败点可以记录批处理的状态。
对于示例中的数据库读取组件JdbcCursorItemReader,在设计数据库表时,在表中增加一个字段Flag,用于标识当前的记录是否已经读取并处理成功,如果处理成功则标识Flag=true,等下次重新读取的时候,对于已经成功读取且处理成功的记录直接跳过处理。
MultithreadedStep(多线程步)提供了多个线程执行一个Step的能力,但这种场景在实际的业务中使用的并不是非常多。
更多的业务场景是Job中不同的Step没有明确的先后顺序,可以在执行期并行的执行。
ParallelStep:提供单个节点横向扩展的能力
使用场景:StepA、StepB两个作业步由不同的线程执行,两者均执行完毕后,StepC才会被执行。
框架提供了并行Step的能力。可以通过Split元素来定义并行的作业流,并制定使用的线程池。
ParallelStep模式的执行效果如下:
每个作业步并行处理不同的记录,示例中三个作业步,处理同一张表中的不同数据。
并行Step提供了在一个节点上横向处理,但随着作业处理量的增加,有可能一台节点无法满足Job的处理,此时我们可以采用远程Step的方式将多个机器节点组合起来完成一个Job的处理。
RemoteChunking:远程Step技术本质上是将对Item读、写的处理逻辑进行分离;通常情况下读的逻辑放在一个节点进行操作,将写操作分发到另外的节点执行。
远程分块是一个把step进行技术分割的工作,不需要对处理数据的结构有明确了解。
任何输入源能够使用单进程读取并在动态分割后作为块发送给远程的工作进程。
远程进程实现了监听者模式,反馈请求、处理数据最终将处理结果异步返回。请求和返回之间的传输会被确保在发送者和单个消费者之间。
在Master节点,作业步负责读取数据,并将读取的数据通过远程技术发送到指定的远端节点上,进行处理,处理完毕后Master负责回收Remote端执行的情况。
在SpringBatch框架中通过两个核心的接口来完成远程Step的任务,分别是ChunkProvider与ChunkProcessor。
ChunkProvider:根据给定的ItemReader操作产生批量的Chunk操作;
ChunkProcessor:负责获取ChunkProvider产生的Chunk操作,执行具体的写逻辑;
SpringBatch中对远程Step没有默认的实现,但我们可以借助SI或者AMQP实现来实现远程通讯能力。
Step本地节点负责读取数据,并通过MessagingGateway将请求发送到远程Step上;远程Step提供了队列的监听器,当请求队列中有消息时候获取请求信息并交给ChunkHander负责处理。华讯源码
接下来我们看下最后一种分区模式;PartitioningStep:分区模式需要对数据的结构有一定的了解,如主键的范围、待处理的文件的名字等。
这种模式的优点在于分区中每一个元素的处理器都能够像一个普通SpringBatch任务的单步一样运行,也不必去实现任何特殊的或是新的模式,来让他们能够更容易配置与测试。
通过分区可以实现以下的优点:
分区实现了更细粒度的扩展;基于分区可以实现高性能的数据切分;分区比远程通常具有更高的扩展性;分区后的处理逻辑,支持本地与远程两种模式;分区作业典型的可以分成两个处理阶段,数据分区、分区处理;
数据分区:根据特殊的规则(例如:根据文件名称,数据的唯一性标识,或者哈希算法)将数据进行合理的数据切片,为不同的切片生成数据执行上下文ExecutionContext、作业步执行器StepExecution。可以通过接口Partitioner生成自定义的分区逻辑,SpringBatch批处理框架默认实现了对多文件的实现org.springframework.batch.core.partition.support.MultiResourcePartitioner;也可以自行扩展接口Partitioner来实现自定义的分区逻辑。
分区处理:通过数据分区后,不同的数据已经被分配到不同的作业步执行器中,接下来需要交给分区处理器进行作业,分区处理器可以本地执行也可以远程执行被划分的作业。接口PartitionHandler定义了分区处理的逻辑,SpringBatch批处理框架默认实现了本地多线程的分区处理org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;也可以自行扩展接口PartitionHandler来实现自定义的分区处理逻辑。
SpringBatch框架提供了对文件分区的支持,实现类org.springframework.batch.core.partition.support.MultiResourcePartitioner提供了对文件分区的默认支持,根据文件名将不同的文件处理进行分区,提升处理的速度和效率,适合有大量小文件需要处理的场景。
示例展示了将不同文件分配到不同的作业步中,使用MultiResourcePartitioner进行分区,意味着每个文件会被分配到一个不同的分区中。如果有其它的分区规则,可以通过实现接口Partitioner来进行自定义的扩展。有兴趣的TX,可以自己实现基于数据库的分区能力哦。
总结一下,批处理框架在扩展性上提供了4中不同能力,每种都是各自的使用场景,我们可以根据实际的业务需要进行选择。
批处理框架的不足与增强
SpringBatch批处理框架虽然提供了4种不同的监控方式,但从目前的使用情况来看,都不是非常的友好。
通过DB直接查看,对于管理人员来讲,真的不忍直视;通过API实现自定义的查询,这是程序员的天堂,确实运维人员的地狱;提供了Web控制台,进行Job的监控和操作,目前提供的功能太,无法直接用于生产;提供JMX查询方式,对于非开发人员太不友好;
但在企业级应用中面对批量数据处理,仅仅提供批处理框架仅能满足批处理作业的快速开发、执行能力。
企业需要统一的批处理平台来处理复杂的企业批处理应用,批处理平台需要解决作业的统一调度、批处理作业的集中管理和管控、批处理作业的统一监控等能力。
那完美的解决方案是什么呢?
关注我:转发私信回复“架构资料”获取Java高级架构资料、源码、笔记、视频
Dubbo、Redis、设计模式、Netty、zookeeper、Springcloud、分布式、微服务
高并发等架构技术
企业级批处理平台需要在SpringBatch批处理框架的基础上,集成调度框架,通过调度框架可以将任务按照企业的需求进行任务的定期执行;
丰富目前SpringBatchAdmin(SpringBatch的管理监控平台,目前能力比较薄弱)框架,提供对Job的统一管理功能,增强Job作业的监控、预警等能力;
通过与企业的组织机构、权限管理、认证系统进行合理的集成,增强平台对Job作业的权限控制、安全管理能力。
由于时间关系,今天的分享就到这里,很多内容未能展开讨论。欢迎大家在实际业务中使用SpringBatch框架。
最后的话
觉得还不错可以转发关注支持一波~私信架构资料获取一些我私人整理的Java进阶资料!
为什么某些人会一直比你优秀,是因为他本身就很优秀还一直在持续努力变得更优秀。而你是不是还在满足于现状且内心在窃喜?“对于程序员来说,如果哪一天开始他停止了学习,那么他的职业生涯便开始宣告消亡。”所以行动起来,学习起来!
ffplay视频播放原理分析
作者|赵家祝FFmpeg框架由命令行工具和函数库组成,ffplay是其中的一种命令行工具,提供了播放音视频文件的功能,不仅可以播放本地多媒体文件,还可以播放网络流媒体文件。本文从ffplay的整体播放流程出发,借鉴其设计思路,学习如何设计一款简易的播放器。
一、播放器工作流程在学习ffplay源码之前,为了方便理解,我们先宏观了解一下播放器在播放媒体文件时的工作流程。
解协议:媒体文件在网络上传输时,需要经过流媒体协议将媒体数据分段成若干个数据包,这样就可以满足用户一边下载一边观看的需求,而不需要等整个媒体文件都下载完成才能观看。常见的流媒体协议有RTMP、HTTP、HLS、MPEG-DASH、MSS、HDS等。由于流媒体协议中不仅仅包含媒体数据,还包含控制播放的信令数据。因此,解协议是移除协议中的信令数据,输出音视频封装格式数据。
解封装:封装格式也叫容器,就是将已经编码压缩好的视频流和音频流按照一定的格式放到一个文件中,常见的封装格式有MP4、FLV、MPEG2-TS、AVI、MKV、MOV等。解封装是将封装格式数据中的音频流压缩编码数据和视频流压缩编码数据分离,方便在解码阶段使用不同的解码器解码。
解码:压缩编码数据是在原始数据基础上采用不同的编码压缩得到的数据,而解码阶段就是编码的逆向操作。常见的视频压缩编码标准有H./H.、MPEG-2、AV1、V8/9等,音频压缩编码标准有AAC、MP3等。解压后得到的视频图像数据是YUV或RGB,音频采样数据是PCM。
音视频同步:解码后的视频数据和音频数据是独立的,在送给显卡和声卡播放前,需要将视频和音频同步,避免播放进度不一致。
二、main函数ffplay的使用非常简单,以ffplay-iinput.mp4-loop2为例,表示使用ffplay播放器循环播放input.mp4文件两遍。执行该命令时,对应的源码在fftools/ffplay.c中,程序入口函数是main函数。
注:本文ffplay源码基于ffmpeg4.4。
2.1环境初始化
初始化部分主要调用以下函数:
init_dynload:调用SetDllDirectory("")删除动态链接库(DLL)搜索路径中的当前工作目录,是Windows平台下的一种安全预防措施。
av_log_set_flag:设置log打印的标记为AV_LOG_SKIP_REPEATED,即跳过重复消息。
parse_loglevel:解析log的级别,会匹配命令中的-loglevel字段。如果命令中添加-report,会将播放日志输出成文件。
avdevice_register_all:注册特殊设备的封装库。
avformat_network_init:初始化网络资源,可以从网络中拉流。
parse_options:解析命令行参数,示例中的-iinput.mp4和-loop2就是通过这个函数解析的,支持的选项定义在options静态数组中。解析得到的文件名、文件格式分别保存在全局变量input_filename和file_iformat中。
2.2SDL初始化
SDL的全称是SimpleDirectMediaLayer,是一个跨平台的多媒体开发库,支持Linux、Windows、MacOS等多个平台,实际上是对DirectX、OpenGL、Xlib再封装,在不同操作系统上提供了相同的函数。ffplay的播放显示是通过SDL实现的。
main函数中主要调用了以下三个SDL函数:
SDL_Init:初始化SDL库,传入的参数flags,默认支持视频、音频和定时器,如果命令中配置了-an则禁用音频,配置了-vn则禁用视频。
SDL_CreateWindow:创建播放视频的窗口,该函数可以指定窗口的位置、大小,默认是*大小。
SDL_CreateRenderer:为指定的窗口创建渲染器上下文,对应的结构体是SDL_Render。我们既可以使用渲染器创建纹理,也可以渲染视图。
2.3解析媒体流
stream_open函数是ffplay开始播放流程的起点,该函数传入两个参数,分别是文件名input_filename和文件格式file_iformat。下面是函数内部的处理流程:
(1)初始化VideoState:VideoState是ffplay中最大的结构体,所有的视频信息都定义在其中。初始化VideoState时,先定义VideoState结构体指针类型的局部变量is,分配堆内存。然后初始化结构体中的变量,例如视频流、音频流、字幕流的索引,并赋值函数入参filename和iformat。
(2)初始化FrameQueue:FrameQueue是解码后的Frame队列,Frame是解码后的数据,例如视频解码后是YUV或RGB数据,音频解码后是PCM数据。初始化FrameQueue时,会对VideoState中的pictq(视频帧队列)、subpq(字幕帧队列)、sampq(音频帧队列)依次调用frame_queue_init函数进行初始化。FrameQueue内部是通过数组实现了一个先进先出的环形缓冲区,windex是写指针,被解码线程使用;rindex是读指针,被播放线程使用。使用环形缓冲区的好处是,缓冲区内的元素被移除后,其它元素不需要移动位置,适用于事先知道缓冲区最大容量的场景。
(3)初始化PacketQueue:PacketQueue是解码前的Packet队列,用于保存解封装后的数据。初始化PacketQueue时,会对VideoState中的videoq(视频包队列)、audio(音频包队列)、subtitleq(字幕包队列)依次调用packet_queue_init函数进行初始化。不同于FrameQueue,PacketQueue采用链表的方式实现队列。由于解码前的包大小不可控,无法明确缓冲区的最大容量,如果使用环形缓冲区,容易触发缓冲区扩容,需要移动缓冲区内的数据。因此,使用链表实现队列更加合适。
(4)初始化Clock:Clock是时钟,在音视频同步阶段,有三种同步方法:视频同步到音频,音频同步到视频,以及音频和视频同步到外部时钟。初始化Clock时,会对VideoState中的vidclk(视频时钟)、audclk(音频时钟)、extclk(外部时钟)依次调用init_clock函数进行初始化。
(5)限制音量范围:先限制音量范围在0~之间,然后再根据SDL的音量范围作进一步限制。
(6)设置音视频同步方式:ffplay默认采用AV_SYNC_AUDIO_MASTER,即视频同步到音频。
(7)创建读线程:调用SDL_CreateThread创建读线程,同时设置了线程创建成功的回调read_thread函数以及接收参数is(stream_open函数最开始创建的VideoState指针类型的局部变量)。如果线程创建失败,则调用stream_close做销毁逻辑。
(8)返回值:将局部变量is作为函数返回值返回,用于处理下面的各种SDL事件。
2.4SDL事件处理
event_loop函数内部是一个for循环,使用SDL监听用户的键盘按键事件、鼠标点击事件、窗口事件、退出事件等。
三、read_thread函数read_thread函数的作用是从磁盘或者网络中获取流,包括音频流、视频流和字幕流,然后根据可用性创建对应流的解码线程。因此read_thread所在的线程实际上起到了解协议/解封装的作用。核心处理流程可以分为以下步骤:
3.1创建AVFormatContext
AVFormatContext是封装上下文,描述了媒体文件或媒体流的构成和基本信息。avformat_alloc_context函数用于分配内存创建AVFormatContext对象ic。
拿到AVFormatContext对象后,在调用avformat_open_input函数打开文件前,需要设置中断回调函数,用于检查是否应该中断IO操作。
?ic->interrupt_callback.callback=decode_interrupt_cb;ic->interrupt_callback.opaque=is;decode_interrupt_cb内部返回了一个VideoState的abort_request变量,该变量在调用stream_close函数关闭流时会被置为1。
3.2打开输入文件
在准备好前面的一些赋值操作后,就可以开始根据filename打开文件了。avformat_open_input函数用于打开一个文件,并对文件进行解析。如果文件是一个网络链接,则发起网络请求,在网络数据返回后解析音频流、视频流相关的数据。
3.3搜索流信息
搜索流信息使用avformat_find_stream_info函数,该从媒体文件中读取若干个包,然后从其中搜索流相关的信息,最后将搜索到的流信息放到ic->streams指针数组中,数组的大小为ic->nb_streams。
由于在实际播放过程中,用户可以指定是否禁用音频流、视频流、字幕流。因此在解码要处理的流之前,会判断对应的流是否处于不可用状态,如果是可用状态则调用av_find_best_stream函数查找对应流的索引,并保存在st_index数组中。
3.4设置窗口大小
如果找到了视频流的索引,则需要渲染视频画面。由于窗体的大小一般使用默认值*,这个值和视频帧真正的大小可能是不相等的。为了正确显示承载视频画面的窗体,需要计算视频帧的宽高比。调用av_guess_sample_aspect_ration函数猜测帧样本的宽高比,调用set_default_window_size函数重新设置显示窗口的大小和宽高比。
3.5创建解码线程
根据st_index判断音频流、视频流、字幕流的索引是否找到,如果找到了就依次调用stream_component_open创建对应流的解码线程。
3.6解封装处理
接下来是一个for(;;)循环:
(1)响应中断停止、暂停/继续、Seek操作;
(2)判断PacketQueue队列是否满了,如果满了就休眠ms,继续循环;
(3)调用av_read_frame从码流中读取若干个音频帧或一个视频帧;
(4)从输入文件中读取一个AVPacket,判断当前AVPacket是否在播放时间范围内,如果是则调用packet_queue_put函数,根据类型将其放在音频/视频/字幕的PacketQueue中。
四、stream_component_open函数3.5小节讲到,stream_component_open函数负责创建不同流的解码线程。那么它是如何创建解码线程的呢?
4.1创建AVCodecContext
AVCodecContext是编解码器上下文,保存音视频编解码相关的信息。使用avcodec_alloc_context3函数分配空间,使用avcodec_free_context函数释放空间。
4.2查找解码器
根据解码器的id,调用avcodec_find_decoder函数,查找对应的解码器。与之类似的一个函数是avcodec_find_encoder,用于查找FFmpeg的编码器。两个函数返回的结构体都是AVCodec。
如果指定了解码器名称,则需要调用avcodec_find_decoder_by_name函数查找解码器。
不管是哪种方式查找解码器,如果没有找到解码器,都会抛异常退出流程。
4.3解码器初始化
找到解码器后,需要打开解码器,并对解码器初始化,对应的函数是avcodec_open2,该函数也支持编码器的初始化。
4.4创建解码线程
判断解码类型,创建不同的解码线程。
switch(avctx->codec_type){ caseAVMEDIA_TYPE_AUDIO://音频...if((ret=decoder_init(&is->auddec,avctx,&is->audioq,is->continue_read_thread))<0)gotofail;...if((ret=decoder_start(&is->auddec,audio_thread,"audio_decoder",is))<0)gotoout;...caseAVMEDIA_TYPE_VIDEO://视频...if((ret=decoder_init(&is->viddec,avctx,&is->videoq,is->continue_read_thread))<0)gotofail;if((ret=decoder_start(&is->viddec,video_thread,"video_decoder",is))<0)gotoout;...caseAVMEDIA_TYPE_SUBTITLE://字幕...if((ret=decoder_init(&is->subdec,avctx,&is->subtitleq,is->continue_read_thread))<0)gotofail;if((ret=decoder_start(&is->subdec,subtitle_thread,"subtitle_decoder",is))<0)gotoout;...}线程创建在decoder_start函数中,依然使用SDL创建线程的方式,调用SDL_CreateThread函数。
五、video_thread函数视频解码线程从视频的PacketQueue中不断读取AVPacket,解码完成后将AVFrame放入视频FrameQueue。音频的解码实现和视频类似,这里仅介绍视频的解码过程。
5.1创建AVFrame
AVFrame描述解码后的原始音频数据或视频数据,通过av_frame_alloc函数分配内存,通过av_frame_free函数释放内存。
5.2视频解码
开启for(;;)循环,不断调用get_video_frame函数解码一个视频帧。该函数主要调用了decoder_decode_frame函数解码,decoder_decode_frame函数对音频、视频、字幕都进行了处理,主要依靠FFmpeg的avcodec_receive_frame函数获取解码器解码输出的数据。
拿到解码后的视频帧后,会根据音视频同步的方式和命令行的-framedrop选项,判断是否需要丢弃失去同步的视频帧。
命令行带-framedrop选项,无论哪种音视频同步机制,都会丢弃失去同步的视频帧。
命令行带-noframedrop选项,无论哪种音视频同步机制,都不会丢弃失去同步的视频帧。
命令行不带-framedrop或-noframedrop选项,若音视频同步机制为同步到视频,则不丢弃失去同步的视频帧,否则会丢弃失去同步的视频帧。
5.3放入FrameQueue
调用queue_picture函数,将AVFrame放入FrameQueue。该函数内部调用了frame_queue_push函数,采用了环形缓冲区的处理方式,对写指针windex累加。
staticvoidframe_queue_push(FrameQueue*f){ if(++f->windex==f->max_size)f->windex=0;SDL_LockMutex(f->mutex);f->size++;SDL_CondSignal(f->cond);SDL_UnlockMutex(f->mutex);}六、音视频同步ffplay默认采用将视频同步到音频的方式,分以下三种情况:
如果视频和音频进度一致,不需要同步;
如果视频落后音频,则丢弃当前帧直接播放下一帧,人眼感觉跳帧了;
如果视频超前音频,则重复显示上一帧,等待音频,人眼感觉视频画面停止了,但是有声音在播放;
ffplay视频同步到音频的逻辑在视频播放函数video_refresh中实现。该函数的调用链是:main()->event_loop()->refresh_loop_wait_event()->video_refresh。
6.1判断播放完成
调用frame_queue_nb_remaing函数计算剩余没有显示的帧数是否等于0,如果是,则不需要走剩下的步骤。计算过程比较简单,用FrameQueue的size-rindex_shown,size是FrameQueue的大小,rindex_shown表示rindex指向的节点是否已经显示,如果已经显示则为1,否则为0。
6.2播放序列匹配
****分别调用frame_queue_peek_last和frame_queue_peek函数从FrameQueue中获取上一帧和当前帧,上一帧是上次已经显示的帧,当前帧是当前待显示的帧。
(1)比较当前帧和当前PacketQueue的播放序列serial是否相等:
如果不等,重试视频播放的逻辑;
如果相等,则进入(2)流程判断;
注:serial是用来区分是不是连续的数据,如果发生了seek,会开始一个新的播放序列,
(2)比较上一帧和当前帧的播放序列serial是否相等:
如果不相等,则将frame_timer更新为当前时间;
如果相等,不处理并进入下一流程
6.3判断是否重复上一帧
(1)将上一帧lastvp和当前帧vp传入vp_duration函数,通过vp->pts-lastvp->pts计算上一帧的播放时长。
注:pts全称是PresentationTimeStamp,显示时间戳,表示解码后得到的帧的显示时间。
(2)在compute_target_delay函数中,调用get_clock函数获取视频时钟,调用get_master_clock函数获取同步时钟,计算两个时钟的差值,根据差值计算需要delay的时间。
(3)如果当前帧播放时刻(is->frame_timer+delay)大于当前时刻(time),表示当前帧的播放时间还没有到,相当于当前视频超前音频了,则需要将上一帧再播放一遍。
last_duration=vp_duration(is,lastvp,vp);delay=compute_target_delay(last_duration,is);time=av_gettime_relative()/.0;if(time<is->frame_timer+delay){ *remaining_time=FFMIN(is->frame_timer+delay-time,*remaining_time);gotodisplay;}6.4判断是否丢弃未播放的帧
如果当前队列中的帧数大于1,则需要考虑丢帧,只有一帧的时候不考虑丢帧。
(1)调用frame_queue_peek_next函数获取下一帧(下一个待显示的帧),根据当前帧和下一帧计算当前帧的播放时长,计算过程和6.3相同。
(2)满足以下条件时,开始丢帧:
当前播放模式不是步进模式;
丢帧策略生效:framedrop>0,或者当前音视频同步策略不是音频到视频。
当前帧vp还没有来得及播放,但是下一帧的播放时刻(is->frame_timer+duration)已经小于当前系统时刻(time)了。
(3)丢帧时,将is->frame_drops_late++,并调用frame_queue_next函数将上一帧删除,更新FrameQueue的读指针rindex和size。
if(frame_queue_nb_remaining(&is->pictq)>1){ Frame*nextvp=frame_queue_peek_next(&is->pictq);duration=vp_duration(is,vp,nextvp);if(!is->step&&(framedrop>0||(framedrop&&get_master_sync_type(is)!=AV_SYNC_VIDEO_MASTER))&&time>is->frame_timer+duration){ is->frame_drops_late++;frame_queue_next(&is->pictq);gotoretry;}}七、渲染ffplay最终的图像渲染是由SDL完成的,在video_display中调用了SDL_RenderPresent(render)函数,其中render参数是最开始在main函数中创建的。在渲染之前,需要将解码得到的视频帧数据转换为SDL支持的图像格式。转换过程在upload_texture函数中实现,细节不在此处分析。
音频类似,如果解码得到的音频不能被SDL支持,需要对音频进行重采样,将音频帧格式转换为SDL支持的格式。
八、小结本文从整体播放流程出发,介绍了ffplay播放器播放媒体文件的主要流程,不深陷于代码细节。同时,对FFmpeg的一些常用函数有了一些了解,对我们自己手写一个简单的播放器有很大的帮助。
----------END----------2024-12-29 00:21
2024-12-28 23:22
2024-12-28 22:28
2024-12-28 22:07
2024-12-28 21:54