python自动化开发 day10

Author:@南非波波

课程大纲:

day09

http://www.cnblogs.com/alex3714/articles/5248247.html

day10

http://www.cnblogs.com/alex3714/articles/5286889.html

一、回顾

1. 队列

1.    队列的作业就是实现多个线程之间数据安全的交互
2.    队列类型:先进先出、后进先出、优先级
3.    queue的数据必须按照顺序进行取出-->处理-->放回。主要作用就是不同进程之间数据的交换,manager可以进行多个进程之间的数据的共享,而且是数据安全的。
4.    生产者-消费者模型:实现程序的松耦合

2. gevent模块:对Greenlet模块的一次封装

1. gevent里面的socket本身可以实现IO阻塞变成非阻塞
2. monkey.path_all()可以帮助我们实现阻塞变成非阻塞

3. 协程

1. 实现单个线程里面的并发
2. 无需线程上下文切换的开销,无需原子操作锁定及同步的开销,方便切换控制流,高并发+高扩展性+低成本
3. 无法利用多核资源,但是可以实现单个进程下面起一个线程,然后一个线程下面实现多个协程并发

4. select

1. select 与poll的区别

    select有一个最大文件数的限制1024,文件扫描一个列表是非常低效的;poll没有这个限制
    内核态到用户态的数据copy;Epoll直接调用C语言进行内核态的数据nat到用户态

2.    select代码注释


    __auther__ = 'Victor'

    import select
    import socket
    import sys
    import queue

    # 创建一个TCP/IP socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    # 绑定socket到指定端口
    server_address = ('localhost', 10000)
    print(sys.stderr, 'starting up on %s port %s' % server_address)
    server.bind(server_address)
    # 监听连接的地址
    server.listen(5)
    inputs = [server]
    # Socket的读操作
    outputs = []
    # socket的写操作
    message_queues = {}
    while inputs:
    # Wait for at least one of the sockets to be ready for processing
    print( '\nwaiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    # 监听句柄序列,如果某个发生变化,select的第一个rLest会拿到数据,output只要有数据wLest就能获取到,select的第三个参数inputs用来监测异常,并赋值给exceptional。
    # 监听inputs,outputs,inputs  如果他们的值有变化,就将分别赋值给readable,writable,exceptional。
    for s in readable:
    # 遍历readable的值。
    if s is server:
    connection, client_address = s.accept()
    # 如果s 是server,那么server socket将接收连接。
    print('new connection from', client_address)
    # 打印出连接客户端的地址。
    connection.setblocking(False)
    # 设置socket 为非阻塞模式。
    inputs.append(connection)
    # 因为有读操作发生,所以将此连接加入inputs
    message_queues[connection] = queue.Queue()
    # 为每个连接创建一个queue队列。使得每个连接接收到正确的数据。
    else:
    data = s.recv(1024)
    # 如果s不是server,说明客户端连接来了,那么就接受客户端的数据。
    if data:
    # 如果接收到客户端的数据
    print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
    message_queues[s].put(data)
    # 将收到的数据放入队列中
    if s not in outputs:
    outputs.append(s)
    # 将socket客户端的连接加入select的output中,并且用来返回给客户端数据。
    else:
    print('closing', client_address, 'after reading no data')
    # 如果没有收到客户端发来的空消息,则说明客户端已经断开连接。
    if s in outputs:
    outputs.remove(s)
    # 既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
    inputs.remove(s)
    # inputs中也删除掉
    s.close()
    # 把这个连接关闭掉
    del message_queues[s]
    # 删除此客户端的消息队列

    for s in writable:
    # 遍历output的数据
    try:
    next_msg = message_queues[s].get_nowait()
    except queue.Empty:
    # 获取对应客户端消息队列中的数据,如果队列中的数据为空,从消息队列中移除此客户端连接。
    print('output queue for', s.getpeername(), 'is empty')
    outputs.remove(s)
    else:
    print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
    s.send(next_msg)
    # 如果消息队列有数据,则发送给客户端。
    for s in exceptional:
    # 处理 "exceptional conditions"
    print('handling exceptional condition for', s.getpeername() )
    inputs.remove(s)
    # 取消对出现异常的客户端的监听
    if s in outputs:
    outputs.remove(s)
    # 移除客户端的连接对象。
    s.close()
    # 关闭此socket连接
    del message_queues[s]
    # 删除此消息队列。

    '''

    在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,

    轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。

    epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统(文件系统一般用什么数据结构实现?B+树)。把原先的select/poll调用分成了3个部分:

    1)调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)

    2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字

    3)调用epoll_wait收集发生的事件的连接

    '''


3.    epoll代码注释

    __auther__ = 'Victor'


    #--------------这是一个epoll的例子--------------


    import socket, select
    # 'windows'下不支持'epoll'

    EOL1 = b'\n\n'
    EOL2 = b'\n\r\n'
    response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
    response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
    response += b'Hello, world!'

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)
    # 建立socket连接。
    serversocket.setblocking(0)
    # 因为socket本身是阻塞的,setblocking(0)使得socket不阻塞

    epoll = select.epoll()
    # 创建一个eopll对象
    epoll.register(serversocket.fileno(), select.EPOLLIN)
    # 在服务器端socket上面注册对读event的关注,一个读event随时会触发服务器端socket去接收一个socket连接。

    try:
       connections = {}; requests = {}; responses = {}
    # 生成3个字典,connection字典是存储文件描述符映射到他们相应的网络连接对象
       while True:
          events = epoll.poll(1)
    # 查询epoll对象,看是否有任何关注的event被触发,参数‘1’表示,会等待一秒来看是否有event发生,如果有任何感兴趣的event发生在这次查询之前,这个查询就会带着这些event的列表立即返回
          for fileno, event in events:
            # event作为一个序列(fileno,event code)的元组返回,fileno是文件描述符的代名词,始终是一个整数。
             if fileno == serversocket.fileno():
                # 如果一个读event在服务器端socket发生,就会有一个新的socket连接可能被创建。
                connection, address = serversocket.accept()
                # 服务器端开始接收连接和客户端地址
                connection.setblocking(0)
                # 设置新的socket为非阻塞模式
                epoll.register(connection.fileno(), select.EPOLLIN)
                # 为新的socket注册对读(EPOLLIN)event的关注
                connections[connection.fileno()] = connection
                requests[connection.fileno()] = b''
                responses[connection.fileno()] = response
             elif event & select.EPOLLIN:
                requests[fileno] += connections[fileno].recv(1024)
                # 如果发生一个读event,就读取从客户端发过来的数据。
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                   epoll.modify(fileno, select.EPOLLOUT)
                # 一旦完成请求已经收到,就注销对读event的关注,注册对写(EPOLLOUT)event的关注,写event发生的时候,会回复数据给客户端。
                   print('-'*40 + '\n' + requests[fileno].decode()[:-2])
                # 打印完整的请求,证明虽然与客户端的通信是交错进行的,但是数据可以作为一个整体来组装和处理。
             elif event & select.EPOLLOUT:
                # 如果一个写event在一个客户端socket上面发生,他会接受新的数据以便发送到客户端。
                byteswritten = connections[fileno].send(responses[fileno])
                responses[fileno] = responses[fileno][byteswritten:]
                if len(responses[fileno]) == 0:
                    # 每次发送一部分响应数据,直到完整的响应数据都已经发送给操作系统等待传输给客户端。
                   epoll.modify(fileno, 0)
                # 一旦完整的响应数据发送完成,就不再关注读或者写event。
                   connections[fileno].shutdown(socket.SHUT_RDWR)
                # 如果一个连接显式关闭,那么socket shutdown是可选的,在这里这样使用,是为了让客户端首先关闭。
                # shutdown调用会通知客户端socket没有更多的数据应该被发送或者接收,并会让功能正常的客户端关闭自己的socket连接。
             elif event & select.EPOLLHUP:
                # HUP挂起event表明客户端socket已经断开(即关闭),所以服务器端也需要关闭,没有必要注册对HUP event的关注,在socket上面,他们总是会被epoll对象注册。
                epoll.unregister(fileno)
                # 注销对此socket连接的关注。
                connections[fileno].close()
                # 关闭socket连接。
                del connections[fileno]
    finally:
       epoll.unregister(serversocket.fileno())
    # 去掉已经注册的文件句柄
       epoll.close()
    # 关闭epoll对象
       serversocket.close()
    # 关闭服务器连接
    # 打开的socket连接不需要关闭,因为Python会在程序结束时关闭, 这里的显示关闭是个好的习惯。

    '''

    首先我们来定义流的概念,一个流可以是文件,socket,pipe等等可以进行I/O操作的内核对象。

        不管是文件,还是套接字,还是管道,我们都可以把他们看作流。

        之后我们来讨论I/O的操作,通过read,我们可以从流中读入数据;通过write,我们可以往流写入数据。现在假定一个情形,
        我们需要从流中读数据,但是流中还没有数据,(典型的例子为,客户端要从socket读如数据,但是服务器还没有把数据传回来),
        这时候该怎么办?

    阻塞:阻塞是个什么概念呢?比如某个时候你在等快递,但是你不知道快递什么时候过来,而且你没有别的事可以干(或者说接下来的事要等快递来了才能做);
    那么你可以去睡觉了,因为你知道快递把货送来时一定会给你打个电话(假定一定能叫醒你)。

    非阻塞忙轮询:接着上面等快递的例子,如果用忙轮询的方法,那么你需要知道快递员的手机号,然后每分钟给他挂个电话:“你到了没?”

        很明显一般人不会用第二种做法,不仅显很无脑,浪费话费不说,还占用了快递员大量的时间。

        大部分程序也不会用第二种做法,因为第一种方法经济而简单,经济是指消耗很少的CPU时间,如果线程睡眠了,就掉出了系统的调度队列,暂时不会去瓜分CPU宝贵的时间片了。

        为了了解阻塞是如何进行的,我们来讨论缓冲区,以及内核缓冲区,最终把I/O事件解释清楚。缓冲区的引入是为了减少频繁I/O操作而引起频繁的系统调用(你知道它很慢的),
        当你操作一个流时,更多的是以缓冲区为单位进行操作,这是相对于用户空间而言。对于内核来说,也需要缓冲区。

    假设有一个管道,进程A为管道的写入方,B为管道的读出方。

    假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,
    这个事件姑且称之为“缓冲区非空”。

        但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,
        B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。

    假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”

        也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。

    这四个情形涵盖了四个I/O事件,缓冲区满,缓冲区空,缓冲区非空,缓冲区非满(注都是说的内核缓冲区,且这四个术语都是我生造的,仅为解释其原理而造)。
    这四个I/O事件是进行阻塞同步的根本。(如果不能理解“同步”是什么概念,请学习操作系统的锁,信号量,条件变量等任务同步方面的相关知识)。

        然后我们来说说阻塞I/O的缺点。但是阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),
        很不幸这两种方法效率都不高。

        于是再来考虑非阻塞忙轮询的I/O方式,我们发现我们可以同时处理多个流了(把一个流从阻塞模式切换到非阻塞模式再此不予讨论):

    while true {
        for i in stream[]; {
            if i has data
                read until unavailable
        }
    }

        我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。
        这里要补充一点,阻塞模式下,内核对于I/O事件的处理是阻塞或者唤醒,而非阻塞模式下则把I/O事件交给其他对象(后文介绍的select以及epoll)处理甚至直接忽略。

        为了避免CPU空转,可以引进了一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比较厉害,
        可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流
        (于是我们可以把“忙”字去掉了)。代码长这样:

    while true {
        select(streams[])
        for i in streams[] {
            if i has data
                read until unavailable
        }
    }

        于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流
        (可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。

        但是使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,没一次无差别轮询时间就越长。再次

    说了这么多,终于能好好解释epoll了

        epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll之会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的。
        (复杂度降低到了O(1))

        在讨论epoll的实现细节之前,先把epoll的相关操作列出:

          epoll_create 创建一个epoll对象,一般epollfd = epoll_create()

          epoll_ctl (epoll_add/epoll_del的合体),往epoll对象中增加/删除某一个流的某一个事件

    比如

    epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//注册缓冲区非空事件,即有数据流入

    epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//注册缓冲区非满事件,即流可以被写入

    epoll_wait(epollfd,...)等待直到注册的事件发生

    (注:当对一个非阻塞流的读写发生缓冲区满或缓冲区空,write/read会返回-1,并设置errno=EAGAIN。而epoll只关心缓冲区非满和缓冲区非空事件)。

    一个epoll模式的代码大概的样子是:
    while true {
        active_stream[] = epoll_wait(epollfd)
        for i in active_stream[] {
            read or write till
        }
    }
    '''

二、Twsited异步网络框架

1. 事件驱动

将自定义的类和函数注册到事件列表中,事件驱动框架就会自行去列表中获取事件并执行。

第一,注册事件;第二,触发事件
![](http://i.imgur.com/6brDszu.png)

示例代码:

#event_drive.py

#!/usr/local/env python3
'''
Author:@南非波波
Blog:http://www.cnblogs.com/songqingbo/
E-mail:qingbo.song@gmail.com
'''
'''
模拟twsited异步网络框架的流程
'''
#创建一个事件列表
event_list = []

#创建一个事件驱动动作
def run():
    for event in event_list:
        obj = event()
        obj.execute()

#创建事件定义规则,用户将自定义事件注册到事件列表中需要继承此类
class BaseHandler(object):
    """
    用户必须继承该类,从而规范所有类的方法(类似于接口的功能)
    """
    def execute(self):
        raise Exception('you must overwrite execute')


#event_run.py


#!/usr/local/env python3
'''
Author:@南非波波
Blog:http://www.cnblogs.com/songqingbo/
E-mail:qingbo.song@gmail.com
'''
import event_drive

#自定义事件,继承事件驱动自定义类
class MyHandler(event_drive.BaseHandler):

    #重写执行函数
    def execute(self):
        print('event-drive execute MyHandler')
class YourHandler(event_drive.BaseHandler):

    def execute(self):
        print('event-drive ezecute YourHandler')

event_drive.event_list.append(MyHandler) #将事件注册到事件列表中
event_drive.event_list.append(YourHandler)
event_drive.run()

2. Twisted框架

Echo_server

    #!/usr/local/env python3
    '''
    Author:@南非波波
    Blog:http://www.cnblogs.com/songqingbo/
    E-mail:qingbo.song@gmail.com
    '''
    from twisted.internet import protocol
    from twisted.internet import reactor

    class Echo(protocol.Protocol):
        '''
        定义一个类,处理客户端传递的数据
        '''
        def dataReceived(self, data):
            '''
            一旦接收到客户端传递的数据就要调用该方法
            :param data: 客户端传递过来的数据,python3版本传递的数据需要转换成bytes
            :return: 返回的数据是将客户端传递过来的数据返回给客户端
            '''
            print("Client said:",data)
            self.transport.write(data)

    def main():
        '''
        主函数,程序执行时直接从该函数调用事件类
        :return:
        '''
        factory = protocol.ServerFactory() #定义基础工厂类
        factory.protocol = Echo #相当于socketserver中的Handler方法,工厂协议直接引用自定义的Echo类

        reactor.listenTCP(5000,factory) #reactor自动重复去做一件事情。使用listenTCP监听端口
        reactor.run() #运行

    if __name__ == '__main__':
        main()


Echo_client:

    #!/usr/local/env python3
    '''
    Author:@南非波波
    Blog:http://www.cnblogs.com/songqingbo/
    E-mail:qingbo.song@gmail.com
    '''
    from twisted.internet import reactor, protocol


    # a client protocol

    class EchoClient(protocol.Protocol):
        '''
        客户端Echo事件
        '''
        def connectionMade(self):
            '''
            连接建立执行该方法,客户端发送数据
            :return:
            '''
            self.transport.write(b"hello alex!")

        def dataReceived(self, data):
            '''
            客户端接收服务端的数据
            :param data:
            :return:
            '''
            print("Server said:", data)
            self.transport.loseConnection()
        def connectionLost(self, reason):
            '''
            客户端接收完数据断开连接,主动执行该方法断开连接
            :param reason:
            :return:
            '''
            print("connection lost")

    class EchoFactory(protocol.ClientFactory):
        '''
        自定义工厂类,继承prorocol.ClientFactory类
        '''
        protocol = EchoClient #hanld。自己重写了protocol类

        def clientConnectionFailed(self, connector, reason):
            print("Connection failed - goodbye!")
            reactor.stop()

        def clientConnectionLost(self, connector, reason):
            print("Connection lost - goodbye!")
            reactor.stop()


    # this connects the protocol to a server running on port 8000
    def main():
        f = EchoFactory()
        reactor.connectTCP("localhost", 5000, f)
        reactor.run()

    # this only runs if the module was *not* imported
    if __name__ == '__main__':
        main()

3.深入学习

http://blog.csdn.net/hanhuili/article/details/9389433

http://krondo.com/an-introduction-to-asynchronous-programming-and-twisted/

三、非关系型数据库

1. Redis

参考:http://www.cnblogs.com/wupeiqi/articles/5132791.html

数据(键值对)存储在内存中,一个独立的内存管理器,可以使多个程序共享数据    

默认是非持久化的,但是可以在配置文件中进行设置

1. redis基础使用

        cli>keys * #查看所有的键
        cli>set name swht ex 5 #设置一个键值对,其有效时间为5秒
        cli>get name #获取键值

2. redis连接

        import redis
        redis_cli = redis.Redis("localhost")
        print(redis_cli.get('name')) #b'swht'  get方法只能获取字符
3. redis连接池

        import redis
        pool = redis.ConnectionPool(host = 'localhost',port = 6379)
        redis_cli = redis.Redis(connection_pool=pool)
        redis_cli.set('age',56)
        print(redis_cli.get('age')) #b'56'
4. 操作

    set(name, value, ex=None, px=None, nx=False, xx=False)

        在Redis中设置值,默认,不存在则创建,存在则修改
        参数:
             ex,过期时间(秒)
             px,过期时间(毫秒)
             nx,如果设置为True,则只有name不存在时,当前set操作才执行
             xx,如果设置为True,则只有name存在时,岗前set操作才执行
    setnx(name, value)

        设置值,只有name不存在时,执行设置操作(添加)
    setex(name, value, time)

        # 设置值
        # 参数:
            # time,过期时间(数字秒 或 timedelta对象)
    psetex(name, time_ms, value)

        # 设置值
        # 参数:
            # time_ms,过期时间(数字毫秒 或 timedelta对象)
    mset(*args, **kwargs)

        批量设置值
        如:
            mset(k1='v1', k2='v2')
            或
            mget({'k1': 'v1', 'k2': 'v2'})
    get(name)

        获取值
    mget(keys, *args)

        批量获取
        如:
            mget('ylr', 'wupeiqi')
            或
            r.mget(['ylr', 'wupeiqi'])
    getset(name, value)

        设置新值并获取原来的值
    getrange(key, start, end)


        # 获取子序列(根据字节获取,非字符)
        # 参数:
            # name,Redis 的 name
            # start,起始位置(字节)
            # end,结束位置(字节)
        # 如: "武沛齐" ,0-3表示 "武"
    setrange(name, offset, value)

        # 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)
        # 参数:
            # offset,字符串的索引,字节(一个汉字三个字节)
            # value,要设置的值
    setbit(name, offset, value)    
        # 对name对应值的二进制表示的位进行操作

        # 参数:
            # name,redis的name
            # offset,位的索引(将值变换成二进制后再进行索引)
            # value,值只能是 1 或 0

        # 注:如果在Redis中有一个对应: n1 = "foo",
                那么字符串foo的二进制表示为:01100110 01101111 01101111
            所以,如果执行 setbit('n1', 7, 1),则就会将第7位设置为1,
                那么最终二进制则变成 01100111 01101111 01101111,即:"goo"

        # 扩展,转换二进制表示:

            # source = "武沛齐"
            source = "foo"

            for i in source:
                num = ord(i)
                print bin(num).replace('b','')

            特别的,如果source是汉字 "武沛齐"怎么办?
            答:对于utf-8,每一个汉字占 3 个字节,那么 "武沛齐" 则有 9个字节
               对于汉字,for循环时候会按照 字节 迭代,那么在迭代时,将每一个字节转换 十进制数,然后再将十进制数转换成二进制    
    假定统计UV,使用setbit可以进行相应UV数统计。

        #!/usr/local/env python3
        '''
        Author:@南非波波
        Blog:http://www.cnblogs.com/songqingbo/
        E-mail:qingbo.song@gmail.com
        '''
        import redis

        pool = redis.ConnectionPool(host = 'localhost',port = 6379)
        redis_cli = redis.Redis(connection_pool=pool)
        redis_cli.setbit('ip',5,1)
        redis_cli.setbit('ip',45,1)
        redis_cli.setbit('ip',15,1)
        redis_cli.setbit('ip',45,1)
        print("uv_count:",redis_cli.bitcount('ip'))    
    getbit(name, offset)


        # 获取name对应的值的二进制表示中的某位的值 (0或1)
        bitcount(key, start=None, end=None)

        # 获取name对应的值的二进制表示中 1 的个数
        # 参数:
            # key,Redis的name
            # start,位起始位置
            # end,位结束位置
    bitop(operation, dest, *keys)

        # 获取多个值,并将值做位运算,将最后的结果保存至新的name对应的值

        # 参数:
            # operation,AND(并) 、 OR(或) 、 NOT(非) 、 XOR(异或)
            # dest, 新的Redis的name
            # *keys,要查找的Redis的name

        # 如:
            bitop("AND", 'new_name', 'n1', 'n2', 'n3')
            # 获取Redis中n1,n2,n3对应的值,然后讲所有的值做位运算(求并集),然后将结果保存 new_name 对应的值中
    strlen(name)

        # 返回name对应值的字节长度(一个汉字3个字节)
    incr(self, name, amount=1)

        做pv统计比较有用
        # 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。

        # 参数:
            # name,Redis的name
            # amount,自增数(必须是整数)

        # 注:同incrby
    incrbyfloat(self, name, amount=1.0)

        # 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。

        # 参数:
            # name,Redis的name
            # amount,自增数(浮点型)
    decr(self, name, amount=1)

        # 自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。

        # 参数:
            # name,Redis的name
            # amount,自减数(整数)
    append(key, value)

        # 在redis name对应的值后面追加内容

        # 参数:
            key, redis的name
            value, 要追加的字符串        

5. Hash操作

    hset(name, key, value)

        # name对应的hash中设置一个键值对(不存在,则创建;否则,修改)

        # 参数:
            # name,redis的name
            # key,name对应的hash中的key
            # value,name对应的hash中的value

        # 注:
            # hsetnx(name, key, value),当name对应的hash中不存在当前key时则创建(相当于添加)
    hmset(name, mapping)

        # 在name对应的hash中批量设置键值对

        # 参数:
            # name,redis的name
            # mapping,字典,如:{'k1':'v1', 'k2': 'v2'}

        # 如:
            # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})
    hget(name,key)

        # 在name对应的hash中获取根据key获取value
        hmget(name, keys, *args)

        # 在name对应的hash中获取多个key的值

        # 参数:
            # name,reids对应的name
            # keys,要获取key集合,如:['k1', 'k2', 'k3']
            # *args,要获取的key,如:k1,k2,k3

        # 如:
            # r.mget('xx', ['k1', 'k2'])
            # 或
            # print r.hmget('xx', 'k1', 'k2')
    hgetall(name)

        获取name对应hash的所有键值
    hlen(name)

        # 获取name对应的hash中键值对的个数
    hkeys(name)

        # 获取name对应的hash中所有的key的值
    hvals(name)

        # 获取name对应的hash中所有的value的值
    hexists(name, key)

        # 检查name对应的hash是否存在当前传入的key
    hdel(name,*keys)

        # 将name对应的hash中指定key的键值对删除
    hincrby(name, key, amount=1)

        # 自增name对应的hash中的指定key的值,不存在则创建key=amount
        # 参数:
            # name,redis中的name
            # key, hash对应的key
            # amount,自增数(整数)
    hincrbyfloat(name, key, amount=1.0)

        # 自增name对应的hash中的指定key的值,不存在则创建key=amount

        # 参数:
            # name,redis中的name
            # key, hash对应的key
            # amount,自增数(浮点数)

        # 自增name对应的hash中的指定key的值,不存在则创建key=amount
    hscan(name, cursor=0, match=None, count=None)

        # 增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆

        # 参数:
            # name,redis的name
            # cursor,游标(基于游标分批取获取数据)
            # match,匹配指定key,默认None 表示所有的key
            # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数

        # 如:
            # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
            # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
            # ...
            # 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕
    hscan_iter(name, match=None, count=None)

        # 利用yield封装hscan创建生成器,实现分批去redis中获取数据

        # 参数:
            # match,匹配指定key,默认None 表示所有的key
            # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数

        # 如:
            # for item in r.hscan_iter('xx'):
            #     print item
6. List操作

    lpush(name,values)

        # 在name对应的list中添加元素,每个新的元素都添加到列表的最左边

        # 如:
            # r.lpush('oo', 11,22,33)
            # 保存顺序为: 33,22,11

        # 扩展:
            # rpush(name, values) 表示从右向左操作
    lpushx(name,value)

        # 在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边

        # 更多:
            # rpushx(name, value) 表示从右向左操作
    llen(name)

        # name对应的list元素的个数
    linsert(name, where, refvalue, value))

        # 在name对应的列表的某一个值前或后插入一个新值

        # 参数:
            # name,redis的name
            # where,BEFORE或AFTER
            # refvalue,标杆值,即:在它前后插入数据
            # value,要插入的数据
    r.lset(name, index, value)

        # 对name对应的list中的某一个索引位置重新赋值

        # 参数:
            # name,redis的name
            # index,list的索引位置
            # value,要设置的值
    r.lrem(name, value, num)

        # 在name对应的list中删除指定的值

        # 参数:
            # name,redis的name
            # value,要删除的值
            # num,  num=0,删除列表中所有的指定值;
                   # num=2,从前到后,删除2个;
                   # num=-2,从后向前,删除2个
    lpop(name)

        # 在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素

        # 更多:
            # rpop(name) 表示从右向左操作
    lindex(name, index)

        在name对应的列表中根据索引获取列表元素
    lrange(name, start, end)

        # 在name对应的列表分片获取数据
        # 参数:
            # name,redis的name
            # start,索引的起始位置
            # end,索引结束位置
    ltrim(name, start, end)

        # 在name对应的列表中移除没有在start-end索引之间的值
        # 参数:
            # name,redis的name
            # start,索引的起始位置
            # end,索引结束位置
    rpoplpush(src, dst)

        # 从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边
        # 参数:
            # src,要取数据的列表的name
            # dst,要添加数据的列表的name
    blpop(keys, timeout)

        # 将多个列表排列,按照从左到右去pop对应列表的元素

        # 参数:
            # keys,redis的name的集合
            # timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞

        # 更多:
            # r.brpop(keys, timeout),从右向左获取数据
    brpoplpush(src, dst, timeout=0)

        # 从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧

        # 参数:
            # src,取出并要移除元素的列表对应的name
            # dst,要插入元素的列表对应的name
            # timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞
    自定义增量迭代

        # 由于redis类库中没有提供对列表元素的增量迭代,如果想要循环name对应的列表的所有元素,那么就需要:
            # 1、获取name对应的所有列表
            # 2、循环列表
        # 但是,如果列表非常大,那么就有可能在第一步时就将程序的内容撑爆,所有有必要自定义一个增量迭代的功能:

        def list_iter(name):
            """
            自定义redis列表增量迭代
            :param name: redis中的name,即:迭代name对应的列表
            :return: yield 返回 列表元素
            """
            list_count = r.llen(name)
            for index in xrange(list_count):
                yield r.lindex(name, index)

        # 使用
        for item in list_iter('pp'):
            print item
7. Set操作,Set集合就是不允许重复的列表

    sadd(name,values)

        # name对应的集合中添加元素
    scard(name)

        获取name对应的集合中元素个数
    sdiff(keys, *args)

        在第一个name对应的集合中且不在其他name对应的集合的元素集合
    sdiffstore(dest, keys, *args)

        # 获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中
    sinter(keys, *args)

        # 获取多一个name对应集合的并集
    sinterstore(dest, keys, *args)

        # 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中
    sismember(name, value)

        # 检查value是否是name对应的集合的成员
    smembers(name)

        # 获取name对应的集合的所有成员
    smove(src, dst, value)

        # 将某个成员从一个集合中移动到另外一个集合
    spop(name)

        # 从集合的右侧(尾部)移除一个成员,并将其返回
    srandmember(name, numbers)

        # 从name对应的集合中随机获取 numbers 个元素
    srem(name, values)

        # 在name对应的集合中删除某些值
    sunion(keys, *args)

        # 获取多一个name对应的集合的并集
    sunionstore(dest,keys, *args)

        # 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中
    sscan(name, cursor=0, match=None, count=None)
    sscan_iter(name, match=None, count=None)

        # 同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大


8. 有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。

    zadd(name, *args, **kwargs)

        # 在name对应的有序集合中添加元素
        # 如:
             # zadd('zz', 'n1', 1, 'n2', 2)
             # 或
             # zadd('zz', n1=11, n2=22)
    zcard(name)

        # 获取name对应的有序集合元素的数量
    zcount(name, min, max)

        # 获取name对应的有序集合中分数 在 [min,max] 之间的个数
    zincrby(name, value, amount)

        # 自增name对应的有序集合的 name 对应的分数
    r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

        # 按照索引范围获取name对应的有序集合的元素

        # 参数:
            # name,redis的name
            # start,有序集合索引起始位置(非分数)
            # end,有序集合索引结束位置(非分数)
            # desc,排序规则,默认按照分数从小到大排序
            # withscores,是否获取元素的分数,默认只获取元素的值
            # score_cast_func,对分数进行数据转换的函数

        # 更多:
            # 从大到小排序
            # zrevrange(name, start, end, withscores=False, score_cast_func=float)

            # 按照分数范围获取name对应的有序集合的元素
            # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
            # 从大到小排序
            # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
    zrank(name, value)

        # 获取某个值在 name对应的有序集合中的排行(从 0 开始)

        # 更多:
            # zrevrank(name, value),从大到小排序
    zrangebylex(name, min, max, start=None, num=None)

        # 当有序集合的所有成员都具有相同的分值时,有序集合的元素会根据成员的 值 (lexicographical ordering)来进行排序,而这个命令则可以返回给定的有序集合键 key 中, 元素的值介于 min 和 max 之间的成员
        # 对集合中的每个成员进行逐个字节的对比(byte-by-byte compare), 并按照从低到高的顺序, 返回排序后的集合成员。 如果两个字符串有一部分内容是相同的话, 那么命令会认为较长的字符串比较短的字符串要大

        # 参数:
            # name,redis的name
            # min,左区间(值)。 + 表示正无限; - 表示负无限; ( 表示开区间; [ 则表示闭区间
            # min,右区间(值)
            # start,对结果进行分片处理,索引位置
            # num,对结果进行分片处理,索引后面的num个元素

        # 如:
            # ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga
            # r.zrangebylex('myzset', "-", "[ca") 结果为:['aa', 'ba', 'ca']

        # 更多:
            # 从大到小排序
            # zrevrangebylex(name, max, min, start=None, num=None)
    zrem(name, values)

        # 删除name对应的有序集合中值是values的成员

        # 如:zrem('zz', ['s1', 's2'])
    zremrangebyrank(name, min, max)

        # 根据排行范围删除
    zremrangebyscore(name, min, max)

        # 根据分数范围删除
    zremrangebylex(name, min, max)

        # 根据值返回删除
    zscore(name, value)

        # 获取name对应有序集合中 value 对应的分数
    zinterstore(dest, keys, aggregate=None)

        # 获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作
        # aggregate的值为:  SUM  MIN  MAX
    zunionstore(dest, keys, aggregate=None)

        # 获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作
        # aggregate的值为:  SUM  MIN  MAX
    zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
    zscan_iter(name, match=None, count=None,score_cast_func=float)

        # 同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作

  9. redis的发布与订阅

基础类:

    #!/usr/local/env python3
    '''
    Author:@南非波波
    Blog:http://www.cnblogs.com/songqingbo/
    E-mail:qingbo.song@gmail.com
    '''
    import redis


    class RedisHelper:

        def __init__(self):
            self.__conn = redis.Redis(host='localhost',port=6379)
            self.chan_sub = 'fm104.5'
            self.chan_pub = 'fm104.5'

        def public(self, msg):
            self.__conn.publish(self.chan_pub, msg)
            return True

        def subscribe(self):
            pub = self.__conn.pubsub()
            pub.subscribe(self.chan_sub)
            pub.parse_response()
            return pub

redis_sub.py

    #!/usr/local/env python3
    '''
    Author:@南非波波
    Blog:http://www.cnblogs.com/songqingbo/
    E-mail:qingbo.song@gmail.com
    '''
    from RedisHelper import RedisHelper

    obj = RedisHelper()
    redis_sub = obj.subscribe()

    while True:
        msg= redis_sub.parse_response()
        print(msg)

redis_pub.py

    #!/usr/local/env python3
    '''
    Author:@南非波波
    Blog:http://www.cnblogs.com/songqingbo/
    E-mail:qingbo.song@gmail.com
    '''
    from RedisHelper import RedisHelper

    obj = RedisHelper()
    obj.public('hello')

2. memcached

非持久化轻量级缓存,使用第三方工具可以实现数据的持久化存储

3. mongodb

天生的数据持久化,默认将数据持久化存储在本地磁盘。

四、消息队列rabbitmq

通信模式:

1. 简单生产者消费者模型

rabbit_send

    #!/usr/local/env python3
    '''
    Author:@南非波波
    Blog:http://www.cnblogs.com/songqingbo/
    E-mail:qingbo.song@gmail.com
    '''
    import pika
    #与消息队列建立一个连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    #创建一个管道
    channel = connection.channel()

    #在管道中声明一个名称为'name'的队列
    channel.queue_declare(queue='name')

    #一个消息不能直接发送给消息队列,需要通过一个路由器进行转发,这个路由器就是由exchange进行设置
    channel.basic_publish(exchange='', #路由器
                          routing_key='name', #队列名称
                          body='swht') #消息
    print(" [swht] Sent a message")
    connection.close()

rabbit_recive

    #!/usr/local/env python3
    '''
    Author:@南非波波
    Blog:http://www.cnblogs.com/songqingbo/
    E-mail:qingbo.song@gmail.com
    '''
    import pika

    #与消息队列服务器建立连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    #创建一个管道
    channel = connection.channel()
    #消费者声明一个队列,为了防止生产者还没有启动没有完成创建队列时代码出错的问题。如果队列已存在,则忽略该操作,否则则创建队列
    channel.queue_declare(queue='name')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(callback,
                          queue='name',
                          no_ack=True) #接收消息不进行确认

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

2. 消息持久化

    channel.queue_declare(queue='name',durable=True)
已经存在的队列是不能再进行持久化设置的,所以在只有创建队列的时候设置持久化选项
    basc_ack = (delivery_tag= method.delivry_tag)

查看当前所有的queue  XX

3. 消息公平分发

只在消费者添加
channel.basic_qos(prefetch_count=1)

示例代码:

rabbit_slb_send.py
    #!/usr/local/env python3
    '''
    Author:@南非波波
    Blog:http://www.cnblogs.com/songqingbo/
    E-mail:qingbo.song@gmail.com
    '''
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   '192.168.137.6'))
    channel = connection.channel()

    #声明queue
    channel.queue_declare(queue='task_queue')

    #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    import sys

    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                          ))
    print(" [x] Sent %r" % message)
    connection.close()

4. exchange路由

代码:

publisher.py

    #!/usr/local/env python3
    '''
    Author:@南非波波
    Blog:http://www.cnblogs.com/songqingbo/
    E-mail:qingbo.song@gmail.com
    '''
    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs',
                             type='fanout')

    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()

subscriber.py

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs',
                             type='fanout')

    result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_name = result.method.queue

    channel.queue_bind(exchange='logs',
                       queue=queue_name)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" [x] %r" % body)

    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)

    channel.start_consuming()
文章目录
  1. 一、回顾
    1. 1. 队列
    2. 2. gevent模块:对Greenlet模块的一次封装
    3. 3. 协程
    4. 4. select
  2. 二、Twsited异步网络框架
    1. 1. 事件驱动
    2. 2. Twisted框架
    3. 3.深入学习
    4. 三、非关系型数据库
    5. 1. Redis
    6. 2. memcached
    7. 3. mongodb
  3. 四、消息队列rabbitmq
    1. 1. 简单生产者消费者模型
    2. 2. 消息持久化
    3. 3. 消息公平分发
    4. 4. exchange路由