@@ -53,12 +53,19 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT
5353 self .flush_count = flush_count
5454
def get(self):
    """Retrieve the next row from the queue without blocking.

    The queue's running totals (``total_mutation_count`` and
    ``total_size``) are reduced by the mutation count and mutation size
    of the row that was removed.

    :rtype: row object or ``None``
    :returns: The next queued row, or ``None`` if the queue is empty.
    """
    # Guard only the dequeue itself: a ``queue.Empty`` raised by anything
    # else must not be mistaken for "the queue is empty".
    try:
        row = self._queue.get_nowait()
    except queue.Empty:
        return None

    # Compute the size before touching either counter so that a failure
    # here leaves both totals unchanged.
    mutation_size = row.get_mutations_size()
    self.total_mutation_count -= len(row._get_mutations())
    self.total_size -= mutation_size
    return row
6269
6370 def put (self , item ):
6471 """Insert an item to the queue. Recalculate queue size."""
@@ -79,9 +86,6 @@ def full(self):
7986 return True
8087 return False
8188
82- def empty (self ):
83- return self ._queue .empty ()
84-
8589
8690@dataclass
8791class _BatchInfo :
@@ -292,8 +296,10 @@ def flush(self):
292296 * :exc:`.batcherMutationsBatchError` if there's any error in the mutations.
293297 """
294298 rows_to_flush = []
295- while not self ._rows .empty ():
296- rows_to_flush .append (self ._rows .get ())
299+ row = self ._rows .get ()
300+ while row is not None :
301+ rows_to_flush .append (row )
302+ row = self ._rows .get ()
297303 response = self ._flush_rows (rows_to_flush )
298304 return response
299305
@@ -303,58 +309,68 @@ def _flush_async(self):
303309 :raises:
304310 * :exc:`.batcherMutationsBatchError` if there's any error in the mutations.
305311 """
306-
307- rows_to_flush = []
308- mutations_count = 0
309- mutations_size = 0
310- rows_count = 0
311- batch_info = _BatchInfo ()
312-
313- while not self ._rows .empty ():
314- row = self ._rows .get ()
315- mutations_count += len (row ._get_mutations ())
316- mutations_size += row .get_mutations_size ()
317- rows_count += 1
318- rows_to_flush .append (row )
319- batch_info .mutations_count = mutations_count
320- batch_info .rows_count = rows_count
321- batch_info .mutations_size = mutations_size
322-
323- if (
324- rows_count >= self .flush_count
325- or mutations_size >= self .max_row_bytes
326- or mutations_count >= self .flow_control .max_mutations
327- or mutations_size >= self .flow_control .max_mutation_bytes
328- or self ._rows .empty () # submit when it reached the end of the queue
312+ next_row = self ._rows .get ()
313+ while next_row is not None :
314+ # start a new batch
315+ rows_to_flush = [next_row ]
316+ batch_info = _BatchInfo (
317+ mutations_count = len (next_row ._get_mutations ()),
318+ rows_count = 1 ,
319+ mutations_size = next_row .get_mutations_size (),
320+ )
321+ # fill up batch with rows
322+ next_row = self ._rows .get ()
323+ while next_row is not None and self ._row_fits_in_batch (
324+ next_row , batch_info
329325 ):
330- # wait for resources to become available, before submitting any new batch
331- self . flow_control . wait ( )
332- # once unblocked, submit a batch
333- # event flag will be set by control_flow to block subsequent thread, but not blocking this one
334- self .flow_control . control_flow ( batch_info )
335- future = self . _executor . submit ( self . _flush_rows , rows_to_flush )
336- self . futures_mapping [ future ] = batch_info
337- future . add_done_callback ( self ._batch_completed_callback )
338-
339- # reset and start a new batch
340- rows_to_flush = []
341- mutations_size = 0
342- rows_count = 0
343- mutations_count = 0
344- batch_info = _BatchInfo ( )
326+ rows_to_flush . append ( next_row )
327+ batch_info . mutations_count += len ( next_row . _get_mutations () )
328+ batch_info . rows_count += 1
329+ batch_info . mutations_size += next_row . get_mutations_size ()
330+ next_row = self ._rows . get ( )
331+ # send batch over network
332+ # wait for resources to become available
333+ self .flow_control . wait ( )
334+ # once unblocked, submit the batch
335+ # event flag will be set by control_flow to block subsequent thread, but not blocking this one
336+ self . flow_control . control_flow ( batch_info )
337+ future = self . _executor . submit ( self . _flush_rows , rows_to_flush )
338+ # schedule release of resources from flow control
339+ self . futures_mapping [ future ] = batch_info
340+ future . add_done_callback ( self . _batch_completed_callback )
345341
def _batch_completed_callback(self, future):
    """Release the flow-control resources held by a finished batch.

    Looks up the batch information recorded for ``future`` in
    ``futures_mapping``, removes that entry, and passes the batch
    information to ``flow_control.release``.

    :param future: The future that was used as the key in
                   ``futures_mapping`` when the batch was submitted.

    :raises KeyError: if ``future`` has no entry in ``futures_mapping``.
    """
    # Remove the entry before releasing so that a failure inside
    # ``release`` cannot leave a stale entry behind in the mapping.
    processed_rows = self.futures_mapping.pop(future)
    self.flow_control.release(processed_rows)
357351
def _row_fits_in_batch(self, row, batch_info):
    """Check whether a row can be added to the current batch.

    The row fits when, after adding it, the batch would still be within
    every limit. All limits are inclusive: a batch that lands exactly on
    a limit is still accepted.

    :type row: :class:`~google.cloud.bigtable.row.DirectRow`
    :param row: The candidate row.

    :type batch_info: :class:`_BatchInfo`
    :param batch_info: Running totals for the current batch. Not modified.

    :rtype: bool
    :returns: True if the row can fit in the current batch.
    """
    new_rows_count = batch_info.rows_count + 1
    new_mutations_count = batch_info.mutations_count + len(row._get_mutations())
    new_mutations_size = batch_info.mutations_size + row.get_mutations_size()
    return (
        new_rows_count <= self.flush_count
        and new_mutations_size <= self.max_row_bytes
        and new_mutations_count <= self.flow_control.max_mutations
        and new_mutations_size <= self.flow_control.max_mutation_bytes
    )
373+
358374 def _flush_rows (self , rows_to_flush ):
359375 """Mutate the specified rows.
360376
0 commit comments