# Report the number of CPUs as seen by the multiprocessing module.
print("CPU数量:", str(multiprocessing.cpu_count()))

# Show the name and pid of every child process that is currently running.
for p in multiprocessing.active_children():
    print("子进程名称:", p.name, "子进程id:", p.pid)
# put: add items to the queue.
q.put(15)
q.put(59)
print(q.qsize())

# put with a timeout of 2 seconds.
q.put('PolarSnow', timeout=2)

# Get the current length of the queue.
print(q.qsize())

# Take out the item at the front of the queue.
print(q.get())
------------
2
Traceback (most recent call last):
  File "/Users/lvrui/PycharmProjects/untitled/10/temp.py", line 157, in <module>
    q.put('PolarSnow', timeout=2)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/queue.py", line 141, in put
    raise Full
queue.Full
# mutex must be held whenever the queue is mutating. All methods # that acquire mutex must release it before returning. mutex # is shared between the three conditions, so acquiring and # releasing the conditions also acquires and releases mutex. self.mutex = threading.Lock()
# Notify not_empty whenever an item is added to the queue; a # thread waiting to get is notified then. self.not_empty = threading.Condition(self.mutex)
# Notify not_full whenever an item is removed from the queue; # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex)
# Notify all_tasks_done whenever the number of unfinished tasks # drops to zero; thread waiting to join() is notified to resume self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0
def get(self, block=True, timeout=None):
    """Remove and return an item from the queue.

    If optional arg 'block' is true and 'timeout' is None (the default),
    block if necessary until an item is available. If 'timeout' is
    a non-negative number, it blocks at most 'timeout' seconds and raises
    the Empty exception if no item was available within that time.
    Otherwise ('block' is false), return an item if one is immediately
    available, else raise the Empty exception ('timeout' is ignored
    in that case).

    Raises:
        Empty: no item was available (immediately, or within 'timeout').
        ValueError: 'block' is true and 'timeout' is negative.
    """
    with self.not_empty:
        if not block:
            # Non-blocking: fail right away when there is nothing to take.
            if not self._qsize():
                raise Empty
        elif timeout is None:
            # Block without a deadline; re-check the size after every wake-up.
            while not self._qsize():
                self.not_empty.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            # Block until a fixed deadline; each wait only covers the time
            # that is still left, so repeated wake-ups cannot extend it.
            endtime = time() + timeout
            while not self._qsize():
                remaining = endtime - time()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        item = self._get()
        # An item was removed, so signal the not_full condition.
        self.not_full.notify()
        return item
参数为是否阻塞和超时时间
get_nowait
1 2 3 4 5 6 7
def get_nowait(self):
    """Remove and return an item from the queue without blocking.

    Only get an item if one is immediately available. Otherwise
    raise the Empty exception.
    """
    # Thin wrapper: delegates to get() with blocking switched off.
    return self.get(block=False)
非阻塞获取队列中的值
put && put_nowait
与get & get_nowait同理
empty
1 2 3 4 5 6 7 8 9 10 11 12 13
def empty(self):
    """Return True if the queue is empty, False otherwise (not reliable!).

    This method is likely to be removed at some point. Use qsize() == 0
    as a direct substitute, but be aware that either approach risks a race
    condition where a queue can grow before the result of empty() or
    qsize() can be used.

    To create code that needs to wait for all queued tasks to be
    completed, the preferred technique is to use the join() method.
    """
    # Read the size while holding the mutex.
    with self.mutex:
        return not self._qsize()
检查队列是否为空,为空返回True,不为空返回False
full
1 2 3 4 5 6 7 8 9 10
def full(self):
    """Return True if the queue is full, False otherwise (not reliable!).

    This method is likely to be removed at some point. Use qsize() >= n
    as a direct substitute, but be aware that either approach risks a race
    condition where a queue can shrink before the result of full() or
    qsize() can be used.
    """
    with self.mutex:
        # A maxsize that is not positive never counts as full here.
        return 0 < self.maxsize <= self._qsize()
判断队列是否已经满了
qsize
1 2 3 4
def qsize(self):
    """Return the approximate size of the queue (not reliable!)."""
    # Read the size while holding the mutex.
    with self.mutex:
        return self._qsize()
def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, *, daemon=None):
    """This constructor should always be called with keyword arguments.

    Arguments are:

    *group* should be None; reserved for future extension when a ThreadGroup
    class is implemented.

    *target* is the callable object to be invoked by the run()
    method. Defaults to None, meaning nothing is called.

    *name* is the thread name. By default, a unique name is constructed of
    the form "Thread-N" where N is a small decimal number.

    *args* is the argument tuple for the target invocation. Defaults to ().

    *kwargs* is a dictionary of keyword arguments for the target
    invocation. Defaults to {}.

    *daemon*, when not None, sets the thread's daemon flag; otherwise the
    flag is taken from the thread that creates this object.

    If a subclass overrides the constructor, it must make sure to invoke
    the base class constructor (Thread.__init__()) before doing anything
    else to the thread.
    """
    assert group is None, "group argument must be None for now"
    if kwargs is None:
        # Fresh dict per instance instead of a shared mutable default.
        kwargs = {}
    self._target = target
    self._name = str(name or _newname())
    self._args = args
    self._kwargs = kwargs
    if daemon is not None:
        self._daemonic = daemon
    else:
        # Inherit the daemon flag from the creating thread.
        self._daemonic = current_thread().daemon
    self._ident = None
    self._tstate_lock = None
    self._started = Event()
    self._is_stopped = False
    self._initialized = True
    # sys.stderr is not stored in the class like
    # sys.exc_info since it can be changed between instances
    self._stderr = _sys.stderr
    # For debugging and _after_fork()
    _dangling.add(self)
在Thread的构造方法中,封装了我们传进去的两个参数
而子线程被CPU调度执行的时候,自动执行了Thread对象中的run方法,run方法源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
def run(self):
    """Method representing the thread's activity.

    You may override this method in a subclass. The standard run() method
    invokes the callable object passed to the object's constructor as the
    target argument, if any, with sequential and keyword arguments taken
    from the args and kwargs arguments, respectively.
    """
    try:
        # A falsy target (the default None) means there is nothing to call.
        if self._target:
            self._target(*self._args, **self._kwargs)
    finally:
        # Avoid a refcycle if the thread is running a function with
        # an argument that has a member that points to the thread.
        del self._target, self._args, self._kwargs