44
55use Ensi \LaravelPhpRdKafka \KafkaManager ;
66use Ensi \LaravelPhpRdKafkaConsumer \Exceptions \KafkaConsumerException ;
7+ use Illuminate \Pipeline \Pipeline ;
78use RdKafka \Exception as RdKafkaException ;
89use RdKafka \KafkaConsumer ;
910use RdKafka \Message ;
1011use Throwable ;
1112
1213class HighLevelConsumer
1314{
14- protected KafkaConsumer $ consumer ;
15+ protected ? KafkaConsumer $ consumer ;
1516
1617 public function __construct (
17- protected string $ topicName ,
18- ?string $ consumerName = null ,
19- protected ConsumerOptions $ options ,
20- )
18+ protected KafkaManager $ kafkaManager ,
19+ protected Pipeline $ pipeline
20+ ) {
21+ }
22+
23+ public function for (?string $ consumerName ): static
2124 {
22- $ manager = resolve (KafkaManager::class);
23- $ this ->consumer = is_null ($ consumerName ) ? $ manager ->consumer () : $ manager ->consumer ($ consumerName );
25+ $ this ->consumer = is_null ($ consumerName )
26+ ? $ this ->kafkaManager ->consumer ()
27+ : $ this ->kafkaManager ->consumer ($ consumerName );
28+
29+ return $ this ;
2430 }
2531
2632 /**
2733 * @throws KafkaException
2834 * @throws RdKafkaException
2935 * @throws Throwable
3036 */
31- public function listen (string $ processorClassName , string $ processorType , string | bool $ processorQueue ): void
37+ public function listen (string $ topicName , ProcessorData $ processorData , ConsumerOptions $ options ): void
3238 {
33- $ this ->consumer ->subscribe ([ $ this -> topicName ]);
39+ $ this ->consumer ->subscribe ([ $ topicName ]);
3440
3541 [$ startTime , $ eventsProcessed ] = [hrtime (true ) / 1e9 , 0 ];
3642
3743 while (true ) {
38- $ message = $ this ->consumer ->consume ($ this -> options ->consumeTimeout );
44+ $ message = $ this ->consumer ->consume ($ options ->consumeTimeout );
3945
4046 switch ($ message ->err ) {
4147
4248 case RD_KAFKA_RESP_ERR_NO_ERROR :
43- $ this ->executeProcessor ( $ processorClassName , $ processorType , $ processorQueue , $ message );
49+ $ this ->processThroughMiddleware ( $ processorData , $ message , $ options );
4450 $ this ->consumer ->commitAsync ($ message );
4551 $ eventsProcessed ++;
52+
4653 break ;
4754
4855 case RD_KAFKA_RESP_ERR__TIMED_OUT :
@@ -55,45 +62,56 @@ public function listen(string $processorClassName, string $processorType, string
5562 throw new KafkaConsumerException ('Kafka error: ' . $ message ->errstr ());
5663 }
5764
58- if ($ this ->shouldBeStopped ($ startTime , $ eventsProcessed )) {
65+ if ($ this ->shouldBeStopped ($ startTime , $ eventsProcessed, $ options )) {
5966 break ;
6067 }
6168 }
6269 }
6370
64- protected function executeProcessor ( string $ className , string $ type , string | bool $ queue , Message $ message ): void
71+ protected function processThroughMiddleware ( ProcessorData $ processorData , Message $ message , ConsumerOptions $ options ): void
6572 {
66- $ queue
67- ? $ this ->executeQueueableProcessor ($ className , $ type , $ queue , $ message )
68- : $ this ->executeSyncProcessor ($ className , $ type , $ message );
73+ $ this ->pipeline
74+ ->send ($ message )
75+ ->through ($ options ->middleware )
76+ ->then (fn (Message $ message ) => $ this ->executeProcessor ($ processorData , $ message ));
6977 }
7078
71- protected function executeSyncProcessor ( string $ className , string $ type , Message $ message ): void
79+ protected function executeProcessor ( ProcessorData $ processorData , Message $ message ): void
7280 {
73- if ($ type === 'job ' ) {
81+ $ processorData ->queue
82+ ? $ this ->executeQueueableProcessor ($ processorData , $ message )
83+ : $ this ->executeSyncProcessor ($ processorData , $ message );
84+ }
85+
86+ protected function executeSyncProcessor (ProcessorData $ processorData , Message $ message ): void
87+ {
88+ $ className = $ processorData ->class ;
89+ if ($ processorData ->type === 'job ' ) {
7490 $ className ::dispatchSync ($ message );
75- } elseif ($ type === 'action ' ) {
91+ } elseif ($ processorData -> type === 'action ' ) {
7692 resolve ($ className )->execute ($ message );
7793 }
7894 }
7995
80- protected function executeQueueableProcessor (string $ className , string $ type , string | bool $ queue , Message $ message ): void
96+ protected function executeQueueableProcessor (ProcessorData $ processorData , Message $ message ): void
8197 {
82- if ($ type === 'job ' ) {
98+ $ className = $ processorData ->class ;
99+ $ queue = $ processorData ->queue ;
100+ if ($ processorData ->type === 'job ' ) {
83101 is_string ($ queue ) ? $ className ::dispatch ($ message )->onQueue ($ queue ) : $ className ::dispatch ($ message );
84- } elseif ($ type === 'action ' ) {
102+ } elseif ($ processorData -> type === 'action ' ) {
85103 $ processor = resolve ($ className );
86104 is_string ($ queue ) ? $ processor ->onQueue ($ queue )->execute ($ message ) : $ processor ->execute ($ message );
87105 }
88106 }
89107
90- protected function shouldBeStopped (int |float $ startTime , int $ eventsProcessed ): bool
108+ protected function shouldBeStopped (int |float $ startTime , int $ eventsProcessed, ConsumerOptions $ options ): bool
91109 {
92- if ($ this -> options ->maxTime && hrtime (true ) / 1e9 - $ startTime >= $ this -> options ->maxTime ) {
110+ if ($ options ->maxTime && hrtime (true ) / 1e9 - $ startTime >= $ options ->maxTime ) {
93111 return true ;
94- }
112+ }
95113
96- if ($ this -> options ->maxEvents && $ eventsProcessed >= $ this -> options ->maxEvents ) {
114+ if ($ options ->maxEvents && $ eventsProcessed >= $ options ->maxEvents ) {
97115 return true ;
98116 }
99117
0 commit comments