阿里云函數(shù)計(jì)算對接kafka實(shí)戰(zhàn)

來源: 新鈦云服
作者:秦鳴
時(shí)間:2021-06-08
17390
阿里云函數(shù)計(jì)算式是按調(diào)用次數(shù)來計(jì)算費(fèi)用的,無需服務(wù)器就能進(jìn)行后端的一些處理,對于調(diào)用次數(shù)不是特別多的場景比較適用??梢怨?jié)省成本,但是如果調(diào)用次數(shù)很多對服務(wù)器性能要求不是特別高的情況下建議還是用ECS來部署服務(wù)。

阿里云函數(shù)計(jì)算式是按調(diào)用次數(shù)來計(jì)算費(fèi)用的,無需服務(wù)器就能進(jìn)行后端的一些處理,對于調(diào)用次數(shù)不是特別多的場景比較適用??梢怨?jié)省成本,但是如果調(diào)用次數(shù)很多對服務(wù)器性能要求不是特別高的情況下建議還是用ECS來部署服務(wù)。



函數(shù)計(jì)算對接kafka實(shí)戰(zhàn)


新建服務(wù)和函數(shù)

新建服務(wù)比較容易基本只要輸入一個(gè)服務(wù)名稱即可

新建函數(shù)使用HTTP 函數(shù),新手也可以嘗試使用模版函數(shù)


配置函數(shù)

這里主要注意函數(shù)入口這個(gè)配置,Handler 的格式為 [文件名].[函數(shù)名]。例如創(chuàng)建函數(shù)時(shí)指定的 Handler 為 index.handler,那么文件名為 index.py,入口函數(shù)為 handler。


配置觸發(fā)器

這里只是為了測試所以認(rèn)證方式使用anonymous。


代碼開發(fā)

首先要在代碼根目錄下安裝python連接kafka的依賴包

pip install -t . kafka-python


然后開始編寫setting.py這個(gè)是連接kafka的配置信息

vim setting.py

kafka_setting = {
   'bootstrap_servers': ["XXX", "XXX", "XXX"],   #kafka連接地址
   'topic_name': 'XXX',  #使用的topic名稱
   'consumer_id': 'XXX'  #使用的Consumer Group
}


然后開始編寫入口文件index.py

# -*- coding: utf-8 -*-
# 導(dǎo)入連接kafka所需依賴包和配置
import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError

import setting

conf = setting.kafka_setting

print conf

HELLO_WORLD = b"Hello world! "

def handler(environ, start_response):    
   context = environ['fc.context']    
   request_uri = environ['fc.request_uri']    
   for k, v in environ.items():        
       if k.startswith("HTTP_"):            
           # process custom request headers            
           pass    

   # get request_body    
   try:        
       request_body_size = int(environ.get('CONTENT_LENGTH', 0))    
   except (ValueError):        
       request_body_size = 0  
   request_body = environ['wsgi.input'].read(request_body_size)  

   # get request_method    
   request_method = environ['REQUEST_METHOD']    

   # get path info    
   path_info = environ['PATH_INFO']    

   # get server_protocol    
   server_protocol = environ['SERVER_PROTOCOL']    

   # get content_type    
   try:        
       content_type = environ['CONTENT_TYPE']    
   except (KeyError):        
       content_type = " "    

   # get query_string    
   try:        
       query_string = environ['QUERY_STRING']        
   except (KeyError):        
       query_string = " "  

   print 'request_body: {}'.format(request_body)  
   print 'method: {} path: {}  query_string: {} server_protocol: {} '.format(request_method, path_info,  query_string, server_protocol)    
   # do something here    

   status = '200 OK'    
   response_headers = [('Content-type', 'text/plain')]    
   start_response(status, response_headers)

   #以下是kafka操作部分,發(fā)送一個(gè)消息到kafka
   producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                       api_version = (0,10),
                       retries=5)

   partitions = producer.partitions_for(conf['topic_name'])
   print 'Topic 下分區(qū): %s' % partitions

   try:
       future = producer.send(conf['topic_name'], 'hello aliyun-kafka test!')
       future.get()
       print 'send message succeed.'
   except KafkaError, e:
       print 'send message failed.'
       print e    
   # return value must be iterable    
   return [HELLO_WORLD]

備注:上面的函數(shù)腳本分別參考了阿里云的函數(shù)計(jì)算Hello World示例和python連接kafka示例,參考url如下:


Hello World示例https://help.aliyun.com/document_detail/74756.html?spm=a2c4g.11186623.6.573.2be3dc876slGKm

python連接kafka示例 https://code.aliyun.com/alikafka/aliware-kafka-demos/blob/master/kafka-python-demo/vpc/aliyun_kafka_producer.py

Hello World示例必須要,因?yàn)檫@個(gè)是Http函數(shù),需要加http請求參數(shù)和返回的狀態(tài)碼等信息。environ, start_response這兩個(gè)參數(shù)是必選項(xiàng)。不加參數(shù)無法通過。具體信息可以查看幫助文檔。


代碼上傳

通過文件夾和壓縮包均可以上傳代碼


環(huán)境設(shè)置

配置服務(wù)允許訪問VPC內(nèi)資源


配置kafka所在安全組允許被函數(shù)計(jì)算內(nèi)網(wǎng)地址訪問

由于函數(shù)計(jì)算地址用的是域名,通過ping得到函數(shù)計(jì)算內(nèi)網(wǎng)地址,然后用大段的子網(wǎng)覆蓋它,避免ip變更導(dǎo)致無法訪問


配置函數(shù)計(jì)算訪問kafka權(quán)限,這里給了管理權(quán)限,生產(chǎn)環(huán)境可以根據(jù)實(shí)際情況配自定義權(quán)限


測試運(yùn)行


直接點(diǎn)擊代碼執(zhí)行下的執(zhí)行按鈕就能運(yùn)行


查看結(jié)果


總結(jié)


函數(shù)計(jì)算對很多人來說還是一個(gè)新鮮的工具。未來也許會(huì)成為一種趨勢。因?yàn)樗恍枰?wù)器,且在公司初期量不大的前提下能節(jié)省成本和運(yùn)維維護(hù)的成本。

立即登錄,閱讀全文
版權(quán)說明:
本文內(nèi)容來自于新鈦云服,本站不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。文章內(nèi)容系作者個(gè)人觀點(diǎn),不代表快出海對觀點(diǎn)贊同或支持。如有侵權(quán),請聯(lián)系管理員(zzx@kchuhai.com)刪除!
相關(guān)文章
阿里云助力《誅仙世界》端游正式開服!
阿里云助力《誅仙世界》端游正式開服!
近?,完美世界游戲歷時(shí)多年打造的新國?仙俠MMORPG端游《誅仙世界》在阿?云上正式開服。
阿里云
云服務(wù)
2024-12-292024-12-29
一文詳解阿里云AI大基建
一文詳解阿里云AI大基建
面向AI時(shí)代,阿里云基礎(chǔ)設(shè)施是如何創(chuàng)新與發(fā)展的?計(jì)算、網(wǎng)絡(luò)、存儲(chǔ)、服務(wù)器、集群、可觀測等,阿里云全新升級的AI Infra到底有哪些重磅更新?
阿里云
云服務(wù)
2024-11-022024-11-02
AI時(shí)代云安全新范式,阿里云安全能力全線升級!
AI時(shí)代云安全新范式,阿里云安全能力全線升級!
AI時(shí)代,云安全面臨著新的挑戰(zhàn),不僅要持續(xù)面對以往的傳統(tǒng)問題,更需要全新理念落地于產(chǎn)品設(shè)計(jì)、技術(shù)演進(jìn)、架構(gòu)設(shè)計(jì),才能實(shí)現(xiàn)效果、性能、和成本的最優(yōu)解。
AI
阿里云
云服務(wù)
2024-09-272024-09-27
連續(xù)四年!阿里云領(lǐng)跑中國公有云大數(shù)據(jù)平臺(tái)
連續(xù)四年!阿里云領(lǐng)跑中國公有云大數(shù)據(jù)平臺(tái)
近日,國際數(shù)據(jù)公司(IDC)發(fā)布《中國大數(shù)據(jù)平臺(tái)市場份額,2023:數(shù)智融合時(shí)代的真正到來》報(bào)告——2023年中國大數(shù)據(jù)平臺(tái)公有云服務(wù)市場規(guī)模達(dá)72.2億元人民幣,其中阿里巴巴市場份額保持領(lǐng)先,占比達(dá)40.2%,連續(xù)四年排名第一。
阿里云
云服務(wù)
2024-09-182024-09-18
優(yōu)質(zhì)服務(wù)商推薦
更多
掃碼登錄
打開掃一掃, 關(guān)注公眾號后即可登錄/注冊
加載中
二維碼已失效 請重試
刷新
賬號登錄/注冊
個(gè)人VIP
小程序
快出海小程序
公眾號
快出海公眾號
商務(wù)合作
商務(wù)合作
投稿采訪
投稿采訪
出海管家
出海管家