本文共 10080 字,大约阅读时间需要 33 分钟。
从原openstack转型至docker已有一段时间。更稳定的使用docker了解docker的各流程,从源代码层面了解下containerd。本文基于docker 1.12版本,从1.11开始docker已拆分docker daemon
1 2 3 | list --> /types .API /ListCheckpoint create --> /types .API /CreateCheckpoint delete --> /types .API /DeleteCheckpoint |
1 2 3 4 5 6 7 8 9 | list、state --> /types .API /State pause、resume、update --> /types .API /UpdateContainer create --> /types .API /CreateContainer stats --> /types .API /Stats watch --> /types .API /State 、 /types .API /Events exec --> /types .API /Events 、 /types .API /AddProcess 、 /types .API /UpdateProcess kill --> /types .API /Signal start --> /types .API /Events 、 /types .API /CreateContainer 、 /types .API /UpdateProcess update --> /types .API /UpdateContainer |
1 | /types .API /Events |
1 | /types .API /State |
1 | /types .API /GetServerVersion -- return result |
1 | 注:API--server.go --> daemon – supervisor.go(handleTask func) |
1 2 3 | /types .API /ListCheckpoint (supervisor.GetContainersTask)--> getContainers /types .API /CreateCheckpoint --> createCheckpoint /types .API /DeleteCheckpoint --> deleteCheckpoint |
1 2 3 4 5 6 7 | /types .API /State /types .API /Stats (supervisor.GetContainersTask)--> getContainers /types .API /UpdateContainer (supervisor.UpdateTask)-->updateContainer /types .API /CreateContainer (supervisor.StartTask)-->start /types .API /Events --> Events -- return result /types .API /AddProcess -->addProcess /types .API /UpdateProcess -->updateProcess /types .API /Signal -->signal |
1 2 3 | getContainers -- return result createCheckpoint -->(runtime)CheckPoint --> exec .Command(c.runtime,arg....) deleteCheckpoint -->(runtime)DeleteCheckpoint -- return result |
1 2 3 4 5 6 | getContainers -- return result updateContainer -->(runtime)Resume Pause UpdateResources--> exec .Command(c.runtime,arg....) start -->(runtime supervisor /worker .go) Start --> exec .Command(c.shim,c. id ,c.bundle,c.runtime) addProcess -->(runtime) exec --> exec .Command(c.shim,c. id ,c.bundle,c.runtime) updateProcess --> return result signal --> return result |
createContainer示例
deamon启动监听tasks及startTasks进程
a)进入main.go main方法调用daemon方法
1 2 3 4 5 6 7 | app.Action = func(context *cli.Context) { if err := daemon(context); err != nil { logrus.Fatal(err) } } |
b)进入main.go daemon方法
1 2 3 4 5 6 7 8 | for i := 0; i < 10; i++ { wg.Add(1) w := supervisor.NewWorker(sv, wg) go w.Start() } if err := sv.Start(); err != nil { return err } |
c)初始化supervisor/worker.go NewWorker并启动监听startTask并处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | func NewWorker(s *Supervisor, wg * sync .WaitGroup) Worker { return &worker{ s: s, wg: wg, } } func (w *worker) Start() { defer w.wg.Done() for t := range w.s.startTasks { started := time .Now() process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr)) if err != nil { logrus.WithFields(logrus.Fields{ "error" : err, "id" : t.Container.ID(), }).Error( "containerd: start container" ) t.Err <- err evt := &DeleteTask{ ID: t.Container.ID(), NoEvent: true , Process: process, } w.s.SendTask(evt) continue } |
d)启动supervisor/supervisor.go task监听task并处理
1 2 3 4 5 6 7 8 9 10 11 12 13 | func (s *Supervisor) Start() error { logrus.WithFields(logrus.Fields{ "stateDir" : s.stateDir, "runtime" : s.runtime, "runtimeArgs" : s.runtimeArgs, "memory" : s.machine.Memory, "cpus" : s.machine.Cpus, }).Debug( "containerd: supervisor running" ) go func() { for i := range s.tasks { s.handleTask(i) } |
ctr/main.go containersCommand
1 2 3 4 5 6 7 8 9 10 11 | execCommand, killCommand, listCommand, pauseCommand, resumeCommand, startCommand, stateCommand, statsCommand, watchCommand, updateCommand, |
ctr/container.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | var startCommand = cli.Command{ Name: "start" , Usage: "start a container" , ArgsUsage: "ID BundlePath”, ————…... events, err := c.Events(netcontext.Background(), &types.EventsRequest{})/*事件创建*/ if err != nil { fatal(err.Error(), 1) } if _, err := c.CreateContainer(netcontext.Background(), r); err != nil {/*容器创建*/ fatal(err.Error(), 1) } if context.Bool( "attach" ) { go func() { io.Copy(stdin, os.Stdin) if _, err := c.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{/*更新进程*/ Id: id , Pid: "init" , CloseStdin: true , }); err != nil { fatal(err.Error(), 1) } restoreAndCloseStdin() }() if tty { resize( id , "init" , c) go func() { s := make (chan os.Signal, 64) signal.Notify(s, syscall.SIGWINCH) for range s { if err := resize( id , "init" , c); err != nil { log.Println(err) } } }() } waitForExit(c, events, id , "init" , restoreAndCloseStdin) } }, |
api/grpc/types/api.pb.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | func (c *aPIClient) Events(ctx context.Context, in *EventsRequest, opts ...grpc.CallOption) (API_EventsClient, error) { stream, err := grpc.NewClientStream(ctx, &_API_serviceDesc.Streams[0], c.cc, "/types.API/Events" , opts...) if err != nil { return nil, err } x := &aPIEventsClient{stream} if err := x.ClientStream.SendMsg( in ); err != nil { return nil, err } if err := x.ClientStream.CloseSend(); err != nil { return nil, err } return x, nil } func (c *aPIClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) { out := new(CreateContainerResponse) err := grpc.Invoke(ctx, "/types.API/CreateContainer" , in , out, c.cc, opts...) if err != nil { return nil, err } return out, nil } func (c *aPIClient) UpdateProcess(ctx context.Context, in *UpdateProcessRequest, opts ...grpc.CallOption) (*UpdateProcessResponse, error) { out := new(UpdateProcessResponse) err := grpc.Invoke(ctx, "/types.API/UpdateProcess" , in , out, c.cc, opts...) if err != nil { return nil, err } return out, nil } |
api/grpc/types/api.pb.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | func _API_Events_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(EventsRequest) if err := stream.RecvMsg(m); err != nil { return err } return srv.(APIServer).Events(m, &aPIEventsServer{stream}) } func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(CreateContainerRequest) if err := dec( in ); err != nil { return nil, err } if interceptor == nil { return srv.(APIServer).CreateContainer(ctx, in ) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/types.API/CreateContainer" , } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest)) } return interceptor(ctx, in , info, handler) } func _API_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(UpdateProcessRequest) if err := dec( in ); err != nil { return nil, err } if interceptor == nil { return srv.(APIServer).UpdateProcess(ctx, in ) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/types.API/UpdateProcess" , } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(APIServer).UpdateProcess(ctx, req.(*UpdateProcessRequest)) } return interceptor(ctx, in , info, handler) |
api/grpc/server/server.go 进入第一步中的tasks及sendTasks处理队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error { events := s.sv.Events(t, r.StoredOnly, r.Id) func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) { s.sv.SendTask(e) apiC, err := createAPIContainer(r.Container, false ) func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) { e := &supervisor.UpdateProcessTask{} e.ID = r.Id e.PID = r.Pid e.Height = int(r.Height) e.Width = int(r.Width) e.CloseStdin = r.CloseStdin s.sv.SendTask(e) if err := <-e.ErrorCh(); err != nil { return nil, err } return &types.UpdateProcessResponse{}, nil } |
supervisor/create.go
1 2 | func (s *Supervisor) start(t *StartTask) error { s.startTasks <- task |
supervisor/worker.go
1 2 3 | func (w *worker) Start() { defer w.wg.Done() for t := range w.s.startTasks { |
runtime/container.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | func (c *container) Start(checkpointPath string, s Stdio) (Process, error) { processRoot := filepath.Join(c.root, c. id , InitProcessID) if err := os.Mkdir(processRoot, 0755); err != nil { return nil, err } cmd := exec .Command(c.shim, c. id , c.bundle, c.runtime, ) ---执行 docker-containerd-shim命令 cmd.Dir = processRoot cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true , } spec, err := c.readSpec() if err != nil { return nil, err } config := &processConfig{ checkpoint: checkpointPath, root: processRoot, id : InitProcessID, c: c, stdio: s, spec: spec, processSpec: specs.ProcessSpec(spec.Process), } p, err := newProcess(config) if err != nil { return nil, err } if err := c.createCmd(InitProcessID, cmd, p); err != nil { return nil, err } return p, nil } |
containerd-shim接收后处理
containerd-shim/main.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | func start(log *os.File) error { p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2)) if err != nil { return err } defer func() { if err := p.Close(); err != nil { writeMessage(log, "warn" , err) } }() if err := p.create(); err != nil { p.delete() return err } |
containerd-shim/process.go跳转执行runc命令
1 2 3 | func (p *process) create() error { cmd := exec .Command(p.runtime, args...) |
转载地址:http://mggra.baihongyu.com/