@@ -75,44 +75,65 @@ def process_batch(predictor, input_batch, optimal_batch_size):
     return results, last_predictor_success, received_at, predicted_at


+to_process = {}
+current_sum_of_to_process = 0
+
+
 def fetch_batch(
     main_index,
     predictor_sequence,
     optimal_batch_size,
     max_wait_time_for_batch_collection,
 ):
+    global to_process
+    global current_sum_of_to_process
+
     unique_id_wise_input_count = {}
     input_batch = []
     current_batch_length = 0
     batch_collection_started_at = time.time()
     last_input_received_at = time.time()

     while current_batch_length < optimal_batch_size:
-        to_process = main_index.search(
-            query={
-                "-1.predicted_at": 0,  # prediction not yet done
-                "last_predictor_success": True,  # last predictor success
-                "last_predictor_sequence": predictor_sequence - 1,  # last predictor sequence
-                "timedout_in_queue": {"$ne": True},  # not timedout in queue
-            },
-            n=optimal_batch_size,
-            select_keys=[f"{predictor_sequence - 1}.outputs"],
-            update={
-                "last_predictor_sequence": predictor_sequence,  # set last predictor sequence to current predictor sequence
-                "last_predictor_success": None,  # reset last predictor success
-                f"{predictor_sequence}.received_at": time.time(),  # set received at to current time
-            },
-        )
+        if current_sum_of_to_process < optimal_batch_size:
+            to_process.update(
+                main_index.search(
+                    query={
+                        "-1.predicted_at": 0,  # prediction not yet done
+                        "last_predictor_success": True,  # last predictor success
+                        "last_predictor_sequence": predictor_sequence - 1,  # last predictor sequence
+                        "timedout_in_queue": {"$ne": True},  # not timedout in queue
+                    },
+                    n=optimal_batch_size,
+                    select_keys=[f"{predictor_sequence - 1}.outputs"],
+                    update={
+                        "last_predictor_sequence": predictor_sequence,  # set last predictor sequence to current predictor sequence
+                        "last_predictor_success": None,  # reset last predictor success
+                        f"{predictor_sequence}.received_at": time.time(),  # set received at to current time
+                    },
+                )
+            )

         for unique_id, data in to_process.items():
+            if current_batch_length > optimal_batch_size * 0.8:
+                break
             outputs = data[f"{predictor_sequence - 1}.outputs"]
             input_count = len(outputs)
             unique_id_wise_input_count[unique_id] = input_count
             input_batch.extend(outputs)
             current_batch_length += input_count
             last_input_received_at = time.time()

+        for unique_id in unique_id_wise_input_count.keys():
+            try:
+                del to_process[unique_id]
+            except KeyError:  # already removed on an earlier pass of the while loop
+                pass
+
+        current_sum_of_to_process = sum(
+            len(v[f"{predictor_sequence - 1}.outputs"]) for v in to_process.values()
+        )
+
         if current_batch_length == 0:
             if time.time() - last_input_received_at > 5:
                 time.sleep(0.05)
@@ -133,8 +154,9 @@ def fetch_batch(
             break

     _utils.logger.info(
-        f"Fetched batch {[v for v in unique_id_wise_input_count.values()]}"
+        f"Fetched batch {unique_id_wise_input_count} with {current_sum_of_to_process} remaining in memory, to_process: {len(to_process)}"
     )
+
     return unique_id_wise_input_count, input_batch

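In short, the change replaces a fresh main_index.search() on every loop pass with a module-level carry-over cache: results fetched beyond what one batch consumes stay in to_process and are drained first on the next fetch_batch call. A minimal sketch of that pattern, stripped of the index specifics (fake_search and the key names here are hypothetical stand-ins, not the project's API):

    import itertools

    _ids = itertools.count()

    def fake_search(n):
        # Hypothetical stand-in for main_index.search: each unique_id
        # maps to a list of 3 outputs.
        return {f"id-{next(_ids)}": {"outputs": [0, 1, 2]} for _ in range(n)}

    to_process = {}   # unique_id -> data, carried over between calls
    current_sum = 0   # total outputs currently cached in to_process

    def fetch(optimal_batch_size):
        global current_sum
        taken, batch, length = {}, [], 0

        # Only hit the (expensive) search when the cache cannot fill a batch.
        if current_sum < optimal_batch_size:
            to_process.update(fake_search(optimal_batch_size))

        for unique_id, data in to_process.items():
            if length > optimal_batch_size * 0.8:  # same 80% cutoff as the diff
                break
            taken[unique_id] = len(data["outputs"])
            batch.extend(data["outputs"])
            length += len(data["outputs"])

        for unique_id in taken:  # drop consumed entries from the cache
            to_process.pop(unique_id, None)
        current_sum = sum(len(v["outputs"]) for v in to_process.values())
        return taken, batch

    if __name__ == "__main__":
        taken, batch = fetch(8)
        print(f"took {len(batch)} inputs, {current_sum} still cached")

The 80% cutoff means a batch stops filling once it is "close enough" to optimal, so whole unique_ids are never split across batches; whatever was fetched but not consumed waits in the cache instead of being re-queried.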