整体架构

  • nova 和其他组件之间的交互使用 HTTP 请求
  • 内部组件之间使用 oslo_messaging 库实现 RPC 调用,这里还涉及消息队列 RabbitMQ ,遵循 AMQP 协议
  • 大部分 nova 组件都可以运行在多个服务器上,然后使用一个管理器监听 RPC 消息
  • 而 nova-compute 是运行在计算主机上的单进程,用于管理计算资源
  • nova 内部组件共享本地数据库,通过对象层访问,确保兼容性和安全性
    • nova-compute 访问数据库由 nova-conductor 代理

当用户发起一个新的请求时,该请求会先在 nova-api 中处理。nova-api 会对请求进行一系列检查,包括请求是否合法,配额是否足够等;当检查用过后,nova-api 就会为该请求分配一个唯一的虚拟机 ID ,并在数据库中新建对应的项来记录虚拟机的状态;然后,nova-api 会将请求发送给 nova-conductor 处理。

nova-conductor 主要管理服务之间的通信并进行任务处理。它在接收到请求之后,会为 nova-scheduler 创建一个 RequestSpec 对象用来包装与调度相关的所有请求信息,然后调用 nova-scheduler 服务的 select_destination 接口。

nova-scheduler 通过接收到的 RequestSpec 对象,首先将 RequestSpec 对象转换成 ResourceRequest 对象,并将该对象发送给 Placement 进行一次预筛选,然后会根据数据库中最新的系统状态做出调度决定,并告诉 nova-conductor 把该请求调度到合适的计算节点上。

nova-conductor 在得知调度决定后,会把请求发送给对应的 nova-compute 服务。

每个 nova-compute 服务都有独立的资源监视器(Resource Tracker)用来监视本地主机的资源使用情况。当计算节点接收到请求时,资源监视器能够检查主机是否有足够的资源。

  • 如果对应的资源足够,nova-compute 就会允许在当前主机中启动所要求的虚拟机,并在数据库中更新虚拟机状态,同时将最新的主机资源情况更新到数据库
  • 如果当前主机不符合请求的资源要求,nova-compute 会拒绝启动虚拟机,并将请求重新发给 nova-conductor 服务,重试整个调度过程

组成部分

  1. nova-api

    接受和响应用户的计算 API 调用

  2. nova-api-metadata

    接受来自实例的元数据请求

    Metadata service

  3. nova-compute

    通过 hypervisor API 创建和终止虚拟机实例的守护进程。例如 KVM/QEMU 的 libvirt、VMware 的 VMwareAPI 。

    运行在它所管理的 hypervisor 机器上,管理与虚拟机管理程序和虚拟机的通信。

  4. nova-scheduler

    从消息队列中获取虚拟机实例请求,并决定在哪个服务器上运行。

  5. nova-conductor

    处理需要协调的请求(构建/调整)、充当数据库代理或处理对象转换。用于连接 nova-api、nova-scheduler、nova-compute 服务。

  6. nova-novncproxy

    协调 nova-compute 服务和数据库之间的交互。避免 nova-compute 直接访问数据库,为了提供更好的 API 兼容性。建议不要部署在 nova-compute 服务所在的节点上。

  7. nova-spicehtml5proxy

    提供通过 SPICE 连接访问运行实例的代理,支持基于浏览器的 HTML5 客户端。

  8. The queue

    在守护进程之间传递消息的中央消息队列,通常使用 RabbitMQ 。

  9. SQL database

    存储云基础设施的大多数构建时和运行时状态,包括:可用的实例类型、在使用的实例、可用的网络、项目。

RPC

消息代理(RabbitMQ AMQP broker)允许 nova 内部组件以低耦合的方式进行通信,建立在发布/订阅(publish/subscribe)模式上

  • 解耦客户端和服务端
  • 同步客户端和服务端
  • 平衡远程调用

nova 使用 AMQP 中的直连(direct)、扇型(fanout)、主题(topic)交换;

nova 使用适配器类(adapter)将消息封装和解封从而调用函数,实现了两种 RPC 调用

  • rpc.call:请求 + 响应,api 作为消费者(consumer)
  • rpc.cast:单向,api 作为发布者(publisher)

每个 nova 服务在初始化时创建两个队列

  • 接受路由键 NODE-TYPE.NODE-ID(例如,compute.hostname):nova-api 需要重定向到特定节点
  • 接受路由键 NODE-TYPE(例如,compute):

每个 nova 内部组件都连接到消息代理,根据不同的作用,把消息队列作为:

  • 调用者(Invoker):nova-api、nova-scheduler;通过 rpc.callrpc.cast 向消息队列发送消息
  • 工作者(Worker):nova-compute;从消息队列接收消息,根据 rpc.call 进行响应

相关概念

主题发布者(Topic Publisher)

执行 rpc.callrpc.cast 操作将实例化一个主题发布者,用于将消息发送到消息队列。每个发布者总是连接到相同的主题交换机(topic-based exchange);生命周期仅限于消息传递。

直连消费者(Direct Consumer)

执行 rpc.call 操作将实例化一个直连消费者,用于从消息队列接收响应消息。每个消费者连接到唯一的直连交换机(direct-based exchange);生命周期仅限于消息传递。

主题消费者(Topic Consumer)

当工作者被实例化后将实例化一个主题消费者,并存在于工作者的整个生命周期;主题消费者用于从消息队列接收消息,并调用工作者定义的操作。主题消费者通过共享/排他队列(shared/exclusive queue)连接到相同的主体交换机。每个工作者都有两个主题消费者,一个处理 rpc.cast ,连接到交换键是 topic 的共享队列;另一个处理 rpc.call ,连接到交换键是 topic.host 的独立队列。

直连发布者(Direct Publisher)

执行 rpc.call 操作将实例化一个直连发布者,用于返回请求/响应操作所需的消息,连接到直连交换机。

主题交换机(Topic Exchange)

存在于虚拟机上下文中的路由表;类型(主题/直连)决定了路由策略;对于 nova 中的每个主题,消息代理节点只有一个主题交换机。

直连交换机(Direct Exchange)

rpc.call 操作中创建的路由表,消息代理节点的生命周期中有许多该实例,对应每个 rpc.call 调用。

队列元素(Queue Element)

消息桶,消息一直保存在队列中,直到消费者(主题/直连)连接到队列获取消息。队列可以是共享的也可以是独立的;路由键是 topic 的队列在相同类型的工作者中共享。

rpc.call

  1. 实例化主题发布者,将请求发送到消息队列;在发布操作之前,实例化直连消费者等待响应信息

  2. 一旦消息被交换器分派(dispatch),它就会被路由键(例如,topic.host)指定的主题消费者获取,并传递给负责该任务的工作者

  3. 任务完成后,将分配一个直连发布者将响应消息发送到消息队列

  4. 一旦消息被交换器分派,它就会被路由键(例如,msg_id)指定的直连消费者获取,并传递给调用者

rpc.cast

  1. 实例化主题发布者,将请求发送到消息队列

  2. 一旦消息被交换器分派(dispatch),它就会被路由键(例如,topic)指定的主题消费者获取,并传递给负责该任务的工作者

源码分析

从 github 下载 Victoria 版本的 Nova 源码

1
git clone https://github.com/openstack/nova.git --branch stable/victoria --single-branch

nova/ 文件夹下的目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
accelerator/    # Cyborg 加速器
api/ # Nova API 服务
cmd/ # 各个 Nova 服务的入口程序
compute/ # Nova Compute 服务
conductor/ # Nova Conductor 服务
conf/ # 所有的配置选项
console/ # nova-console 服务
db/ # 封装数据库操作
hacking/ # 编码规范检查
image/ # 封装镜像操作,Glance 接口抽象
keymgr/ # 密钥管理器实现
locale/ # 国际化相关文件
network/ # nova-network 服务
notifications/ # 通知相关功能
objects/ # 封装实体对象的 CURD 操作
pci/ # PCI/SR-IOV 支持
policies/ # 所有 Policy 的默认规则
privsep/ # oslo_privsep 相关
scheduler/ # Nova Scheduler 服务
servicegroup/ # 成员服务(membership),服务组
storage/ # Ceph 存储支持
tests/ # 单元测试
virt/ # 支持的 hypervisor 驱动
volume/ # 封装卷访问接口,Cinder 接口抽象

nova/ 文件夹下的 python 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
__init__.py
availability_zones.py # 区域设置的工具函数
baserpc.py # 基础 RPC 客户端/服务端实现
block_device.py # 块设备映射
cache_utils.py # oslo_cache 封装
config.py # 解析命令行参数
context.py # 贯穿 Nova 的所有请求的上下文
crypto.py # 包装标准加密数据元素
debugger.py # pydev 调试
exception.py # 基础异常类
exception_wrapper.py # 封装异常类
filters.py # 基础过滤器
i18n.py # 集成 oslo_i18n
loadables.py # 可加载类
manager.py # 基础 Manager 类
middleware.py # 更新 oslo_middleware 的默认配置选项
monkey_patch.py # eventlet 猴子补丁
policy.py # 策略引擎
profiler.py # 调用 OSProfiler
quota.py # 每个项目的资源配额
rpc.py # RPC 操作相关的工具函数
safe_utils.py # 不会导致循环导入的工具函数
service.py # 通用节点基类,用于在主机上运行的所有工作者
service_auth.py # 身份认证插件
test.py # 单元测试基础类
utils.py # 工具函数
version.py # 版本号管理
weights.py # 权重插件
wsgi.py # 管理 WSGI 应用的服务器类

setup.cfg 配置文件,[entry_points] 小节指定了 nova 各个组件入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
console_scripts =
nova-api = nova.cmd.api:main
nova-api-metadata = nova.cmd.api_metadata:main
nova-api-os-compute = nova.cmd.api_os_compute:main
nova-compute = nova.cmd.compute:main
nova-conductor = nova.cmd.conductor:main
nova-manage = nova.cmd.manage:main
nova-novncproxy = nova.cmd.novncproxy:main
nova-policy = nova.cmd.policy:main
nova-rootwrap = oslo_rootwrap.cmd:main
nova-rootwrap-daemon = oslo_rootwrap.cmd:daemon
nova-scheduler = nova.cmd.scheduler:main
nova-serialproxy = nova.cmd.serialproxy:main
nova-spicehtml5proxy = nova.cmd.spicehtml5proxy:main
nova-status = nova.cmd.status:main
wsgi_scripts =
nova-api-wsgi = nova.api.openstack.compute.wsgi:init_application
nova-metadata-wsgi = nova.api.metadata.wsgi:init_application

nova-api

nova-api 对外提供 RESTful API,没有对内的 RPC 。

nova/api/ 目录结构

1
2
3
4
5
6
7
__init__.py
auth.py # 身份认证中间件
compute_req_id.py # x-compute-request-id 中间件(oslo_middleware)
metadata/ # Metadata API
openstack/ # Nova v2.1 API
validation/ # 请求体验证
wsgi.py # WSGI 原语(请求、应用、中间件、路由、加载器)

openstack 目录中包含 WSGI 基础架构的代码,一些 WSGI 中间件,以及如何解析请求与分发请求的核心代码。

nova/api/openstack/compute/ 包含 Controller 实现,Resource 对象将 API 映射到相应的 Controller 方法上。

1
2
3
4
5
6
7
8
9
10
11
__init__.py
api_version_request.py # 版本验证
auth.py # noauth 中间件
common.py # 信息查询的工具函数
compute/ # 每个 API 的入口点
identity.py # 验证项目是否存在
requestlog.py # 请求日志中间件
urlmap.py # url 映射
versioned_method.py # 版本信息
wsgi.py # WSGI 相关抽象类
wsgi_app.py # WSGI 应用程序初始化方法

API 请求路由

nova-api 读取 etc/nova/api-paste.ini 并加载 WSGI 程序,最终 API 入口点都位于 nova.api.openstack.compute 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[composite:osapi_compute]
use = call:nova.api.openstack.urlmap:urlmap_factory
/: oscomputeversions # version API
/v2: oscomputeversion_legacy_v2 # v2 API
/v2.1: oscomputeversion_v2 # v2.1 API
# v21 is an exactly feature match for v2, except it has more stringent
# input validation on the wsgi surface (prevents fuzzing early on the
# API). It also provides new features via API microversions which are
# opt into for clients. Unaware clients will receive the same frozen
# v2 API feature set, but with some relaxed validation
/v2/+: openstack_compute_api_v21_legacy_v2_compatible
/v2.1/+: openstack_compute_api_v21

[composite:openstack_compute_api_v21]
use = call:nova.api.auth:pipeline_factory_v21 # 加载中间件
keystone = cors http_proxy_to_wsgi compute_req_id faultwrap request_log sizelimit osprofiler bees_profiler authtoken keystonecontext osapi_compute_app_v21
# DEPRECATED: The [api]auth_strategy conf option is deprecated and will be
# removed in a subsequent release, whereupon this pipeline will be unreachable.
noauth2 = cors http_proxy_to_wsgi compute_req_id faultwrap request_log sizelimit osprofiler bees_profiler noauth2 osapi_compute_app_v21

[app:osapi_compute_app_v21]
paste.app_factory = nova.api.openstack.compute:APIRouterV21.factory # 入口

nova/api/openstack/compute/routes.py 中的 APIRouterV21 主要用来完成路由规则的创建,其中 ROUTE_LIST 保存了 URL 与 Controller 之间的映射关系。

APIRouterV21 基于 ROUTE_LIST,使用 Routes 模块作为 URL 映射的工具,将各个模块所实现的 API 对应的 URL 注册到 mapper 中,并把每个资源都封装成 nova.api.openstack.wsgi.Resource 对象,当解析 URL 请求时,可以通过 URL 映射找到 API 对应的 Resource 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# Router 类对 WSGI routes 模块进行了简单的封装
class APIRouterV21(base_wsgi.Router):
"""Routes requests on the OpenStack API to the appropriate controller
and method. The URL mapping based on the plain list `ROUTE_LIST` is built
at here.
"""
def __init__(self, custom_routes=None):
""":param custom_routes: the additional routes can be added by this
parameter. This parameter is used to test on some fake routes
primarily.
"""
super(APIRouterV21, self).__init__(nova.api.openstack.ProjectMapper())

if custom_routes is None:
custom_routes = tuple()

for path, methods in ROUTE_LIST + custom_routes:
# NOTE(alex_xu): The variable 'methods' is a dict in normal, since
# the dict includes all the methods supported in the path. But
# if the variable 'method' is a string, it means a redirection.
# For example, the request to the '' will be redirect to the '/' in
# the Nova API. To indicate that, using the target path instead of
# a dict. The route entry just writes as "('', '/)".
if isinstance(methods, six.string_types):
self.map.redirect(path, methods)
continue

for method, controller_info in methods.items():
# TODO(alex_xu): In the end, I want to create single controller
# instance instead of create controller instance for each
# route.
controller = controller_info[0]()
action = controller_info[1]
self.map.create_route(path, method, controller, action)

@classmethod
def factory(cls, global_config, **local_config):
"""Simple paste factory, :class:`nova.wsgi.Router` doesn't have one."""
return cls()

nova/api/wsgi.py 解析 URL 映射,通过 _dispatch 回调,调用 Resource 对象的 _call_ 方法,最终通过请求调用 API 对应的模块中的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 路由
class Router(object):
"""WSGI middleware that maps incoming requests to WSGI apps."""

def __init__(self, mapper):
"""Create a router for the given routes.Mapper.

Each route in `mapper` must specify a 'controller', which is a
WSGI app to call. You'll probably want to specify an 'action' as
well and have your controller be an object that can route
the request to the action-specific method.

Examples:
mapper = routes.Mapper()
sc = ServerController()

# Explicit mapping of one route to a controller+action
mapper.connect(None, '/svrlist', controller=sc, action='list')

# Actions are all implicitly defined
mapper.resource('server', 'servers', controller=sc)

# Pointing to an arbitrary WSGI app. You can specify the
# {path_info:.*} parameter so the target app can be handed just that
# section of the URL.
mapper.connect(None, '/v1.0/{path_info:.*}', controller=BlogApp())

"""
self.map = mapper
# 使用 routes 模块关联 mapper 和 _dispatch
# routes.middleware.RoutesMiddleware 设置 environ 信息
self._router = routes.middleware.RoutesMiddleware(self._dispatch,
self.map)

@webob.dec.wsgify(RequestClass=Request)
def __call__(self, req):
"""Route the incoming request to a controller based on self.map.

If no match, return a 404.

"""
# 根据 mapper 将请求路由到 WSGI 应用(资源)
# 每个资源会在 __call__ 方法中根据 HTTP 请求的 URL 路由到对应 Controller 上的方法(Action)
return self._router

@staticmethod
@webob.dec.wsgify(RequestClass=Request)
def _dispatch(req):
"""Dispatch the request to the appropriate controller.

Called by self._router after matching the incoming request to a route
and putting the information into req.environ. Either returns 404
or the routed WSGI app's response.

"""
# 根据 HTTP 请求的 environ 信息找到 URL 对应的 Controller
match = req.environ['wsgiorg.routing_args'][1]
if not match:
return webob.exc.HTTPNotFound()
app = match['controller']
return app

API 实现

nova/api/openstack/compute/ 目录包含每个 API 对应的 Controller 实现,Resource 对象将请求的 API 映射到相应的 Controller 方法上。

keypairs.py (密钥对管理扩展)为例,公共方法包含 create、delete、show、index,多个实现对应不同的 Microversion(使用 @wsgi.Controller.api_version 装饰器)

  • @wsgi.expected_errors:API 允许的错误返回码
  • @validation.query_schema:请求对应的 json schema
  • @wsgi.response:API 请求正常返回码
  • @wsgi.action:注册 action

Microversion 用于实现兼容性。

nova/api/openstack/compute/schemas 包含允许的 json schema,表示接受的键值对及其类型。

通过方法接口可以得到 webob.Request 对象,从 Request 对象中可以获取其他请求参数,用于执行对应的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class KeypairController(wsgi.Controller):

"""Keypair API controller for the OpenStack API."""

_view_builder_class = keypairs_view.ViewBuilder

def __init__(self):
super(KeypairController, self).__init__()
self.api = compute_api.KeypairAPI()

@wsgi.Controller.api_version("2.10")
@wsgi.response(201)
@wsgi.expected_errors((400, 403, 409))
@validation.schema(keypairs.create_v210)
def create(self, req, body):
...

@wsgi.Controller.api_version("2.2", "2.9") # noqa
@wsgi.response(201)
@wsgi.expected_errors((400, 403, 409))
@validation.schema(keypairs.create_v22)
def create(self, req, body): # noqa
...

nova-conductor

使用 RPC 的子组件通常包含以下文件:

  • api.py 对 RPC 接口进行封装,类似提供 SDK
  • rpcapi.py 暴露给其他内部组件的 RPC 接口,RPC 客户端
  • manager.py 处理 RPC API 调用

nova-compute 访问数据库的操作都要由 nova-conductor 代理,用 nova/conductor/manager.py 的 ConductorManager 类完成,出于安全性考虑,nova-conductor 和 nova-compute 不能部署在同一服务器上。

nova/objects 定义了 nova object,封装数据库 CURD 操作,每个类对应数据库中的一张表。

nova-scheduler

nova-scheduler 执行调度决策,nova-compute 收集并更新主机数据,实时写入数据库(周期任务)。

nova/scheduler/filters 包含所有的过滤器实现,用于过滤不符合条件的主机;nova/scheduler/weights 包含所有的权重实现,用于计算权重并排序。

启动流程

nova-api 启动入口 nova.cmd.api:main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def main():
config.parse_args(sys.argv) # 解析参数
logging.setup(CONF, "nova") # 设置日志
objects.register_all() # 注册 nova object
gmr_opts.set_defaults(CONF) # 设置 oslo_reports
if 'osapi_compute' in CONF.enabled_apis:
# NOTE(mriedem): This is needed for caching the nova-compute service
# version.
objects.Service.enable_min_version_cache()
log = logging.getLogger(__name__)

# 生成报告的机制 Guru Meditation Report (GMR)
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)

# oslo_service.ProcessLauncher
launcher = service.process_launcher()
started = 0
# 根据 paste-ini 文件创建 WSGI 应用
for api in CONF.enabled_apis:
should_use_ssl = api in CONF.enabled_ssl_apis
try:
# nova.service.WSGIService 初始化 WSGI 程序
server = service.WSGIService(api, use_ssl=should_use_ssl)
# oslo_service.ProcessLauncher 创建子进程启动服务
launcher.launch_service(server, workers=server.workers or 1)
started += 1
except exception.PasteAppNotFound as ex:
log.warning("%s. ``enabled_apis`` includes bad values. "
"Fix to remove this warning.", ex)

if started == 0:
log.error('No APIs were started. '
'Check the enabled_apis config option.')
sys.exit(1)

# 等待子进程终止
launcher.wait()

nova.service.WSGIService 的初始化函数实例化 nova.wsgi.Server ,启动函数实际调用了 nova.wsgi.Server 的 start 方法。

其中的 self._socket 使用 eventlet.listen 创建,最后使用 utils 中封装的 spawn 函数启动 WSGI 程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Server(service.ServiceBase):
"""Server class to manage a WSGI server, serving a WSGI application."""

...

def start(self):
"""Start serving a WSGI application.

:returns: None
"""
# The server socket object will be closed after server exits,
# but the underlying file descriptor will remain open, and will
# give bad file descriptor error. So duplicating the socket object,
# to keep file descriptor usable.

dup_socket = self._socket.dup()
dup_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
dup_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, 1)

...

self._server = utils.spawn(**wsgi_kwargs)

nova-conductor 启动入口 nova.cmd.conductor:main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def main():
config.parse_args(sys.argv)
logging.setup(CONF, "nova")
objects.register_all()
gmr_opts.set_defaults(CONF)
objects.Service.enable_min_version_cache()

gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)

# nova.service.Service 实例化 Service 对象
server = service.Service.create(binary='nova-conductor',
topic=rpcapi.RPC_TOPIC)
workers = CONF.conductor.workers or processutils.get_worker_count()
# oslo_service.launch 创建 launcher
service.serve(server, workers=workers)
# 调用 launcher.wait 等待子进程终止
service.wait()

nova.service.Service 初始化函数接受 manager 对象,通过监听消息队列启用 RPC 服务;设置定期任务报告状态,并写入数据库。

  • nova-compute
  • nova-conductor
  • nova-scheduler

RPC 服务启动时创建 rpc_client 用于发送消息,创建 rpc_server 用于接收消息,分派执行。

1. rpc_client

nova/cmd/conductor.py 实际创建 Service 实例

1
2
server = service.Service.create(binary='nova-conductor',
topic=rpcapi.RPC_TOPIC)

nova/service.py 初始化函数

1
2
3
4
5
6
7
8
9
10
11
# 创建 _driver
self.servicegroup_api = servicegroup.API()

# 动态导入 manager 类
manager_class = importutils.import_class(self.manager_class_name)

if objects_base.NovaObject.indirection_api:
# 创建 RPCClient
conductor_api = conductor.API()
# 等待 nova-conductor 启动
conductor_api.wait_until_ready(context.get_admin_context())

nova/servicegroup/api.py 创建 _driver

1
2
3
driver_class = _driver_name_class_mapping[CONF.servicegroup_driver]
self._driver = importutils.import_object(driver_class,
*args, **kwargs)

nova/conductor/api.py 实际调用 rpcapi.py

1
2
3
def __init__(self):
self.conductor_rpcapi = rpcapi.ConductorAPI()
self.base_rpcapi = baserpc.BaseAPI(topic=rpcapi.RPC_TOPIC)

nova/conductor/rpcapi.py 设置 rpc_client

1
2
3
4
5
6
7
8
9
10
def __init__(self):
super(ConductorAPI, self).__init__()
target = messaging.Target(topic=RPC_TOPIC, version='3.0')
version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.conductor,
CONF.upgrade_levels.conductor)
serializer = objects_base.NovaObjectSerializer()
# rpc client
self.client = rpc.get_client(target,
version_cap=version_cap,
serializer=serializer)

nova/baserpc.py 设置 rpc_client

1
2
3
4
5
6
7
8
def __init__(self, topic):
super(BaseAPI, self).__init__()
target = messaging.Target(topic=topic,
namespace=_NAMESPACE,
version='1.0')
version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.baseapi,
CONF.upgrade_levels.baseapi)
self.client = rpc.get_client(target, version_cap=version_cap)

2. rpc_server

nova/cmd/conductor.py 使用 Service 实例启动服务

1
2
3
4
# oslo_service.launch 创建 launcher
service.serve(server, workers=workers)
# 调用 launcher.wait 等待子进程终止
service.wait()

nova/service.py 实际调用 oslo_service 的 launch 函数,创建绿色线程(greenthread)或进程,最终调用 Service 实例的 start 方法

1
2
3
4
5
6
7
def serve(server, workers=None):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))

_launcher = service.launch(CONF, server, workers=workers,
restart_method='mutate')

nova/service.py Service 实例的 start 方法创建 rpc_server 和 dispatcher;设置周期任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 创建 rpc server 以及 dispatcher
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()

...

if self.periodic_enable:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None

self.tg.add_dynamic_timer(self.periodic_tasks,
initial_delay=initial_delay,
periodic_interval_max=
self.periodic_interval_max)

收到消息后主要由 oslo_messaging 进行解析和处理,核心是 oslo_messaging/rpc/dispatcher.py

incoming 是 AMQP 消息格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def dispatch(self, incoming):
"""Dispatch an RPC message to the appropriate endpoint method.

:param incoming: incoming message
:type incoming: IncomingMessage
:raises: NoSuchMethod, UnsupportedVersion
"""
message = incoming.message
ctxt = incoming.ctxt

method = message.get('method')
args = message.get('args', {})
namespace = message.get('namespace')
version = message.get('version', '1.0')

# NOTE(danms): This event and watchdog thread are used to send
# call-monitoring heartbeats for this message while the call
# is executing if it runs for some time. The thread will wait
# for the event to be signaled, which we do explicitly below
# after dispatching the method call.
completion_event = eventletutils.Event()
watchdog_thread = threading.Thread(target=self._watchdog,
args=(completion_event, incoming))
if incoming.client_timeout:
# NOTE(danms): The client provided a timeout, so we start
# the watchdog thread. If the client is old or didn't send
# a timeout, we just never start the watchdog thread.
watchdog_thread.start()

found_compatible = False
for endpoint in self.endpoints:
target = getattr(endpoint, 'target', None)
if not target:
target = self._default_target

if not (self._is_namespace(target, namespace) and
self._is_compatible(target, version)):
continue

if hasattr(endpoint, method):
if self.access_policy.is_allowed(endpoint, method):
try:
# 分派,调用函数
return self._do_dispatch(endpoint, method, ctxt, args)
finally:
completion_event.set()
if incoming.client_timeout:
watchdog_thread.join()

found_compatible = True

if found_compatible:
raise NoSuchMethod(method)
else:
raise UnsupportedVersion(version, method=method)

oslo_messaging/rpc/dispatcher.py 调用函数

1
2
3
4
5
6
7
8
def _do_dispatch(self, endpoint, method, ctxt, args):
ctxt = self.serializer.deserialize_context(ctxt)
new_args = dict()
for argname, arg in args.items():
new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)
func = getattr(endpoint, method)
result = func(ctxt, **new_args)
return self.serializer.serialize_entity(ctxt, result)

发送消息的实现都在 nova/conductor/rpcapi.py 中,cctxt.call 同步调用,cctxt.cast 异步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def object_class_action_versions(self, context, objname, objmethod,
object_versions, args, kwargs):
cctxt = self.client.prepare()
return cctxt.call(context, 'object_class_action_versions',
objname=objname, objmethod=objmethod,
object_versions=object_versions,
args=args, kwargs=kwargs)

def cache_images(self, ctxt, aggregate, image_ids):
version = '1.21'
if not self.client.can_send_version(version):
raise exception.NovaException('Conductor RPC version pin does not '
'allow cache_images() to be called')
cctxt = self.client.prepare(version=version)
cctxt.cast(ctxt, 'cache_images', aggregate=aggregate,
image_ids=image_ids)

由 oslo_messaging/rpc/client.py 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def cast(self, ctxt, method, **kwargs):
"""Invoke a method and return immediately. See RPCClient.cast()."""
msg = self._make_message(ctxt, method, kwargs)
msg_ctxt = self.serializer.serialize_context(ctxt)

self._check_version_cap(msg.get('version'))

try:
self.transport._send(self.target, msg_ctxt, msg,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)

def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply. See RPCClient.call()."""
if self.target.fanout:
raise exceptions.InvalidTarget('A call cannot be used with fanout',
self.target)

msg = self._make_message(ctxt, method, kwargs)
msg_ctxt = self.serializer.serialize_context(ctxt)

timeout = self.timeout
if self.timeout is None:
timeout = self.conf.rpc_response_timeout

cm_timeout = self.call_monitor_timeout

self._check_version_cap(msg.get('version'))

try:
result = \
self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
call_monitor_timeout=cm_timeout,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)

return self.serializer.deserialize_entity(ctxt, result)

关于周期任务,nova/scheduler/manager.py 中使用 @periodic_task.periodic_task 装饰的方法将会被周期调用,从 scheduler 的调试日志可以看到周期任务的运行

1
2
3
4
********************************************************************* log_opt_values /home/jck/.local/lib/python3.6/site-packages/oslo_config/cfg.py:2591
2021-05-18 05:53:17.030 3501 DEBUG oslo_service.periodic_task [req-66b43add-49c7-4f33-8f6b-1e33cb9f0123 - - - - -] Running periodic task SchedulerManager._run_periodic_tasks run_periodic_tasks /home/jck/.local/lib/python3.6/site-packages/oslo_service/periodic_task.py:211
2021-05-18 05:53:39.072 3500 DEBUG oslo_service.periodic_task [req-8436b3e2-96d1-4f15-8ae8-b596cee05536 - - - - -] Running periodic task SchedulerManager._run_periodic_tasks run_periodic_tasks /home/jck/.local/lib/python3.6/site-packages/oslo_service/periodic_task.py:211
...

对应于 nova/scheduler/manager.py

1
2
3
4
@periodic_task.periodic_task(spacing=CONF.scheduler.periodic_task_interval,
run_immediately=True)
def _run_periodic_tasks(self, context):
self.driver.run_periodic_tasks(context)

执行周期任务的有 nova-scheduler 和 nova-compute ,主要功能是计算节点 nova-compute 上报资源信息,nova-scheduler 读取数据库,更新资源信息缓存。

参阅