# Source code for the stream_timeout helper.

import threading

def stream_timeout(stream, timeout, timeout_msg=None):
    """
    Iterate over items in a streaming response from the Docker client within
    a timeout.

    This is a generator. A :class:`threading.Timer` is started when iteration
    begins. If the timer fires before the stream is exhausted, the stream is
    closed from the timer thread and, once the loop over the stream ends, a
    :exc:`TimeoutError` is raised.

    :param ~docker.types.daemon.CancellableStream stream:
        Stream from the Docker client to consume items from. It is iterated
        over and its ``close()`` method is called on timeout.
    :param timeout:
        Timeout value in seconds.
    :param timeout_msg:
        Message to raise in the exception when a timeout occurs.
    :returns:
        A generator yielding the items of ``stream`` unchanged.
    :raises TimeoutError:
        If the timer fired before iteration over the stream finished.
    """
    timed_out = threading.Event()

    def timeout_func():
        # Runs on the Timer thread: record that the timeout happened *before*
        # closing the stream, so the consuming loop sees the flag once it
        # ends.
        timed_out.set()
        stream.close()

    timer = threading.Timer(timeout, timeout_func)
    try:
        timer.start()
        for item in stream:
            yield item

        # A timeout looks the same as the loop ending. So we need to check a
        # flag to determine whether a timeout occurred or not.
        if timed_out.is_set():
            raise TimeoutError(timeout_msg)
    finally:
        # Always stop the timer, whether the stream finished, timed out, the
        # consumer raised, or the generator was closed early.
        timer.cancel()
        # Close the stream's underlying response object (if it has one) to
        # avoid potential socket leaks.
        # This method seems to have more success at preventing ResourceWarnings
        # than just stream.close() (should this be improved upstream?)
        # FIXME: Potential race condition if Timer thread closes the stream at
        # the same time we do here, but hopefully not with serious side effects
        if hasattr(stream, '_response'):
            stream._response.close()