使用 GitLab CI/CD 和 Terraform 实现 Lambda 以进行 SFTP 集成、Go 中的 S Databricks

使用 gitlab ci/cd 和 terraform 实现 lambda 以进行 sftp 集成、go 中的 s databricks

通过 databricks 中的流程自动化降低成本

我的客户需要降低在 databricks 上运行的流程的成本。 databricks 负责的功能之一是从各种 sftp 收集文件,解压缩它们并将它们放入数据湖中。

自动化数据工作流程是现代数据工程的重要组成部分。在本文中,我们将探讨如何使用 gitlab ci/cd 和 terraform 创建 aws lambda 函数,该函数允许 go 应用程序连接到 sftp 服务器、收集文件、将其存储在 amazon s3 中,并最终在 databricks 上触发作业。这种端到端的流程对于依赖高效数据集成和自动化的系统至关重要。

阅读本文需要什么

  • 具有项目存储库的 gitlab 帐户。
  • 有权创建 lambda、s3 和 iam 资源的 aws 账户。
  • 具有创建和运行作业权限的 databricks 帐户。
  • go、terraform 和 gitlab ci/cd 的基础知识。

第 1 步:准备 go 应用程序

首先创建一个 go 应用程序,该应用程序将连接到 sftp 服务器来收集文件。使用 github.com/pkg/sftp 等软件包建立 sftp 连接,使用 github.com/aws/aws-sdk-go 与 aws s3 服务交互。

package main

import (
 "fmt"
 "log"
 "os"
 "path/filepath"

 "github.com/pkg/sftp"
 "golang.org/x/crypto/ssh"
 "github.com/aws/aws-sdk-go/aws"
 "github.com/aws/aws-sdk-go/aws/session"
 "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
 // configuração do cliente sftp
 user := "seu_usuario_sftp"
 pass := "sua_senha_sftp"
 host := "endereco_sftp:22"
 config := &ssh.clientconfig{
  user: user,
  auth: []ssh.authmethod{
   ssh.password(pass),
  },
  hostkeycallback: ssh.insecureignorehostkey(),
 }

 // conectar ao servidor sftp
 conn, err := ssh.dial("tcp", host, config)
 if err != nil {
  log.fatal(err)
 }
 client, err := sftp.newclient(conn)
 if err != nil {
  log.fatal(err)
 }
 defer client.close()

 // baixar arquivos do sftp
 remotefilepath := "/path/to/remote/file"
 localdir := "/path/to/local/dir"
 localfilepath := filepath.join(localdir, filepath.base(remotefilepath))
 dstfile, err := os.create(localfilepath)
 if err != nil {
  log.fatal(err)
 }
 defer dstfile.close()

 srcfile, err := client.open(remotefilepath)
 if err != nil {
  log.fatal(err)
 }
 defer srcfile.close()

 if _, err := srcfile.writeto(dstfile); err != nil {
  log.fatal(err)
 }

 fmt.println("arquivo baixado com sucesso:", localfilepath)

 // configuração do cliente s3
 sess := session.must(session.newsession(&aws.config{
  region: aws.string("us-west-2"),
 }))
 uploader := s3manager.newuploader(sess)

 // carregar arquivo para o s3
 file, err := os.open(localfilepath)
 if err != nil {
  log.fatal(err)
 }
 defer file.close()

 _, err = uploader.upload(&s3manager.uploadinput{
  bucket: aws.string("seu-bucket-s3"),
  key:    aws.string(filepath.base(localfilepath)),
  body:   file,
 })
 if err != nil {
  log.fatal("falha ao carregar arquivo para o s3:", err)
 }

 fmt.println("arquivo carregado com sucesso no s3")
}

步骤 2:配置 terraform

terraform 将用于在 aws 上配置 lambda 函数和所需资源。使用创建 lambda 函数、iam 策略和 s3 存储桶所需的配置创建 main.tf 文件。

provider "aws" {
  region = "us-east-1"
}

resource "aws_iam_role" "lambda_execution_role" {
  name = "lambda_execution_role"

  assume_role_policy = jsonencode({
    version = "2012-10-17",
    statement = [
      {
        action = "sts:assumerole",
        effect = "allow",
        principal = {
          service = "lambda.amazonaws.com"
        },
      },
    ]
  })
}

resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda_policy"
  description = "a policy that allows a lambda function to access s3 and sftp resources"

  policy = jsonencode({
    version = "2012-10-17",
    statement = [
      {
        action = [
          "s3:listbucket",
          "s3:getobject",
          "s3:putobject",
        ],
        effect = "allow",
        resource = [
          "arn:aws:s3:::seu-bucket-s3",
          "arn:aws:s3:::seu-bucket-s3/*",
        ],
      },
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

resource "aws_lambda_function" "sftp_lambda" {
  function_name = "sftp_lambda_function"

  s3_bucket = "seu-bucket-s3-com-codigo-lambda"
  s3_key    = "sftp-lambda.zip"

  handler = "main"
  runtime = "go1.x"

  role = aws_iam_role.lambda_execution_role.arn

  environment {
    variables = {
      sftp_host     = "endereco_sftp",
      sftp_user     = "seu_usuario_sftp",
      sftp_password = "sua_senha_sftp",
      s3_bucket     = "seu-bucket-s3",
    }
  }
}

resource "aws_s3_bucket" "s3_bucket" {
  bucket = "seu-bucket-s3"
  acl    = "private"
}

步骤 3:配置 gitlab ci/cd

gitlab 中,在 .gitlab-ci.yml 文件中定义 ci/cd 管道。该管道应包括测试 go 应用程序的步骤、运行 terraform 来配置基础设施以及必要时的清理步骤。

stages:
  - test
  - build
  - deploy

variables:
  s3_bucket: "seu-bucket-s3"
  aws_default_region: "us-east-1"
  tf_version: "1.0.0"

before_script:
  - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )'
  - eval $(ssh-agent -s)
  - echo "$private_key" | tr -d '' | ssh-add -
  - mkdir -p ~/.ssh
  - chmod 700 ~/.ssh
  - ssh-keyscan -h 'endereco_sftp' >> ~/.ssh/known_hosts

test:
  stage: test
  image: golang:1.18
  script:
    - go test -v ./...

build:
  stage: build
  image: golang:1.18
  script:
    - go build -o myapp
    - zip -r sftp-lambda.zip myapp
  artifacts:
    paths:
      - sftp-lambda.zip
  only:
    - master

deploy:
  stage: deploy
  image: hashicorp/terraform:$tf_version
  script:
    - terraform init
    - terraform apply -auto-approve
  only:
    - master
  environment:
    name: production

第 4 步:与 databricks 集成

将文件上传到 s3 后,lambda 函数必须触发 databricks 中的作业。这可以使用 databricks api 启动现有作业来完成。

package main

import (
 "bytes"
 "encoding/json"
 "fmt"
 "net/http"
)

// estrutura para a requisição de iniciar um job no databricks
type databricksjobrequest struct {
 jobid int `json:"job_id"`
}

// função para acionar um job no databricks
func triggerdatabricksjob(databricksinstance string, token string, jobid int) error {
 url := fmt.sprintf("https://%s/api/2.0/jobs/run-now", databricksinstance)
 requestbody, _ := json.marshal(databricksjobrequest{jobid: jobid})
 req, err := http.newrequest("post", url, bytes.newbuffer(requestbody))
 if err != nil {
  return err
 }

 req.header.set("content-type", "application/json")
 req.header.set("authorization", fmt.sprintf("bearer %s", token))

 client := &http.client{}
 resp, err := client.do(req)
 if err != nil {
  return err
 }
 defer resp.body.close()

 if resp.statuscode != http.statusok {
  return fmt.errorf("failed to trigger databricks job, status code: %d", resp.statuscode)
 }

 return nil
}

func main() {
 // ... (código existente para conectar ao sftp e carregar no s3)

 // substitua pelos seus valores reais
 databricksinstance := "your-databricks-instance"
 databrickstoken := "your-databricks-token"
 databricksjobid := 123 // id do job que você deseja acionar

 // acionar o job no databricks após o upload para o s3
 err := triggerdatabricksjob(databricksinstance, databrickstoken, databricksjobid)
 if err != nil {
  log.fatal("erro ao acionar o job do databricks:", err)
 }

 fmt.println("job do databricks acionado com sucesso")
}

第 5 步:运行管道

将代码推送到 gitlab 存储库以供管道运行。检查所有步骤是否已成功完成,lambda 函数是否正常运行并与 s3 和 databricks 正确交互。

一旦您拥有完整的代码并配置了 .gitlab-ci.yml 文件,您就可以按照以下步骤运行管道:

  • 将您的代码推送到 gitlab 存储库:
  git add .
  git commit -m "adiciona função lambda para integração sftp, s3 e databricks"
  git push origin master
git add .
git commit -m "adiciona função lambda para integração sftp, s3 e databricks"
git push origin master
´´´

  • gitlab ci/cd 将检测新的提交并自动启动管道。
  • 通过访问存储库的 ci/cd 部分来跟踪 gitlab 中管道的执行。
  • 如果所有阶段都成功,您的 lambda 函数将被部署并可供使用。

请记住,您需要在 gitlab ci/cd 中配置环境变量来存储敏感信息,例如访问令牌和私钥。这可以在 gitlab 项目的“设置”>“ci/cd”>“变量”部分中完成。

此外,请确保 databricks 令牌具有触发作业所需的权限,并且该作业具有提供的 id。

结论

使用 gitlab ci/cd、terraform 和 aws lambda 等工具可以显着简化自动化数据工程任务。通过遵循本文中概述的步骤,您可以创建一个强大的系统,自动执行 sftp、s3 和 databricks 之间的数据收集和集成,所有这些都具有 go 的效率和简单性。通过这种方法,您将有能力解决以下问题。大规模数据集成的挑战。

我的联系人:

领英 - airton lira junior

imasters - airton lira junior

aws #lambda #terraform #gitlab #ci_cd #go #databricks #dataengineering #automation


以上就是使用 GitLab CI/CD 和 Terraform 实现 Lambda 以进行 SFTP 集成、Go 中的 S Databricks的详细内容,更多请关注其它相关文章!