定制运行时-j9九游会登录
定制运行时场景说明
运行时负责运行函数的代码、从环境变量读取处理程序名称以及从functiongraph运行时api读取调用事件。运行时会将事件数据传递给函数处理程序,并将来自处理程序的响应返回给functiongraph。
functiongraph支持自定义编程语言运行时,即支持使用定制运行时。可以使用可执行文件(名称为bootstrap)的形式将运行时包含在函数的程序包中,当调用一个functiongraph函数时,它将运行函数的处理程序方法。
自定义的运行时在functiongraph执行环境中运行,支持shell脚本,也支持可在linux执行的二进制文件。
约束与限制
- 使用定制运行时上传函数业务代码,只支持上传zip包形式,所需函数的依赖包均需合入zip包中。
- 制作zip包时,bootstrap文件必须在根目录,保证解压后,直接出现可执行文件,才能正常运行。
- 对于go runtime,必须在编译之后打zip包,编译后的动态库文件名称必须与函数执行入口的插件名称保持一致,例如:动态库名称为testplugin.so,则“函数执行入口”命名为testplugin.handler。
运行时文件bootstrap说明
如果程序包中存在一个名为bootstrap的文件,functiongraph将执行该文件。如果引导文件未找到或不是可执行文件,函数在调用后将返回错误。
运行时代码负责完成一些初始化任务,它将在一个循环中处理调用事件,直到它被终止。
初始化任务将对函数的每个实例运行一次以准备用于处理调用的环境。
运行时接口说明
functiongraph提供了用于自定义运行时的http api来接收来自函数的调用事件,并在functiongraph执行环境中发送回响应数据。
- 获取调用
方法 – get
路径 – http://$runtime_api_addr/v1/runtime/invocation/request
该接口用来获取下一个事件,响应正文包含事件数据。响应标头包含信息如下。
表1 响应标头信息说明 参数
说明
x-cff-request-id
请求id。
x-cff-access-key
租户accesskey,使用该特殊变量需要给函数配置委托。
x-cff-auth-token
token,使用该特殊变量需要给函数配置委托。
x-cff-invoke-type
函数执行类型。
x-cff-secret-key
租户secretkey,使用该特殊变量需要给函数配置委托。
x-cff-security-token
security token,使用该特殊变量需要给函数配置委托。
- 调用响应
方法 – post
路径 – http://$runtime_api_addr/v1/runtime/invocation/response/$request_id
该接口将正确的调用响应发送到functiongraph。在运行时调用函数处理程序后,将来自函数的响应发布到调用响应路径。
- 错误上报
方法 – post
路径 – http://$runtime_api_addr/v1/runtime/invocation/error/$request_id
$request_id为获取事件的响应header中x-cff-request-id变量值,说明请参见表1。
$runtime_api_addr为系统环境变量,说明请参见表2。
该接口将错误的调用响应发送到functiongraph。在运行时调用函数处理程序后,将来自函数的响应发布到调用响应路径。
运行时环境变量说明
表2是functiongraph执行环境中运行时相关的环境变量列表,除此之外,还有用户自定义的环境变量,都可以在函数代码中直接使用。
|
键 |
值说明 |
|---|---|
|
runtime_project_id |
projectid |
|
runtime_func_name |
函数名称 |
|
runtime_func_version |
函数的版本 |
|
runtime_package |
函数组 |
|
runtime_handler |
函数执行入口 |
|
runtime_timeout |
函数超时时间 |
|
runtime_userdata |
用户通过环境变量传入的值 |
|
runtime_cpu |
分配的cpu数 |
|
runtime_memory |
分配的内存 |
|
runtime_code_root |
包含函数代码的目录 |
|
runtime_api_addr |
自定义运行时api的主机和端口 |
用户定义的环境变量也同functiongraph环境变量一样,可通过环境变量获取方式直接获取用户定义环境变量。
示例说明
此示例包含1个文件(bootstrap文件),该文件都在bash中实施。运行时将从部署程序包加载函数脚本。它使用两个变量来查找脚本。
引导文件bootstrap内容如下:
#!/bin/sh
set -o pipefail
#processing requests loop
while true
do
headers="$(mktemp)"
# get an event
event_data=$(curl -ss -ld "$headers" -x get "http://$runtime_api_addr/v1/runtime/invocation/request")
# get request id from response header
request_id=$(grep -fi x-cff-request-id "$headers" | tr -d '[:space:]' | cut -d: -f2)
if [ -z "$request_id" ]; then
continue
fi
# process request data
response="echoing request: hello world!"
# put response
curl -x post "http://$runtime_api_addr/v1/runtime/invocation/response/$request_id" -d "$response"
done
加载脚本后,运行时将在一个循环中处理事件。它使用运行时api从functiongraph检索调用事件,将事件传递到处理程序,并将响应发布回给functiongraph。
为了获取请求id,运行时会将来自api响应的标头保存到临时文件,并从该文件读取x-cff-request-id读取请求头的请求唯一标识。将获取到的事件数据做处理并响应发布返回functiongraph。
go源码示例,需要通过编译后才可执行。
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"strings"
"time"
)
var (
getrequesturl = os.expandenv("http://${runtime_api_addr}/v1/runtime/invocation/request")
putresponseurl = os.expandenv("http://${runtime_api_addr}/v1/runtime/invocation/response/{request_id}")
puterrorresponseurl = os.expandenv("http://${runtime_api_addr}/v1/runtime/invocation/error/{request_id}")
requestidinvaliderror = fmt.errorf("request id invalid")
norequestavailableerror = fmt.errorf("no request available")
putresponsefailederror = fmt.errorf("put response failed")
functionpackage = os.getenv("runtime_package")
functionname = os.getenv("runtime_func_name")
functionversion = os.getenv("runtime_func_version")
client = http.client{
transport: &http.transport{
dialcontext: (&net.dialer{
timeout: 3 * time.second,
}).dialcontext,
},
}
)
func main() {
// main loop for processing requests.
for {
requestid, header, payload, err := getrequest()
if err != nil {
time.sleep(50 * time.millisecond)
continue
}
result, err := processrequestevent(requestid, header, payload)
err = putresponse(requestid, result, err)
if err != nil {
log.printf("put response failed, err: %s.", err.error())
}
}
}
// event processing function
func processrequestevent(requestid string, header http.header, evtbytes []byte) ([]byte, error) {
log.printf("processing request '%s'.", requestid)
result := fmt.sprintf("function: %s:%s:%s, request id: %s, headers: % v, payload: %s", functionpackage, functionname,
functionversion, requestid, header, string(evtbytes))
var event functionevent
err := json.unmarshal(evtbytes, &event)
if err != nil {
return (&errormessage{errortype: "invalid event", errormessage: "invalid json formated event"}).tojsonbytes(), err
}
return (&apigformatresult{statuscode: 200, body: result}).tojsonbytes(), nil
}
func getrequest() (string, http.header, []byte, error) {
resp, err := client.get(getrequesturl)
if err != nil {
log.printf("get request error, err: %s.", err.error())
return "", nil, nil, err
}
defer resp.body.close()
// get request id from response header
requestid := resp.header.get("x-cff-request-id")
if requestid == "" {
log.printf("request id not found.")
return "", nil, nil, requestidinvaliderror
}
payload, err := ioutil.readall(resp.body)
if err != nil {
log.printf("read request body error, err: %s.", err.error())
return "", nil, nil, err
}
if resp.statuscode != 200 {
log.printf("get request failed, status: %d, message: %s.", resp.statuscode, string(payload))
return "", nil, nil, norequestavailableerror
}
log.printf("get request ok.")
return requestid, resp.header, payload, nil
}
func putresponse(requestid string, payload []byte, err error) error {
var body io.reader
if payload != nil && len(payload) > 0 {
body = bytes.newbuffer(payload)
}
url := ""
if err == nil {
url = strings.replace(putresponseurl, "{request_id}", requestid, -1)
} else {
url = strings.replace(puterrorresponseurl, "{request_id}", requestid, -1)
}
resp, err := client.post(strings.replace(url, "{request_id}", requestid, -1), "", body)
if err != nil {
log.printf("put response error, err: %s.", err.error())
return err
}
defer resp.body.close()
responsepayload, err := ioutil.readall(resp.body)
if err != nil {
log.printf("read request body error, err: %s.", err.error())
return err
}
if resp.statuscode != 200 {
log.printf("put response failed, status: %d, message: %s.", resp.statuscode, string(responsepayload))
return putresponsefailederror
}
return nil
}
type functionevent struct {
type string `json:"type"`
name string `json:"name"`
}
type apigformatresult struct {
statuscode int `json:"statuscode"`
isbase64encoded bool `json:"isbase64encoded"`
headers map[string]string `json:"headers,omitempty"`
body string `json:"body,omitempty"`
}
func (result *apigformatresult) tojsonbytes() []byte {
data, err := json.marshalindent(result, "", " ")
if err != nil {
return nil
}
return data
}
type errormessage struct {
errortype string `json:"errortype"`
errormessage string `json:"errormessage"`
}
func (errmsg *errormessage) tojsonbytes() []byte {
data, err := json.marshalindent(errmsg, "", " ")
if err != nil {
return nil
}
return data
}
代码中的环境变量说明如下,请参见表3。
|
环境变量 |
说明 |
|---|---|
|
runtime_func_name |
函数名称 |
|
runtime_func_version |
函数版本 |
|
runtime_package |
函数组 |
相关文档
意见反馈
文档内容是否对您有帮助?
如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨