阿里云函數(shù)計(jì)算式是按調(diào)用次數(shù)來計(jì)算費(fèi)用的,無需服務(wù)器就能進(jìn)行后端的一些處理,對于調(diào)用次數(shù)不是特別多的場景比較適用??梢怨?jié)省成本,但是如果調(diào)用次數(shù)很多對服務(wù)器性能要求不是特別高的情況下建議還是用ECS來部署服務(wù)。
新建服務(wù)比較容易基本只要輸入一個(gè)服務(wù)名稱即可
新建函數(shù)使用HTTP 函數(shù),新手也可以嘗試使用模版函數(shù)
配置函數(shù)
這里主要注意函數(shù)入口這個(gè)配置,Handler 的格式為 [文件名].[函數(shù)名]。例如創(chuàng)建函數(shù)時(shí)指定的 Handler 為 index.handler,那么文件名為 index.py,入口函數(shù)為 handler。
配置觸發(fā)器
這里只是為了測試所以認(rèn)證方式使用anonymous。
首先要在代碼根目錄下安裝python連接kafka的依賴包
pip install -t . kafka-python
然后開始編寫setting.py這個(gè)是連接kafka的配置信息
vim setting.py
kafka_setting = {
'bootstrap_servers': ["XXX", "XXX", "XXX"], #kafka連接地址
'topic_name': 'XXX', #使用的topic名稱
'consumer_id': 'XXX' #使用的Consumer Group
}
然后開始編寫入口文件index.py
# -*- coding: utf-8 -*-
# 導(dǎo)入連接kafka所需依賴包和配置
import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError
import setting
conf = setting.kafka_setting
print conf
HELLO_WORLD = b"Hello world! "
def handler(environ, start_response):
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith("HTTP_"):
# process custom request headers
pass
# get request_body
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
request_body = environ['wsgi.input'].read(request_body_size)
# get request_method
request_method = environ['REQUEST_METHOD']
# get path info
path_info = environ['PATH_INFO']
# get server_protocol
server_protocol = environ['SERVER_PROTOCOL']
# get content_type
try:
content_type = environ['CONTENT_TYPE']
except (KeyError):
content_type = " "
# get query_string
try:
query_string = environ['QUERY_STRING']
except (KeyError):
query_string = " "
print 'request_body: {}'.format(request_body)
print 'method: {} path: {} query_string: {} server_protocol: {} '.format(request_method, path_info, query_string, server_protocol)
# do something here
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
#以下是kafka操作部分,發(fā)送一個(gè)消息到kafka
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
api_version = (0,10),
retries=5)
partitions = producer.partitions_for(conf['topic_name'])
print 'Topic 下分區(qū): %s' % partitions
try:
future = producer.send(conf['topic_name'], 'hello aliyun-kafka test!')
future.get()
print 'send message succeed.'
except KafkaError, e:
print 'send message failed.'
print e
# return value must be iterable
return [HELLO_WORLD]
備注:上面的函數(shù)腳本分別參考了阿里云的函數(shù)計(jì)算Hello World示例和python連接kafka示例,參考url如下:
Hello World示例https://help.aliyun.com/document_detail/74756.html?spm=a2c4g.11186623.6.573.2be3dc876slGKm
python連接kafka示例 https://code.aliyun.com/alikafka/aliware-kafka-demos/blob/master/kafka-python-demo/vpc/aliyun_kafka_producer.py
Hello World示例必須要,因?yàn)檫@個(gè)是Http函數(shù),需要加http請求參數(shù)和返回的狀態(tài)碼等信息。environ, start_response這兩個(gè)參數(shù)是必選項(xiàng)。不加參數(shù)無法通過。具體信息可以查看幫助文檔。
通過文件夾和壓縮包均可以上傳代碼
配置服務(wù)允許訪問VPC內(nèi)資源
配置kafka所在安全組允許被函數(shù)計(jì)算內(nèi)網(wǎng)地址訪問
由于函數(shù)計(jì)算地址用的是域名,通過ping得到函數(shù)計(jì)算內(nèi)網(wǎng)地址,然后用大段的子網(wǎng)覆蓋它,避免ip變更導(dǎo)致無法訪問
配置函數(shù)計(jì)算訪問kafka權(quán)限,這里給了管理權(quán)限,生產(chǎn)環(huán)境可以根據(jù)實(shí)際情況配自定義權(quán)限
直接點(diǎn)擊代碼執(zhí)行下的執(zhí)行按鈕就能運(yùn)行
查看結(jié)果
函數(shù)計(jì)算對很多人來說還是一個(gè)新鮮的工具。未來也許會(huì)成為一種趨勢。因?yàn)樗恍枰?wù)器,且在公司初期量不大的前提下能節(jié)省成本和運(yùn)維維護(hù)的成本。