博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
docker1.12-containerd源码分析
阅读量:6423 次
发布时间:2019-06-23

本文共 10080 字,大约阅读时间需要 33 分钟。

     从原openstack转型至docker已有一段时间。更稳定的使用docker了解docker的各流程,从源代码层面了解下containerd。本文基于docker 1.12版本,从1.11开始docker已拆分docker daemon

containerd源码流程图

源码接口调用详情

A)第一步从ctr入口至API接口

checkpoint(用于快照,docker目前该功能不完善)

1
2
3
list --> 
/types
.API
/ListCheckpoint 
create --> 
/types
.API
/CreateCheckpoint 
delete --> 
/types
.API
/DeleteCheckpoint


containers

1
2
3
4
5
6
7
8
9
list、state -->
/types
.API
/State
pause、resume、update -->
/types
.API
/UpdateContainer
create --> 
/types
.API
/CreateContainer
stats --> 
/types
.API
/Stats
watch 
-->
/types
.API
/State 
/types
.API
/Events
exec 
-->
/types
.API
/Events 
/types
.API
/AddProcess 
/types
.API
/UpdateProcess
kill 
-->
/types
.API
/Signal
start -->
/types
.API
/Events 
、 
/types
.API
/CreateContainer 
/types
.API
/UpdateProcess
update -->
/types
.API
/UpdateContainer

events

1
/types
.API
/Events

state

1
/types
.API
/State

version

1
/types
.API
/GetServerVersion 
--
return 
result

B)第二步从API接口至supervisor任务处理

1
注:API--server.go --> daemon – supervisor.go(handleTask func)

checkpoint

1
2
3
/types
.API
/ListCheckpoint 
(supervisor.GetContainersTask)--> getContainers 
/types
.API
/CreateCheckpoint 
--> createCheckpoint
/types
.API
/DeleteCheckpoint 
--> deleteCheckpoint

containers

1
2
3
4
5
6
7
/types
.API
/State 
/types
.API
/Stats 
(supervisor.GetContainersTask)--> getContainers 
/types
.API
/UpdateContainer 
(supervisor.UpdateTask)-->updateContainer
/types
.API
/CreateContainer 
(supervisor.StartTask)-->start
/types
.API
/Events 
--> Events --
return 
result
/types
.API
/AddProcess 
-->addProcess
/types
.API
/UpdateProcess 
-->updateProcess
/types
.API
/Signal 
-->signal

 

C)第三步从任务队列至runtime至runc

checkpoint

1
2
3
getContainers -- 
return 
result
createCheckpoint -->(runtime)CheckPoint -->
exec
.Command(c.runtime,arg....)
deleteCheckpoint -->(runtime)DeleteCheckpoint -- 
return 
result

containers

1
2
3
4
5
6
getContainers -- 
return 
result
updateContainer -->(runtime)Resume Pause UpdateResources-->
exec
.Command(c.runtime,arg....)
start -->(runtime supervisor
/worker
.go) Start -->
exec
.Command(c.shim,c.
id
,c.bundle,c.runtime)
addProcess -->(runtime) 
exec 
--> 
exec
.Command(c.shim,c.
id
,c.bundle,c.runtime)
updateProcess -->
return 
result
signal -->
return 
result


createContainer示例

deamon启动监听tasks及startTasks进程

a)进入main.go main方法调用daemon方法 

1
2
3
4
5
6
7
app.Action = func(context *cli.Context) {
       
if 
err := daemon(context); err != nil {
              
logrus.Fatal(err)
       
}
  
  
}

b)进入main.go daemon方法

1
2
3
4
5
6
7
8
for 
i := 0; i < 10; i++ {
       
wg.Add(1)
       
w := supervisor.NewWorker(sv, wg)
       
go w.Start()
}
if 
err := sv.Start(); err != nil {
       
return 
err
}

c)初始化supervisor/worker.go NewWorker并启动监听startTask并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func NewWorker(s *Supervisor, wg *
sync
.WaitGroup) Worker {
       
return 
&worker{
              
s:  s,
              
wg: wg,
       
}
}
  
  
  
func (w *worker) Start() {
       
defer w.wg.Done()
       
for 
t := range w.s.startTasks {
              
started := 
time
.Now()
              
process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
              
if 
err != nil {
                     
logrus.WithFields(logrus.Fields{
                            
"error"
: err,
                            
"id"
:    t.Container.ID(),
                     
}).Error(
"containerd: start container"
)
                     
t.Err <- err
                     
evt := &DeleteTask{
                            
ID:      t.Container.ID(),
                            
NoEvent: 
true
,
                            
Process: process,
                     
}
                     
w.s.SendTask(evt)
                     
continue
              
}

d)启动supervisor/supervisor.go task监听task并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
func (s *Supervisor) Start() error {
       
logrus.WithFields(logrus.Fields{
              
"stateDir"
:    s.stateDir,
              
"runtime"
:     s.runtime,
              
"runtimeArgs"
: s.runtimeArgs,
              
"memory"
:      s.machine.Memory,
              
"cpus"
:        s.machine.Cpus,
       
}).Debug(
"containerd: supervisor running"
)
       
go func() {
              
for 
i := range s.tasks {
                     
s.handleTask(i)
  
              
}

containers容器创建示例

Ctl控制台命令入口

ctr/main.go  containersCommand 

1
2
3
4
5
6
7
8
9
10
11
execCommand,
killCommand,
listCommand,
pauseCommand,
resumeCommand,
startCommand,
stateCommand,
statsCommand,
watchCommand,
  
updateCommand,

ctr/container.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
var startCommand = cli.Command{
       
Name:      
"start"
,
       
Usage:     
"start a container"
,
       
ArgsUsage: "ID BundlePath”, ————…...
  
  
  
        
events, err := c.Events(netcontext.Background(), &types.EventsRequest{})/*事件创建*/
       
if 
err != nil {
              
fatal(err.Error(), 1)
       
}
       
if 
_, err := c.CreateContainer(netcontext.Background(), r); err != nil {/*容器创建*/
              
fatal(err.Error(), 1)
       
}
       
if 
context.Bool(
"attach"
) {
              
go func() {
                     
io.Copy(stdin, os.Stdin)
                     
if 
_, err := c.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{/*更新进程*/
                            
Id:         
id
,
                            
Pid:        
"init"
,
                            
CloseStdin: 
true
,
                     
}); err != nil {
                            
fatal(err.Error(), 1)
                     
}
                     
restoreAndCloseStdin()
              
}()
              
if 
tty 
{
                     
resize(
id
"init"
, c)
                     
go func() {
                            
s := 
make
(chan os.Signal, 64)
                            
signal.Notify(s, syscall.SIGWINCH)
                            
for 
range s {
                                   
if 
err := resize(
id
"init"
, c); err != nil {
                                          
log.Println(err)
                                   
}
                            
}
                     
}()
              
}
              
waitForExit(c, events, 
id
"init"
, restoreAndCloseStdin)
       
}
  
},

api处理

api/grpc/types/api.pb.go 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (c *aPIClient) Events(ctx context.Context, 
in 
*EventsRequest, opts ...grpc.CallOption) (API_EventsClient, error) {
       
stream, err := grpc.NewClientStream(ctx, &_API_serviceDesc.Streams[0], c.cc, 
"/types.API/Events"
, opts...)
       
if 
err != nil {
              
return 
nil, err
       
}
       
x := &aPIEventsClient{stream}
       
if 
err := x.ClientStream.SendMsg(
in
); err != nil {
              
return 
nil, err
       
}
       
if 
err := x.ClientStream.CloseSend(); err != nil {
              
return 
nil, err
       
}
       
return 
x, nil
}
  
  
  
func (c *aPIClient) CreateContainer(ctx context.Context, 
in 
*CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
       
out := new(CreateContainerResponse)
       
err := grpc.Invoke(ctx, 
"/types.API/CreateContainer"
in
, out, c.cc, opts...)
       
if 
err != nil {
              
return 
nil, err
       
}
       
return 
out, nil
}
  
  
  
func (c *aPIClient) UpdateProcess(ctx context.Context, 
in 
*UpdateProcessRequest, opts ...grpc.CallOption) (*UpdateProcessResponse, error) {
       
out := new(UpdateProcessResponse)
       
err := grpc.Invoke(ctx, 
"/types.API/UpdateProcess"
in
, out, c.cc, opts...)
       
if 
err != nil {
              
return 
nil, err
       
}
       
return 
out, nil
}

api/grpc/types/api.pb.go 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func _API_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
       
m := new(EventsRequest)
       
if 
err := stream.RecvMsg(m); err != nil {
              
return 
err
       
}
       
return 
srv.(APIServer).Events(m, &aPIEventsServer{stream})
}
  
  
func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
       
in 
:= new(CreateContainerRequest)
       
if 
err := dec(
in
); err != nil {
              
return 
nil, err
       
}
       
if 
interceptor == nil {
              
return 
srv.(APIServer).CreateContainer(ctx, 
in
)
       
}
       
info := &grpc.UnaryServerInfo{
              
Server:     srv,
              
FullMethod: 
"/types.API/CreateContainer"
,
       
}
       
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
              
return 
srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest))
       
}
       
return 
interceptor(ctx, 
in
, info, handler)
}
  
  
  
func _API_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
       
in 
:= new(UpdateProcessRequest)
       
if 
err := dec(
in
); err != nil {
              
return 
nil, err
       
}
       
if 
interceptor == nil {
              
return 
srv.(APIServer).UpdateProcess(ctx, 
in
)
       
}
       
info := &grpc.UnaryServerInfo{
              
Server:     srv,
              
FullMethod: 
"/types.API/UpdateProcess"
,
       
}
       
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
              
return 
srv.(APIServer).UpdateProcess(ctx, req.(*UpdateProcessRequest))
       
}
       
return 
interceptor(ctx, 
in
, info, handler)

api/grpc/server/server.go 进入第一步中的tasks及sendTasks处理队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
  
events := s.sv.Events(t, r.StoredOnly, r.Id) 
  
  
  
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
  
s.sv.SendTask(e)
  
apiC, err := createAPIContainer(r.Container, 
false
  
  
  
func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
       
e := &supervisor.UpdateProcessTask{}
       
e.ID = r.Id
       
e.PID = r.Pid
       
e.Height = int(r.Height)
       
e.Width = int(r.Width)
       
e.CloseStdin = r.CloseStdin
       
s.sv.SendTask(e)
       
if 
err := <-e.ErrorCh(); err != nil {
              
return 
nil, err
       
}
       
return 
&types.UpdateProcessResponse{}, nil
}

supervisor/create.go

1
2
func (s *Supervisor) start(t *StartTask) error {
s.startTasks <- task

supervisor/worker.go

1
2
3
func (w *worker) Start() {
       
defer w.wg.Done()
       
for 
t := range w.s.startTasks {

runtime/container.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {
       
processRoot := filepath.Join(c.root, c.
id
, InitProcessID)
       
if 
err := os.Mkdir(processRoot, 0755); err != nil {
              
return 
nil, err
       
}
       
cmd := 
exec
.Command(c.shim,
              
c.
id
, c.bundle, c.runtime,
  
       
) ---执行 docker-containerd-shim命令
  
       
cmd.Dir = processRoot
       
cmd.SysProcAttr = &syscall.SysProcAttr{
              
Setpgid: 
true
,
       
}
       
spec, err := c.readSpec()
       
if 
err != nil {
              
return 
nil, err
       
}
       
config := &processConfig{
              
checkpoint:  checkpointPath,
              
root:        processRoot,
              
id
:          InitProcessID,
              
c:           c,
              
stdio:       s,
              
spec:        spec,
              
processSpec: specs.ProcessSpec(spec.Process),
       
}
       
p, err := newProcess(config)
       
if 
err != nil {
              
return 
nil, err
       
}
       
if 
err := c.createCmd(InitProcessID, cmd, p); err != nil {
              
return 
nil, err
       
}
       
return 
p, nil
}

containerd-shim接收后处理

containerd-shim/main.go 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func start(log *os.File) error {
  
  
  
p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
if 
err != nil {
       
return 
err
}
defer func() {
       
if 
err := p.Close(); err != nil {
              
writeMessage(log, 
"warn"
, err)
       
}
}()
if 
err := p.create(); err != nil {
       
p.delete()
       
return 
err
}

containerd-shim/process.go跳转执行runc命令 

1
2
3
func (p *process) create() error {
  
cmd := 
exec
.Command(p.runtime, args...)
本文转自 qwjhq 51CTO博客,原文链接:http://blog.51cto.com/bingdian/1893470

转载地址:http://mggra.baihongyu.com/

你可能感兴趣的文章
Java 动态太极图 DynamicTaiChi (整理)
查看>>
微信公众平台后台编辑器上线图片缩放和封面图裁剪功能
查看>>
git使用教程2-更新github上代码
查看>>
张掖百公里,再次折戟
查看>>
SAP QM Batch to Batch的转移过账事务中的Vendor Batch
查看>>
本期最新 9 篇论文,帮你完美解决「读什么」的问题 | PaperDaily #19
查看>>
图解SSIS监视文件夹并自动导入数据
查看>>
Lucene.Net 2.3.1开发介绍 —— 四、搜索(一)
查看>>
MyBatis Review——开发Dao的方法
查看>>
技术研发国产化进程加快 看传感器企业如何展示十八般武艺
查看>>
技术助力第三次革命
查看>>
《HTML与CSS入门经典(第8版)》——2.6 总结
查看>>
新手指南:在 Ubuntu 和 Fedora 上安装软件包
查看>>
在 CentOS7.0 上搭建 Chroot 的 Bind DNS 服务器
查看>>
大型网站的 HTTPS 实践(二):HTTPS 对性能的影响
查看>>
《Swift 权威指南》——第6章,第6.10节嵌套函数
查看>>
《自己动手做交互系统》——1.3 本章小结
查看>>
Mobile devices bundled with malware?
查看>>
《JavaScript面向对象精要》——1.5 访问属性
查看>>
《Python数据可视化编程实战》—— 第 1 章 准备工作环境
查看>>