Jex’s Note

Go Scheduler

What is the Go Scheduler?

The Go scheduler’s purpose is to distribute runnable goroutines efficiently over multiple OS threads.

Kernel threads are expensive; therefore, the keys are reducing the number of kernel threads and reusing them.

  • an initial goroutine stack consumes 2 KB of memory
  • a default thread stack consumes 1 MB of memory
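
A quick way to feel the difference, using the numbers above: 100,000 goroutines need roughly 200 MB of stacks, while 100,000 OS threads would reserve roughly 100 GB. A minimal sketch:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    release := make(chan struct{})
    for i := 0; i < 100000; i++ {
        go func() { // each goroutine starts with a ~2 KB stack that grows on demand
            <-release
        }()
    }
    fmt.Println("goroutines:", runtime.NumGoroutine()) // ~100,001 Gs multiplexed onto a handful of Ms
    close(release)
}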

Scheduling Basics

P (Processor)

  • A logical processor (P) is created for every virtual core.
  • The runtime keeps track of each G and maps them onto logical processors.
  • OS threads run on at most GOMAXPROCS processors, as the snippet below shows.
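
For example, both values can be inspected at runtime:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    fmt.Println("virtual cores:", runtime.NumCPU())
    fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // an argument of 0 reads the value without changing it
}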

M (Machine, OS thread)

  • An M must have an associated P to execute a G.

G (Goroutine)

  • G keeps track of its stack and status.
  • Status
    • Idle: It was just allocated and has not yet been initialised.
    • Runnable: It is on a runqueue.
    • Running:
      • It is assigned an M.
      • It executes user code.
    • Syscall:
      • It is assigned an M.
      • It is not on a runqueue.
      • It is executing a system call and not executing user code.
    • Waiting: It is blocked in the runtime. (e.g. channel wait queue)
    • Moribund unused
    • Dead
    • Enqueue unused
    • Copy stack

Runqueue

  • Local runqueue
    • Every P has its own local runqueue.
    • Newly created goroutines are tracked in the creating P’s local runqueue (FIFO).
  • Global runqueue
    • All Ps share a global runqueue.
    • A lower priority runqueue
    • The Go scheduler runs a background thread called sysmon to detect long-running goroutines (10+ ms, e.g. a CPU-bound computation) and puts them into the global runqueue.

P2 has no work to do and P1 has no work to steal (we will talk about work stealing later).

Global Runqueue
┏━━━━┓
┃ G7 ┃
┗━━━━┛

┏━━━━┓  ┏━━━━┓  ┏━━━━┓         ┏━━━━┓  ┏━━━━┓  ┏━━━━┓
┃ P1 ┣━━┃ M1 ┣━━┃ G1 ┃         ┃ P2 ┣━━┃ M2 ┣━━┃    ┃
┗━┳━━┛  ┗━━━━┛  ┗━━━━┛         ┗━┳━━┛  ┗━━━━┛  ┗━━━━┛
  ┃                              ┃
  ┃    Local Runqueue            ┃    Local Runqueue
  ┃    ┏━━━━━━━┓                 ┃    ┏━━━━━━━┓
  ┗━━━━┫ empty ┃                 ┗━━━━┫ empty ┃
       ┗━━━━━━━┛                      ┗━━━━━━━┛

Since there was a runnable goroutine (G7) in the global runqueue, P2 took G7 from there.

Global Runqueue
┏━━━━━━━┓
┃ empty ┃
┗━━━━━━━┛

┏━━━━┓  ┏━━━━┓  ┏━━━━┓         ┏━━━━┓  ┏━━━━┓  ┏━━━━┓
┃ P1 ┣━━┃ M1 ┣━━┃ G1 ┃         ┃ P2 ┣━━┃ M2 ┣━━┃ G7 ┃
┗━┳━━┛  ┗━━━━┛  ┗━━━━┛         ┗━┳━━┛  ┗━━━━┛  ┗━━━━┛
  ┃                              ┃
  ┃    Local Runqueue            ┃    Local Runqueue
  ┃    ┏━━━━━━━┓                 ┃    ┏━━━━━━━┓
  ┗━━━━┫ empty ┃                 ┗━━━━┫ empty ┃
       ┗━━━━━━━┛                      ┗━━━━━━━┛

Context Switching

┏━━━━━━━━━━━━━━━━━━━━━━┓
┃  CPU Core            ┃
┃┏━━━━━━━━━━━━━━━━━━━━┓┃
┃┃ Thread 1           ┣╋━━┓
┃┃ ┏━━━━┓┏━━━━┓┏━━━━┓ ┃┃  ┃
┃┃ ┃ G1 ┃┃ G2 ┃┃ G3 ┃ ┃┃  ┃
┃┃ ┗━━━━┛┗━━━━┛┗━━━━┛ ┃┃  ┣━━>OS scheduler
┃┗━━━━━━━━━━━━━━━━━━━━┛┃  ┃
┃┏━━━━━━━━━━━━━━━━━━━━┓┃  ┃
┃┃ Thread 2           ┣╋━━┛
┃┃ ┏━━━━┓┏━━━━┓┏━━━━┓ ┃┃
┃┃ ┃ G4 ┃┃ G5 ┃┃ G6 ┃ ┣╋━━━━━>Go scheduler
┃┃ ┗━━━━┛┗━━━━┛┗━━━━┛ ┃┃
┃┗━━━━━━━━━━━━━━━━━━━━┛┃
┗━━━━━━━━━━━━━━━━━━━━━━┛
  • OS Threads are context-switched on and off a core.
  • Goroutines are context-switched on and off an M.
  • Classes of events in Go programs allow the scheduler to make scheduling decisions:
    • The use of the keyword go (goroutine)
    • Garbage collection
    • System calls
    • Synchronisation
    • Network I/O
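
For instance, the go statement and a channel operation are both scheduling points; a minimal sketch:

package main

import "fmt"

func main() {
    done := make(chan struct{})
    go func() { // the go keyword gives the scheduler a chance to make a decision
        fmt.Println("hello from another goroutine")
        close(done)
    }()
    <-done // a blocking receive parks this G, freeing the M to run another G
}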

Asynchronous System Calls

netpoller

  • Uses the network-polling interface provided by the OS to deal with network sockets.
    • macOS: kqueue
    • Linux: epoll
    • Windows: IOCP (IoCompletionPort)
  • Prevents goroutines from blocking the M when network syscalls are made.
  • Uses its own thread to do network I/O. (no need to create a new M)

G1 ran on M1 and made a network call, so it was moved to the netpoller. P1 then took G2 from its local runqueue, and G2 was context-switched onto M1. Once G1 finishes its network call, it will be moved back to P1’s local runqueue.

netpoller
┏━━━━┓
┃ G1 ┃
┗━━━━┛

┏━━━━┓  ┏━━━━┓  ┏━━━━┓
┃ P1 ┣━━┃ M1 ┣━━┃ G2 ┃
┗━┳━━┛  ┗━━━━┛  ┗━━━━┛
  ┃
  ┃    Local Runqueue
  ┃    ┏━━━━┓  ┏━━━━┓  ┏━━━━┓
  ┗━━━━┫ G3 ┣━━┃ G4 ┣━━┃ G5 ┃
       ┗━━━━┛  ┗━━━━┛  ┗━━━━┛
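
For illustration, an ordinary blocking-looking network call runs through the netpoller under the hood; a sketch (the URL is only a placeholder):

package main

import (
    "fmt"
    "net/http"
)

func main() {
    // This goroutine appears to block here, but the M is not blocked: the runtime
    // registers the socket with the netpoller (epoll/kqueue/IOCP) and parks the G
    // until the socket is ready.
    resp, err := http.Get("https://example.com/")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.Status)
}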

Synchronous System Calls

  • A goroutine making the syscall will block the M.
    • System calls, e.g. opening a file
    • Synchronisation (atomic, mutex, channel operation calls)
  • Handoff
    • The background monitor thread (sysmon) detects Ms that are blocked.
    • It starts a new thread or unparks an idle thread.
    • It hands the runqueue off to the new thread.

G1 ran on M1 and made a system call that blocked M1. M1, together with G1, was moved off P1; M2 was then created (or unparked) and replaced M1. P1 took G2 from its local runqueue, and G2 was context-switched onto M2. Once G1 finishes its system call, G1 will be moved back to P1’s local runqueue and M1 will be parked as an idle thread for future use.

┏━━━━┓  ┏━━━━┓
┃ M1 ┣━━┃ G1 ┃
┗━━━━┛  ┗━━━━┛

┏━━━━┓  ┏━━━━┓  ┏━━━━┓
┃ P1 ┣━━┃ M2 ┣━━┃ G2 ┃
┗━┳━━┛  ┗━━━━┛  ┗━━━━┛
  ┃
  ┃    Local Runqueue
  ┃    ┏━━━━┓  ┏━━━━┓  ┏━━━━┓
  ┗━━━━┫ G3 ┣━━┃ G4 ┣━━┃ G5 ┃
       ┗━━━━┛  ┗━━━━┛  ┗━━━━┛
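
A rough way to observe handoff is to count OS threads while many goroutines sit in synchronous file I/O (file reads do not go through the netpoller); a sketch assuming a Unix-like /dev/urandom:

package main

import (
    "fmt"
    "os"
    "runtime/pprof"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            f, err := os.Open("/dev/urandom") // a synchronous syscall path
            if err != nil {
                return
            }
            defer f.Close()
            buf := make([]byte, 1<<20)
            f.Read(buf) // while this blocks an M, the P can be handed off to another M
        }()
    }
    wg.Wait()
    // "threadcreate" counts OS threads ever created; blocking syscalls can push it past GOMAXPROCS
    fmt.Println("threads created:", pprof.Lookup("threadcreate").Count())
}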

Spinning Thread

Hand-off and thread parking/unparking increase latency, so these actions should be minimised for optimal thread management.

The problem is that it is impossible to predict whether new goroutines are coming; we do not want to park a worker thread only to unpark it again right away.

The solution is to keep one extra worker thread running and looking for incoming goroutines; this state of a worker thread is called “spinning”.

Work Stealing

  • Balances goroutines across all the Ps in order to keep the Ms busy.
  • The rules for finding work are as follows:
    • Check the local runqueue.
    • If nothing is found, try to steal work from other Ps.
    • If nothing is found, check the global runqueue.
    • If nothing is found, poll the network.

P2 has finished all work and it will try to steal work from P1.

┏━━━━┓  ┏━━━━┓  ┏━━━━┓         ┏━━━━┓  ┏━━━━┓  ┏━━━━┓
┃ P1 ┣━━┃ M1 ┣━━┃ G1 ┃         ┃ P2 ┣━━┃ M2 ┣━━┃    ┃
┗━┳━━┛  ┗━━━━┛  ┗━━━━┛         ┗━┳━━┛  ┗━━━━┛  ┗━━━━┛
  ┃                              ┃
  ┃    Local Runqueue            ┃    Local Runqueue
  ┃    ┏━━━━┓  ┏━━━━┓  ┏━━━━┓    ┃    ┏━━━━━━━┓
  ┗━━━━┫ G4 ┣━━┃ G5 ┣━━┃ G6 ┃    ┗━━━━┫ empty ┃
       ┗━━━━┛  ┗━━━━┛  ┗━━━━┛         ┗━━━━━━━┛

P2 stole half of the Gs (G4 and G5) from P1’s local runqueue, and G4 was context-switched onto M2.

┏━━━━┓  ┏━━━━┓  ┏━━━━┓         ┏━━━━┓  ┏━━━━┓  ┏━━━━┓
┃ P1 ┣━━┃ M1 ┣━━┃ G1 ┃         ┃ P2 ┣━━┃ M2 ┣━━┃ G4 ┃
┗━┳━━┛  ┗━━━━┛  ┗━━━━┛         ┗━┳━━┛  ┗━━━━┛  ┗━━━━┛
  ┃                              ┃
  ┃    Local Runqueue            ┃    Local Runqueue
  ┃    ┏━━━━┓                    ┃    ┏━━━━┓
  ┗━━━━┫ G6 ┃                    ┗━━━━┫ G5 ┃
       ┗━━━━┛                         ┗━━━━┛

Tracing the Go scheduler

schedtrace

$ GOMAXPROCS=2 GODEBUG=schedtrace=1000 go run main.go
...
SCHED 2009ms: gomaxprocs=2 idleprocs=0 threads=4 spinningthreads=0 idlethreads=1 runqueue=0 [8 0]
...
  • gomaxprocs: Number of processors
  • idleprocs: Number of idle processors
  • threads: Number of threads
  • idlethreads: Number of idle threads
  • spinningthreads: Number of spinning threads
  • runqueue: Number of goroutines in the global runqueue
  • [8 0]: Number of goroutines in each P’s local runqueue

scheddetail

$ GOMAXPROCS=2 GODEBUG=schedtrace=1000,scheddetail=1 go run main.go
...
SCHED 0ms: gomaxprocs=2 idleprocs=0 threads=4 spinningthreads=1 idlethreads=1 runqueue=0 gcwaiting=0 nmidlelocked=0 stopwait=0 sysmonwait=0
  P0: status=1 schedtick=0 syscalltick=0 m=3 runqsize=0 gfreecnt=0
  P1: status=0 schedtick=2 syscalltick=0 m=-1 runqsize=0 gfreecnt=0
  M3: p=0 curg=-1 mallocing=0 throwing=0 preemptoff= locks=1 dying=0 helpgc=0 spinning=true blocked=false lockedg=-1
  M2: p=-1 curg=-1 mallocing=0 throwing=0 preemptoff= locks=0 dying=0 helpgc=0 spinning=false blocked=true lockedg=-1
  M1: p=-1 curg=-1 mallocing=0 throwing=0 preemptoff= locks=1 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1
  M0: p=-1 curg=-1 mallocing=0 throwing=0 preemptoff= locks=0 dying=0 helpgc=0 spinning=false blocked=true lockedg=1
  G1: status=1(chan receive) m=-1 lockedm=0
  G2: status=4(force gc (idle)) m=-1 lockedm=-1
  G3: status=4(GC sweep wait) m=-1 lockedm=-1
...
  • 0: idle
  • 1: runnable
  • 2: running
  • 3: syscall
  • 4: waiting
  • 5: moribund unused
  • 6: dead
  • 7: enqueue unused
  • 8: copy stack


Docker Swarm

Introduction

A distributed-system tool used to manage a cluster of Docker Engines; it allows you to deploy your services.

Features

  • Decentralised design: you define the desired state of a service (number of replicas, network, ports, etc.)
  • Scaling: easy to scale your service out or in
  • Load balancing: uses the overlay network
  • Security: nodes communicate with each other over TLS
  • Supports rollback
  • Built into Docker

Concepts

  • nodes & roles
    • manager leader: manager nodes elect a single leader to conduct orchestration tasks
    • manager
      • deploys your application by assigning tasks
      • maintains the desired state of the swarm
      • a manager node is also a worker node by default; you can exclude it
    • worker
      • executes tasks
      • reports the current state of its assigned tasks to the manager
    • one or more nodes can run on a single server
    • a node can be both a manager and a worker
  • services & tasks
    • a service: the definition of the tasks, e.g. 1 service = 3 nginx replicas = 3 tasks
    • a task: a running container on an available node, e.g. 1 task = 1 nginx
  • Docker Swarm uses VXLAN (Virtual Extensible LAN) to connect the workers as if they were on the same LAN.
  • network
    • ingress: exposes services to the external network.
    • overlay network: manages communications among the Docker daemons.
    • docker_gwbridge: created by Docker; it allows the containers to connect to the host.
    • IPVS: a load-balancer implementation in the Linux kernel.

Networking

Routing Mesh

Request -> ingress network on the published port -> any node (IPVS decides which service task to go to) -> ingress network (outer LB) -> overlay network (inner LB) -> node

Docker swarm command

Init

docker swarm init --advertise-addr {ip}    // local ip
  • This machine will become a manager (leader) by default.
  • This generates a join command with a token; execute that command on any machine you want to join as a worker node.

Show node list

docker node ls

Show network list

docker network ls

Create an overlay network

docker network create --opt encrypted --subnet 100.0.0.0/24 -d overlay {name}

Create service

docker service create --name nginx --network {network name} -p {ex_port:in_port} {image_name}
docker service create --name nginx --network {network name} -p 1080:80 nvbeta/swarm_nginx

Get started

  1. Use the docker-machine command to create 3 machines.
  2. SSH into one of the machines and initialise the swarm.
  3. SSH into the rest of the machines and execute the join command generated in step 2.
  4. Show the node list to verify the cluster looks as expected.

Setting

Open protocols and ports between the hosts

  • TCP port 2377 for cluster management communications (manager <-> worker)
  • TCP and UDP port 7946 for communication among nodes (worker <-> worker; any node participating in the swarm)
  • UDP port 4789 for overlay network traffic (worker <-> worker, nodes under the same overlay network)

Debugging Tools

strace

Installation

$ sudo apt install strace   #Debian/Ubuntu
# yum install strace        #RHEL/CentOS
# dnf install strace        #Fedora 22+

Trace linux command

Simply run a command under strace to trace all of its system calls, e.g. df -h.

sudo strace df -h

Trace PID

sudo strace -p 5206

Get summary of process

Generate a report of total time, calls, and errors for each system call.

sudo strace -c -p 5206

% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 90.41    0.001141         163         7           futex
  1.82    0.000023           4         6           write
  ...
  0.00    0.000000           0         2           gettimeofday
  0.00    0.000000           0         1           sendmmsg
------ ----------- ----------- --------- --------- ----------------
100.00    0.001262                    80         1 total

Print command time spent in system calls

sudo strace -T ls

open("/proc/filesystems", O_RDONLY)     = 3 <0.000024>
fstat(3, {st_mode=S_IFREG|0444, st_size=0, ...}) = 0 <0.000006>

Trace only specific system calls

sudo strace -e trace=write df -h
sudo strace -e trace=open,close df -h
sudo strace -e trace=open,close,read,write df -h
sudo strace -e trace=all df -h

Other -e qualifiers: signal, abbrev, verbose, raw, read, or write

Trace system calls based on a certain condition

Trace all system calls involving process management.

sudo strace -e trace=process ls

execve("/bin/ls", ["ls"], [/* 17 vars */]) = 0
arch_prctl(ARCH_SET_FS, 0x7fb0a721f840) = 0
astra-worker  conf  jobctl  library  logs  main.go  README.md  service  test  topic  utility  vendor
...

Trace all system calls that take a filename as an argument

sudo strace -e trace=file ls

execve("/bin/ls", ["ls"], [/* 17 vars */]) = 0
access("/etc/ld.so.nohwcap", F_OK)      = -1 ENOENT (No such file or directory)
access("/etc/ld.so.preload", R_OK)      = -1 ENOENT (No such file or directory)
...

Trace all system calls involving memory mapping.

sudo strace -e trace=memory ls

brk(0)                                  = 0x62b000
mmap(NULL, 8192, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f9c0b0a1000
mmap(NULL, 26186, PROT_READ, MAP_PRIVATE, 3, 0) = 0x7f9c0b09a000
...

Trace all network-related system calls

sudo strace -e trace=network curl google.com

socket(PF_INET6, SOCK_DGRAM, IPPROTO_IP) = 3
socket(PF_INET, SOCK_STREAM, IPPROTO_TCP) = 3
setsockopt(3, SOL_SOCKET, SO_KEEPALIVE, [1], 4) = 0
...

Trace all signal-related system calls

sudo strace -e trace=signal -p 5206

--- SIGTERM {si_signo=SIGTERM, si_code=SI_USER, si_pid=16988, si_uid=0} ---             // `sudo kill 5206` trigger this line
+++ exited with 0 +++

Other arguments

Help

sudo strace -h ls

Print instruction pointer during system call

sudo strace -i ls

[00007f2d6f9f81e0] openat(AT_FDCWD, ".", O_RDONLY|O_NONBLOCK|O_DIRECTORY|O_CLOEXEC) = 3

Show time for each output line

sudo strace -t ls

01:37:47 getdents(3, /* 16 entries */, 32768) = 464

Redirect output to file

sudo strace -o /tmp/ls.log ls

Show debugging information for strace tool

sudo strace -d ls

TODO

System call

...
open("/etc/ld.so.cache", O_RDONLY|O_CLOEXEC) = 3
...

system_call_name(arguments) = return value

  • epoll_wait(5, {}, 128, 0) = 0
  • clock_gettime(CLOCK_MONOTONIC, {1696036, 90909291}) = 0
  • futex(0x1178818, FUTEX_WAKE, 1) = 1


Linux - Systemd

Configuration file

/etc/systemd/system/test.service:

[Unit]
Description=test

[Service]
User=ec2-user
ExecStart=/bin/sh -c "/home/ec2-user/my_app/test >> /tmp/test.log 2>&1"
Restart=always

[Install]
WantedBy=multi-user.target

Other [Service] parameters

WorkingDirectory=/home/ec2-user/my_app
  • A config owned by root with permissions 644 worked fine in testing.
  • WantedBy: this directive is the most common way to specify how a unit should be enabled.

Other parameters

Start only after the network is up at boot

[Unit]
After=syslog.target network.target remote-fs.target nss-lookup.target

Commands

Reload the configuration (run this after every modification)

sudo systemctl daemon-reload

Check the systemd logs for the service

sudo systemctl status test

Enable it to run at boot:

sudo systemctl enable test

Start it

sudo systemctl start test

Stop it

sudo systemctl stop test

log

Default systemd log path

/var/log/syslog

Redirect stdout to a specific path

The solution is to wrap the command in a shell. The program’s output then goes to the path you specify, but it will no longer appear in syslog; only some systemd logs remain.

User=ec2-user   (specifies the user the process and its log run as)
ExecStart=/bin/sh -c "/app_path/test >> /tmp/test.log 2>&1"

The following approach does not work:

ExecStart=/app_path/test >> /tmp/test.log

Linux - Upstart

Introduction

Upstart manages your services: it starts them automatically after the machine boots, and can optionally restart a service automatically after it dies.

Ubuntu 16.04 no longer supports Upstart; use systemd instead.

First Example

This is a simple example that only runs echo; after it finishes, no background process keeps running.

Create /etc/init/testjob.conf:

description "First example"
author "test"

start on runlevel [2345]

exec echo `date` >> /tmp/testjob.log

Check whether the conf syntax is correct

$ init-checkconf /etc/init/testjob.conf
File /etc/init/testjob.conf: syntax ok

Start it

$ sudo service testjob start
testjob stop/waiting

The result in /tmp/testjob.log:

Tue Aug 8 11:32:54 UTC 2017

Second Example

This example keeps the process you want running in the background.

First we need a program that never stops, for testing. /tmp/test.sh:

#!/bin/sh
while true
do
    echo `date` >> /tmp/testjob.log
    /bin/sleep 1s
done

Make it executable

chmod +x /tmp/test.sh

Create /etc/init/testjob.conf:

description "Second example"
author "test"

start on runlevel [2345]

exec /tmp/test.sh

Start it

$ sudo service testjob start
testjob stop/waiting

You can see it running continuously in the background

PID   %CPU %MEM   RSS COMMAND
20755  0.0  0.0   648 test.sh

The output appears in /tmp/testjob.log every second:

Tue Aug 8 12:43:02 UTC 2017

Stopping it manually shuts down the background process

sudo service testjob stop

But if it is killed, it no longer runs in the background

sudo kill -9 20755

Restart the program even when it dies

Append this to the end of /etc/init/testjob.conf:

respawn

Even if it is killed, it will be re-executed.

Commands

Reload the Upstart configuration

initctl reload-configuration

On Amazon Linux, run this command right after adding a new config.

Start manually

sudo service testjob start
or
sudo start testjob
or
sudo initctl start testjob

Stop manually

sudo service testjob stop
or
sudo stop testjob
or
sudo initctl stop testjob

Check the status of each job in the init list

sudo initctl list

Other settings

User

By default, jobs run as root; you can change this to any user you want

setuid ec2-user

Respawn

If it is killed 3 times within 30 seconds, it will not be respawned any more

respawn limit 3 30

Add nginx to the Upstart list

nginx runs at boot by default once installed; disable that first

sudo update-rc.d nginx disable

Create /etc/init/nginx.conf:

# nginx

description "nginx http daemon"
author "George Shammas <georgyo@gmail.com>"

start on (filesystem and net-device-up IFACE!=lo)
stop on runlevel [!2345]

env DAEMON=/usr/sbin/nginx
env PID=/var/run/nginx.pid

expect fork
respawn
respawn limit 10 5
#oom never

pre-start script
        $DAEMON -t
        if [ $? -ne 0 ]
                then exit $?
        fi
end script

exec $DAEMON

[Note] Fix for the nginx startup script running before the network interface was up:

start on (filesystem and net-device-up IFACE!=lo)

Miscellaneous

  • Upstart log path
    • ubuntu: /var/log/upstart/
    • amz-linux: none


Messaging API

Slack

Setup

After logging in, go to the token page to generate a token.

Golang example

Use the nlopes/slack package.

Fill in the token and create a client:

api := slack.New("xoxp-5*******************************************************************a")

Group

groups, err := api.GetGroups(false)
if err != nil {
    fmt.Printf("%s\n", err)
    return
}
for _, group := range groups {
    fmt.Printf("ID: %s, Name: %s\n", group.ID, group.Name)
}

Channel

channels, err := api.GetChannels(false)
if err != nil {
    fmt.Printf("%s\n", err)
    return
}
for _, channel := range channels {
    fmt.Printf("Name: %s, ID: %s\n", channel.Name, channel.ID)
}

Message

params := slack.PostMessageParameters{}
channelID, timestamp, err := api.PostMessage("G*******0", "Message from golang", params)
if err != nil {
    fmt.Printf("%s\n", err)
    return
}
fmt.Printf("Message successfully sent to channel %s at %s\n", channelID, timestamp)

Image

attachment := slack.Attachment{
    Title:    "Test",
    ImageURL: "https://www.google.com.tw/images/branding/googlelogo/2x/googlelogo_color_120x44dp.png",
}
params := slack.PostMessageParameters{
    Username:    "Log Reporter",
    Attachments: []slack.Attachment{attachment},
}
_, _, err = api.PostMessage("G*******0", "Message from golang", params)

MQTT

Introduction

MQTT is a messaging protocol (pub/sub) designed for IoT. It is lightweight and efficient, and its client implementations need fewer resources than AMQP. Note that MQTT itself does not support authorization; that part has to be implemented on the server side.

Install

Command line tool

The mqtt command-line tool for node.js

npm install mqtt --save
npm install mqtt -g

Command

Subscribe

mqtt sub -h my-mqtt-server.com -t my_topic -u mqtt_username -P mqtt_password

Publish

mqtt pub -h my-mqtt-server.com -t my_topic -u mqtt_username -P mqtt_password -m 'Hello world'

Via SSL

mqtt pub -h my-mqtt-server.com -p 8883 -C mqtts ...(omitted)

QoS

  • QoS 0 : received at most once : The packet is sent, and that’s it. There is no validation of whether it has been received.
  • QoS 1 : received at least once : The packet is sent and stored until the client receives a confirmation from the server. MQTT ensures that it will be received, but there can be duplicates.
  • QoS 2 : received exactly once : Same as QoS 1, but there are no duplicates.
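
With the paho Go client shown later in this note, the QoS level is simply the second argument to Publish and Subscribe:

// client.Publish(topic, 0, false, msg) // QoS 0: fire and forget
// client.Publish(topic, 1, false, msg) // QoS 1: at least once
// client.Publish(topic, 2, false, msg) // QoS 2: exactly once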

Golang - paho mqtt

Introduction

This is an MQTT package implemented in Go; currently it provides only a client, not a broker.

Usage

New & Connect & Close

func New(params map[string]string) (MQTT.Client, error) {
    opts := MQTT.NewClientOptions()
    opts.SetKeepAlive(4 * time.Second)
    opts.SetPingTimeout(2 * time.Second)
    opts.AddBroker(params["broker"])
    opts.SetClientID(params["client_id"])
    opts.SetUsername(params["username"])
    opts.SetPassword(params["password"])
    opts.SetAutoReconnect(true)
    client := MQTT.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return nil, token.Error()
    }
    return client, nil
}

params := map[string]string{
    "broker":    "ssl://mqtt.example.com:8883",
    "client_id": "Client ID",
    "username":  "Username",
    "password":  "Password",
}
mqtt_client, err := New(params)
if err != nil {
    return errors.New("New mqtt err: " + err.Error())
}

// Close
defer mqtt_client.Disconnect(250)

Publish

topic := "test/mqtt"
for {
    msg := strconv.FormatInt(time.Now().UnixNano(), 10)
    if token := mqtt_client.Publish(topic, 0, false, msg); token.Wait() && token.Error() != nil {
        log.Println(token.Error())
    } else {
        log.Printf("Successfully published `%s` to `%s`\n", msg, topic)
    }
}

For publishing, it is recommended to use token.WaitTimeout to avoid a very unlikely deadlock, ref: https://github.com/eclipse/paho.mqtt.golang/issues/185

Subscribe

var msg_payload, msg_topic string
if token := mqtt_client.Subscribe("test/mqtt", 0, func(client MQTT.Client, msg MQTT.Message) {
    msg_payload = string(msg.Payload())
    msg_topic = string(msg.Topic())
}); token.Wait() && token.Error() != nil {
    log.Fatal(token.Error())
}
var pre_msg string
for {
    time.Sleep(300 * time.Millisecond)
    if msg_payload != pre_msg {
        fmt.Printf("Successfully received `%s` from `%s`\n", msg_payload, msg_topic)
    }
    pre_msg = msg_payload
}

Unsubscribe

if token := mqtt_client.Unsubscribe(sub_topic); token.Wait() && token.Error() != nil {
    return token.Error()
}

Golang - AWS

How to use the AWS API

Before using any AWS service, prepare the credentials, then create a session, and only then interact with the AWS services.

The session can be shared by all AWS services (each service client is created from a session), so it is best to cache it and fetch it from the cache whenever it is needed, to avoid the cost of re-establishing it every time.

[1] Initialise credentials

Use the aws-cli command aws configure to generate these files, or create them manually.

~/.aws/config

[profile name goes here]
region = us-west-2
output = json

~/.aws/credentials

[profile name goes here]
aws_access_key_id = A******************A
aws_secret_access_key = 9**************************************V

Several credential sources are common; the chain below tries them in order and uses the first one that can be resolved.

func GetAWSCredentialChain() (*credentials.Credentials, *aws.Config) {
    config := aws.NewConfig()
    var ProviderList []credentials.Provider = []credentials.Provider{
        &credentials.EnvProvider{},                                         // environment variables
        &credentials.SharedCredentialsProvider{                             // the local credentials file
            Filename: "/Users/me/.aws/credentials",
            Profile:  "myProject",
        },
        &ec2rolecreds.EC2RoleProvider{                                      // the IAM role granted to the EC2 instance
            Client: ec2metadata.New(session.New(), config),
        },
    }
    cred := credentials.NewChainCredentials(ProviderList)

    return cred, config
}

(Alternatively) credentials can also be created directly from an access key and secret key:

cred := credentials.NewStaticCredentials(accessKey, secretKey, ``)
svc := s3.New(session.New(),
    &aws.Config{
        Region:      aws.String(S3Region),
        Credentials: cred,
    },
)

[2] Initialise the config

func InitAWSConfig(region string) (*aws.Config, error) {
    cred, conf := GetAWSCredentialChain()
    val, err := cred.Get()
    if err != nil {
        logs.Error("InitAWSConfig error:", err)
        return nil, err
    }
    logs.Debug("Cred ProviderName:", val.ProviderName)
    conf.WithRegion(region).WithCredentials(cred)
    return conf, nil
}

Or return a session directly:

func NewSession(region string) *session.Session {
    cred, conf := GetAWSCredentialChain()
    conf.WithRegion(region).WithCredentials(cred)
    return session.New(conf)
}

[3] Create a session (e.g. DynamoDB)

func GetDynamodbInstance() (*dynamodb.DynamoDB, error) {
    conf, err := InitAWSConfig("us-west-2") // the region where your DynamoDB lives
    if err != nil {
        logs.Error("GetDynamodbInstance error:", err)
        return nil, err
    }
    svc := dynamodb.New(session.New(), conf)

    return svc, nil
}

[4] Test (list the DynamoDB tables)

svc, _ := services.GetDynamodbInstance()
result, err := svc.ListTables(&dynamodb.ListTablesInput{})
if err != nil {
    log.Println(err)
    return
}

log.Println("Tables:")
for _, table := range result.TableNames {
    log.Println(*table)
}

Note:

sess := session.New(&aws.Config{
    Region:      aws.String("ap-northeast-1"),
    Credentials: credentials.NewSharedCredentials("/Users/home/.aws/credentials", "aws-cred-profile"),
    MaxRetries:  aws.Int(5),
})

svc := sqs.New(sess)

DynamoDB

Types

DynamoDB has its own attribute-value types; when passing parameters or reading results you have to convert between them and the types we are familiar with.

// A Binary data type.
B []byte `type:"blob"`

// A Boolean data type.
BOOL *bool `type:"boolean"`

// A Binary Set data type.
BS [][]byte `type:"list"`

// A List of attribute values.
L []*AttributeValue `type:"list"`

// A Map of attribute values.
M map[string]*AttributeValue `type:"map"`

// A Number data type.
N *string `type:"string"`

// A Number Set data type.
NS []*string `type:"list"`

// A Null data type.
NULL *bool `type:"boolean"`

// A String data type.
S *string `type:"string"`

// A String Set data type.
SS []*string `type:"list"`

GetItemInput

params = &dynamodb.GetItemInput{
    TableName: aws.String("user_contact"),
    Key: map[string]*dynamodb.AttributeValue{ // Required
        "user_id": { // Required
            S: aws.String(uid),
        },
    },
    AttributesToGet: []*string{             // optional; omit to fetch all attributes
        aws.String("contact_list"),
    },
}

svc, _ := services.GetDynamodbInstance()
resp, err := svc.GetItem(params)

contact_list := resp.Item["contact_list"].M
for key, val := range contact_list {
    logs.Info(key)
    logs.Info(val.S)
}

BatchGetItemInput

At most 100 items can be fetched in one call.

params = &dynamodb.BatchGetItemInput{
    RequestItems: map[string]*dynamodb.KeysAndAttributes{
        "user_contacts": {
            Keys: []map[string]*dynamodb.AttributeValue{
                {
                    "user_id": {S: aws.String("要查詢的 user_id")},
                },
                {
                    "user_id": {S: aws.String("要查詢的 user_id")},
                },
            },
            AttributesToGet: []*string{
                aws.String("contact_list"),
            },
        },
    },
}

svc, _ := services.GetDynamodbInstance()
resp, err := svc.BatchGetItem(params)

for _, val := range resp.Responses["user_contacts"] {
    contact_list := val["contact_list"].M           // convert the value back to a Map
    for key, val := range contact_list {
        logs.Info(key)
        logs.Info(val.S)                            // back to a String
    }
}

Note: build the parameters BatchGetItem needs with a loop

var id_keys []map[string]*dynamodb.AttributeValue
for _, id := range ids {
    id_key := map[string]*dynamodb.AttributeValue{
        "id": {S: aws.String(id)},
    }
    id_keys = append(id_keys, id_key)
}

params := &dynamodb.BatchGetItemInput{
    RequestItems: map[string]*dynamodb.KeysAndAttributes{ // Required
        "users": { // Required
            Keys: id_keys,
            AttributesToGet: ...omitted...
        },
    },
}

BatchGetItem & GetItem key 不支援用 index, 只能用 primary key

PutItem

Create a list containing two maps

var list []*dynamodb.AttributeValue
var item dynamodb.AttributeValue
item.M = map[string]*dynamodb.AttributeValue{
    "name":        {S: aws.String(name)},
}
list = append(list, &item)
list = append(list, &item)

params := &dynamodb.PutItemInput{
    TableName: aws.String("user_contacts"),
    Item: map[string]*dynamodb.AttributeValue{
        "name":         {S: aws.String("Tom")},
        "is_friend":    {BOOL: aws.Bool(true)},
        "contacts":     {L: list},
        "age":          {N: aws.String("15")},  // int 用 N (number), 但後面還是要轉成字串
    },
}

resp, err := svc.PutItem(params)    // resp carries no values even on success

Create a map: {“time”: map{…}, “friends”: [map{…}]}

Structure:

    contacts : {
        "time": {
            "start": "",
            "end": "",
        },
        "friends": [
            {"name":"", "age":""},
            {"name":"", "age":""},
        ]
    }

var contacts, time map[string]*dynamodb.AttributeValue
var friends []*dynamodb.AttributeValue

// contacts - time
time = map[string]*dynamodb.AttributeValue{
    "start": {S: aws.String(start)},
    "end":   {S: aws.String(end)},
}

// contacts - friends
for _, d := range Friends {
    var item dynamodb.AttributeValue
    item.M = map[string]*dynamodb.AttributeValue{
        "name":     {S: aws.String(d.Name)},
        "age":      {S: aws.String(d.Age)},
    }
    friends = append(friends, &item)
}

// contacts (outermost layer)
contacts = map[string]*dynamodb.AttributeValue{
    "time":    {M: time},
    "friends": {L: friends},
}

For optional values, always initialise an empty object; do not use var:

condition := map[string]*dynamodb.AttributeValue{}
if hasValue { // hasValue: whether the optional value is present
    time := &dynamodb.AttributeValue{
        M: map[string]*dynamodb.AttributeValue{
            "start": {S: aws.String(start)},
            "end":   {S: aws.String(end)},
        },
    }
    condition["time"] = time
}

params := &dynamodb.PutItemInput{
    TableName: aws.String("user_policy"),
    Item: map[string]*dynamodb.AttributeValue{
        "condition":   {M: condition},
    },
}

DeleteItem

params := &dynamodb.DeleteItemInput{
    TableName: aws.String("user_name"),
    Key: map[string]*dynamodb.AttributeValue{
        "user_id": {
            S: aws.String(t.UserID),
        },
    },
}

resp, err := svc.DeleteItem(params)     // resp carries no values even on success

UpdateItem

For updates, it is simpler to write the whole item each time. That means when a field has no data you still give it an empty object, so the update can clear the field; without the empty object an error is raised.

// info is an optional value
info := map[string]*dynamodb.AttributeValue{}
friend_list := []*string{aws.String(user_id)}

params := &dynamodb.UpdateItemInput{
    Key: map[string]*dynamodb.AttributeValue{ // Required
        "uid": { // Required
            S: aws.String(uid),
        },
    },
    TableName:        aws.String("user"), // Required
    UpdateExpression: aws.String(`
        SET map.#key = :key,
            #updated_at = :updated_at,
            #a_map = :a_map
        ADD #friend_list :friend_list
    `),
    ExpressionAttributeNames: map[string]*string{ // Required
        "#key":           aws.String("xxxkeyxxx"),      // map 如果沒有存在的 key 會新增, 但是 map 的欄位一定要先存在, 否則會有錯誤
        "#updated_at":    aws.String("updated_at"),
        "#a_map":         aws.String("a_map"),
        "#friend_list":   aws.String("friend_list"),
    },
    ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
        ":key":           {S: aws.String("xxxvalue")},
        ":updated_at":    {S: aws.String(update_at)},
        ":a_map":         {M: info},
        ":friend_list":   {SS: friend_list},
    },
    ReturnValues: aws.String("UPDATED_NEW"),            // (optional) return the updated values
}

resp, err = svc.UpdateItem(params)      // resp carries no values even on success

To update a value inside nested data: suppose friends holds many maps keyed by friend_id; write the UpdateExpression like this:

SET friends.#friend_id.name = :val

(optional) info map

if hasData { // update only when the data is present
    var time map[string]*dynamodb.AttributeValue
    time = map[string]*dynamodb.AttributeValue{
        "start": {S: aws.String(p.Condition.Time.Start)},
        "end":   {S: aws.String(p.Condition.Time.End)},
    }
    info["time"] = &dynamodb.AttributeValue{M: time}
}
  • Data that does not exist yet is added.
  • UpdateExpression cannot ADD an item to a Map; you must use SET, and the map attribute must already exist.

Other update-expression notes

SET list[0] = :val1
REMOVE #m.nestedField1, #m.nestedField2
ADD aNumber :val2, anotherNumber :val3
DELETE aSet :val4

SET list[0] = :val1 REMOVE #m.nestedField1, #m.nestedField2 ADD aNumber :val2, anotherNumber :val3 DELETE aSet :val4

Query

Method 1: besides GetItem (fetching by the partition key), DynamoDB can also fetch by some other attribute, but you must first create an index on that attribute in the DynamoDB AWS Console.

params := &dynamodb.QueryInput{
    TableName:              aws.String("users"), // Required
    IndexName:              aws.String("user_id-index"),
    ConsistentRead:         aws.Bool(false),
    KeyConditionExpression: aws.String("#user_id = :user_id"),
    ExpressionAttributeNames: map[string]*string{
        "#user_id": aws.String("user_id"), // Required
    },
    ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
        ":user_id": { // Required
            S: aws.String(user_id),
        },
    },
}

Query supports indexes; GetItem does not.

ExpressionAttributeNames can also be dropped, writing it like this:

KeyConditionExpression: aws.String("user_id = :user_id"),
ExpressionAttributeValues:  map[string]*dynamodb.AttributeValue{
    ":user_id": {
        S: aws.String(user_id),
    },
},

Passing an int in ExpressionAttributeValues: although it is an int to your program, it must still be passed as a string, only with the N (Number) type specified:

ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
    ":num": {N: aws.String("1")}
}

resp, err = svc.Query(params)

If there is no data, err is not raised! Remember to check whether resp has any items.
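
A sketch of that check:

resp, err := svc.Query(params)
if err != nil {
    return err
}
if resp == nil || len(resp.Items) == 0 { // no matching rows: err is nil, so inspect the result
    // handle "not found"
}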

Method 2: suppose the table has a partition key and a sort key, but you only know the partition key. You cannot use GetItem, but you can still use Query to fetch by the partition key alone.

params := &dynamodb.QueryInput{
    TableName:      aws.String(table), // Required
    ConsistentRead: aws.Bool(false),
    KeyConditionExpression: aws.String("#user_id = :user_id"),
    ExpressionAttributeNames: map[string]*string{
        "#user_id": aws.String("user_id"), // Required
    },
    ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
        ":user_id": { // Required
            S: aws.String(user_id),
        },
    },
}

Scan

Scan fetches all of the data in a table; you cannot set conditions (WHERE). See the sketch below.
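
A minimal Scan sketch, reusing svc from GetDynamodbInstance above (the table name is hypothetical):

params := &dynamodb.ScanInput{
    TableName: aws.String("users"), // hypothetical table
    Limit:     aws.Int64(100),      // optional page size
}
resp, err := svc.Scan(params)
if err != nil {
    return err
}
for _, item := range resp.Items {
    fmt.Println(item)
}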

轉換 dynamodb 格式

Data fetched from DynamoDB is in its own format, which can be tedious to unpack; the SDK provides helpers to convert it.

If you Marshal/Unmarshal a User struct, values are mapped to struct fields by the dynamodbav tag first, and by the json tag second.

GetItem (struct)

type User struct {
    Name string `json:"name" dynamodbav:"user_name"`  // avoids a DynamoDB reserved word
}
var u User
err = dynamodbattribute.UnmarshalMap(resp.Item, &u)

var m map[string]interface{}
err = dynamodbattribute.UnmarshalMap(resp.Item, &m)

BatchGetItem

// Specific table (user_contacts)
var m []map[string]interface{}
err = dynamodbattribute.UnmarshalListOfMaps(resp.Responses["user_contacts"], &m)

// All table
var m map[string][]map[string]interface{}
m = make(map[string][]map[string]interface{})
for k, v := range resp.Responses {
    var tmp []map[string]interface{}
    err = dynamodbattribute.UnmarshalListOfMaps(v, &tmp)
    if err != nil {
        return nil, err
    }
    m[k] = tmp
}

Query

var m []map[string]interface{}
err = dynamodbattribute.UnmarshalListOfMaps(resp.Items, &m)

S3

PutObject

params := &s3.PutObjectInput{
    Bucket: aws.String("bucket_name"),
    Key:    aws.String("file_name"),
    Body:   bytes.NewReader([]byte("json_str")),
}
svc, err := GetS3Instance()
resp, err := svc.PutObject(params)

resp :
    {
      ETag: "\"b8468dbe0941b5164253860813663edf\""
    }

Note that a successfully uploaded file’s ACL defaults to private. Unless your bucket is configured to be publicly readable, you have to specify the ACL; see the official documentation.

DeleteObject

params := &s3.DeleteObjectInput{
    Bucket: aws.String("bucket_name"),
    Key:    aws.String("file_name"),
}
svc, err := GetS3Instance()
resp, err := svc.DeleteObject(params)  // resp carries no values even on success

A folder cannot be deleted directly.

DeleteObjects

params := &s3.DeleteObjectsInput{
    Bucket: aws.String(bucket),
    Delete: &s3.Delete{
    Objects: []*s3.ObjectIdentifier{
        {
            Key: aws.String("objectkey1"),
        },
        {
            Key: aws.String("objectkey2"),
        },
    },
}
svc, err := GetS3Instance()
resp, err = svc.DeleteObjects(params)

To delete a whole folder, loop with ListObjects and delete what it returns, until ListObjects finds nothing.

ListObject

params := &s3.ListObjectsInput{
    Bucket: aws.String(bucket),
    Prefix: aws.String(path),
    MaxKeys: aws.Int64(2),          // [Optional] limits how many items are returned at once; max 1,000 (also the default)
}
result, err = s.Service.ListObjects(params)

result:

{
  Contents: [
    {
      ETag: "\"e******************************8\"",
      Key: "test-dir/2.png",
      LastModified: 2017-11-20 10:20:50 +0000 UTC,
      Owner: {
        DisplayName: "testqa",
        ID: "b**************************************************************6"
      },
      Size: 14688,
      StorageClass: "STANDARD"
    },
    {
        ...
    }
  ],
  IsTruncated: false,
  Marker: "",
  MaxKeys: 1000,
  Name: "test-bucket",
  Prefix: "test-dir"
}

At most 1,000 items can be fetched per call.

GetObject

params := &s3.GetObjectInput{
    Bucket: aws.String("bucket_name"),
    Key:    aws.String("file_name"),
}
svc, err := GetS3Instance()
resp, err = svc.GetObject(params)
json_str, err := ioutil.ReadAll(resp.Body)
fmt.Println(string(json_str))

// Dump resp
(*s3.GetObjectOutput)(0xc420468000)({
  AcceptRanges: "bytes",
  Body: buffer(0xc42034e040),
  ContentLength: 633,
  ContentType: "binary/octet-stream",
  ETag: "\"34d8d42271944aa866145dbeb550dd86\"",
  LastModified: 2016-09-26 08:12:23 +0000 UTC,
  Metadata: {

  }
})

HeadObject

Can be used to check whether an object exists in S3.

params := &s3.HeadObjectInput{
    Bucket: aws.String(bucket),
    Key:    aws.String(key),
}
res, err = svc.HeadObject(params)

HeadObjectOutput

{
  AcceptRanges: "bytes",
  ContentLength: 80936,
  ContentType: "image/jpeg",
  ETag: "\"7a6e371115538ae1a8b836d1cfd8fc3b\"",
  LastModified: 2018-02-12 09:37:14 +0000 UTC,
  Metadata: {

  }
}

GetObjectRequest (pre-signed url for downloading - GET)

svc, err := GetS3Instance()
req, _ := svc.GetObjectRequest(&s3.GetObjectInput{
    Bucket: aws.String("bucket_name"),
    Key:    aws.String("file_path"),
})
pre_url, err = req.Presign(time.Duration(10) * time.Second)     // within 10 seconds for downloading file

PutObjectRequest (pre-signed url for uploading - PUT)

svc, err := GetS3Instance()
req, _ := svc.PutObjectRequest(&s3.PutObjectInput{
    Bucket: aws.String("bucket_name"),
    Key:    aws.String("file_path"),
})
pre_url, err := req.Presign(15 * time.Minute)                   // within 15 minutes for uploading file

About the ACL: for reasons unknown, when the ACL parameter was set to public-read the upload got a 403 from AWS with the message SignatureDoesNotMatch. The workaround was to call PutObjectAcl to change the ACL after the upload succeeded.

Test whether the URL accepts an upload with curl

curl -v -T /tmp/test.mp4 "https://my_bucket.s3-us-west-2.amazonaws.com/videos/test.mp4?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=ASIAJXSKW3C6...omitted..."

CopyObject

Copies a file from one bucket to another.

svc, err := GetS3Instance()
_, err = svc.CopyObject(&s3.CopyObjectInput{
    CopySource: aws.String(from_path),          // Note!! the source is {bucket}/{file_path}
    Bucket:     aws.String(bucket),
    Key:        aws.String(to_path),
    ACL:        aws.String("public-read"),      // optional
})

Note that the copied file’s ACL also defaults to private; unless your bucket is configured to be publicly readable, specify the ACL. See the official documentation.

PutObjectAcl

svc, err := GetS3Instance()
params := &s3.PutObjectAclInput{
    Bucket: aws.String(bucket),
    Key:    aws.String(file_path),
    ACL:    aws.String("public-read"),
}
_, err = svc.PutObjectAcl(params)

CloudFront

pre-signed url

The generated pre-signed URL uses the custom domain rather than the AWS S3 domain.

For the prerequisites, see the “aws cloudfront - pre-signed url + custom domain” setup notes.

Once the above is done and the private key has been uploaded to the host, you can start implementing:

file_url := "https://cdn.your-custom-domain.com/test.jpg"                                                              // custom domain + S3 file
key_id := "APK**************Y2A"                                                                        // Access Key ID
priv_key, err := sign.LoadPEMPrivKeyFile("/tmp/cloudfront-private_key-pk-APK**************Y2A.pem")     // Private key path
if err != nil {
    logs.Debug("load pem err: %s", err.Error())
} else {
    signer := sign.NewURLSigner(key_id, priv_key)
    signed_url, err := signer.Sign(file_url, time.Now().Add(15*time.Second))
    if err != nil {
        logs.Debug("Failed to sign url, err: %s\n", err.Error())
    }
    logs.Info("signed_url: %s", signed_url)
}

signed url :

https://cdn.your-custom-domain.com/test.jpg?Expires=1500302808&Signature=mhh8YmrYMs91Cc4qoTeDSUjOeQChe-U7Ksm0Ue92WJufMlKkEAOHR3GeoEaoc3nSpitA5KV-4op6EePTfYG8DMqr-J8Oh55gCNGMjicaiMdz~VOCEoSUTeYgLFnj-dQT5OGjdg~iELDX5LROZ2UL~5vJgSKrlgiH2VLp4WMO~AoDe~CiZAWtQ49Jbrx1XZtVX3i9lCDAL4881psx8xt7W4dANJ0uo1oelBo5P0BhM0v400un9UT4FG-ZYrXB1iDYszwxhLx4TWZSa2MWXWTJyXzeZwcVcbulvdP7apokPC5aMrLaPfel6v22HSFAEP62Unety01SN4HWYtLCW7v9VQ__&Key-Pair-Id=APKAIZQ4PTQ4P7ZNQY2A

SQS

Send Message / Receive Message / Delete Message

Reference: golang-aws-sqs-example
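
A minimal sketch of the three calls, assuming svc is an *sqs.SQS client and QueueUrl is already set:

// Send
_, err := svc.SendMessage(&sqs.SendMessageInput{
    QueueUrl:    aws.String(QueueUrl),
    MessageBody: aws.String("hello"),
})

// Receive (long polling for up to 10 seconds)
recv, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
    QueueUrl:            aws.String(QueueUrl),
    MaxNumberOfMessages: aws.Int64(1),
    WaitTimeSeconds:     aws.Int64(10),
})

// Delete each received message, otherwise it reappears after the visibility timeout
for _, m := range recv.Messages {
    _, err = svc.DeleteMessage(&sqs.DeleteMessageInput{
        QueueUrl:      aws.String(QueueUrl),
        ReceiptHandle: m.ReceiptHandle,
    })
}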

Send Message Batch Request

var entries []*sqs.SendMessageBatchRequestEntry
for i := 1; i <= 5; i++ {
    f := sqs.SendMessageBatchRequestEntry{ // Required
        Id:          aws.String(fmt.Sprintf("Message_%d", i)),  // Required; only digits, letters, '-' and '_', and IDs must not repeat
        MessageBody: aws.String("message body"),                 // Required
    }
    entries = append(entries, &f)
}
params := &sqs.SendMessageBatchInput{
    Entries:  entries,
    QueueUrl: aws.String(QueueUrl), // Required
}
resp, err := svc.SendMessageBatch(params)

Get queue attributes

params := &sqs.GetQueueAttributesInput{
    QueueUrl: aws.String(QueueUrl), // Required
    AttributeNames: []*string{
        aws.String("All"), // Required, 填要取得的欄位,`All` 是全取
        // More values...
    },
}
resp, err := svc.GetQueueAttributes(params)

Set queue attributes

params := &sqs.SetQueueAttributesInput{
    Attributes: map[string]*string{
        "ReceiveMessageWaitTimeSeconds": aws.String("0"),
    },
    QueueUrl: aws.String(QueueUrl), // Required
}
_, err := svc.SetQueueAttributes(params) // returns no content on success

Set a RedrivePolicy (retry mechanism)

Attributes: map[string]*string{
    // To remove the policy:
    "RedrivePolicy": aws.String(""),

    // Or (use one of the two entries, not both): a message may be received at most
    // 3 times; beyond that it is sent to the dead letter queue
    "RedrivePolicy": aws.String("{\"maxReceiveCount\":\"3\", \"deadLetterTargetArn\":\"arn:aws:sqs:ap-northeast-1:3**********2:MyDeadLetterQueue\"}"),
},

Change visibility timeout

// Change visibility timeout
change_params := &sqs.ChangeMessageVisibilityInput{
    QueueUrl:          aws.String(QueueUrl),  // Required
    ReceiptHandle:     message.ReceiptHandle, // Required
    VisibilityTimeout: aws.Int64(0),          // Required
}
_, err = w.Svc.ChangeMessageVisibility(change_params) // returns no content on success

CloudWatch

PutMetricData

Push your own data points for CloudWatch to monitor. When reporting a metric there is nothing to set up in CloudWatch first; a new metric is created automatically.

Several metric data points can be reported in one call, so the code below is designed for multiple metrics.

// Metric collection
metric_collection := map[string]float64{
    "success": 1,
}

// (optional) Dimensions
dimensions := map[string]string{
    "job_type": "curl",
}

// New metric data input
params := &cloudwatch.PutMetricDataInput{
    Namespace: aws.String(namespace), // Required
}

// Give value to every metric data.
for k, v := range metric_collection {
    metric_data := &cloudwatch.MetricDatum{
        MetricName: aws.String(k), // Required
        Timestamp:  aws.Time(time.Now()),
        Value:      aws.Float64(v),
    }

    if len(dimensions) > 0 {
        for k, v := range dimensions {
            dimension := &cloudwatch.Dimension{Name: aws.String(k), Value: aws.String(v)}
            metric_data.Dimensions = append(metric_data.Dimensions, dimension)
        }
    }
    params.MetricData = append(params.MetricData, metric_data)
}
_, err = c.Service.PutMetricData(params)

SES

To keep its mail servers from being abused for spam, AWS starts everyone in SES sandbox mode, which has some restrictions:

  • You must first add your email address under Email Addresses in the SES console and verify it before you can send mail; the same applies to receiving.
  • At most 200 emails per day, and at most one per second.

To lift these limits you have to apply to AWS separately.

SendEmail

svc := ses.New(sess)
params := &ses.SendEmailInput{
    Destination: &ses.Destination{ // Required
        // BccAddresses: []*string{
        //  aws.String("Address"), // Required
        // },
        // CcAddresses: []*string{
        //  aws.String("Address"), // Required
        // },
        ToAddresses: []*string{
            aws.String("jex+to@gmail.com"), // Required, 如果傳進 slice 改用 aws.StringSlice
        },
    },
    Message: &ses.Message{ // Required
        Body: &ses.Body{ // Required !! Html / Text 擇一使用就好
            // Html: &ses.Content{
            //     Data:    aws.String("Test html content"), // Required
            //     Charset: aws.String("utf-8"),
            // },
            Text: &ses.Content{
                Data:    aws.String("Test raw content"), // Required
                Charset: aws.String("utf-8"),
            },
        },
        Subject: &ses.Content{ // Required
            Data:    aws.String("Test subject"), // Required
            Charset: aws.String("utf-8"),
        },
    },
    Source: aws.String("Jex Lin <jex@gmail.com>"), // Required
    ReplyToAddresses: []*string{
        aws.String("jex+reply@gmail.com"), // Required
    },
}

resp, err := svc.SendEmail(params)
if err != nil {
    return errors.New("SES response error: " + err.Error())
}

resp :

(*ses.SendEmailOutput)(0xc42002c0a0)({
  MessageId: "010101581e1837c2-e0c68369-e7c4-47e4-b01e-3f7f6afca529-000000"
})

The sender (From) only supports ASCII; to use UTF-8 characters, change it to:

=?utf-8?B?V2ktRmnjgqvjg6Hjg6k=?= <noreply@example.com>

SNS

Topics - Publish to topic

First create a topic and then subscribe to it, choosing how you want to receive what is published. The simplest option is email: fill in your own email address, confirm it via the verification mail, and from then on you receive an email whenever anyone publishes to the topic.

params := &sns.PublishInput{
    Message:  aws.String("message"), // Required
    TopicArn: aws.String("arn:aws:sns:ap-northeast-1:4**********7:event_update"),
}

resp, err := svc.Publish(params)

if err != nil {
    return
}

resp :

{
  MessageId: "f56bf715-2584-5fe4-8f0a-a7b9c0c2c757"
}

Applications - Push Notification

First register a push-notification application under SNS Applications and note down its ARN. The app on the phone has a UUID (obtained when the app registers with GCM/APNS) and sends it up to the server. The server registers a token with SNS for that UUID (createPlatformEndpoint with the application ARN, the app UUID, and enabled: true; enabled defaults to false, so change it to true) and receives an EndpointArn. It is recommended to store this token for later sends. After registration, a record appears in the AWS SNS web UI, where you can also send a test notification directly. Every record has an enabled value; if it is false, the endpoint cannot be pushed to, and SNS flips it to false as soon as one push fails to deliver. To push, the backend just publishes a message to the EndpointArn; the format can be raw or JSON.

GCM can carry a title, but the APNS title defaults to the application name.

GCM

{ "GCM": "{ \"notification\": { \"body\": \"test body\",\"title\": \"test titel\",\"icon\": \"test icon\" },\"data\": { \"custom_field\": \"custom_value\" } }" }

APNS or APNS_SANDBOX (dev)

 { "APNS":"{\"aps\":{\"alert\":\"Hello World!!\"},\"custom_field\": \"custom_value\"}" }
 { "APNS_SANDBOX":"{\"aps\":{\"alert\":\"Hello World!!\"},\"custom_field\": \"custom_value\"}" }

Note that the payload above is JSON-encoded twice in total (the value of the outer GCM/APNS key has already been JSON-encoded once).

Code :

params := &sns.PublishInput{
    Message:          aws.String(message), // Required
    TargetArn:        aws.String(target_arn),
    MessageStructure: aws.String("json"),
}

_, err = s.Service.Publish(params)
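
A sketch of building the doubly-encoded payload with encoding/json (the field values are the illustrative ones above):

inner := map[string]interface{}{
    "notification": map[string]string{"body": "test body", "title": "test title"},
}
innerJSON, _ := json.Marshal(inner) // first encode: the value of the "GCM" key
payload, _ := json.Marshal(map[string]string{"GCM": string(innerJSON)}) // second encode: the whole message
// string(payload) goes into Message, with MessageStructure set to "json"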

Rekognition

DetectLabels (image file)

ff, _ := os.Open("test.jpg")
defer ff.Close()
bin := make([]byte, 500000)
n, _ := ff.Read(bin)

params := &rekognition.DetectLabelsInput{
    Image: &rekognition.Image{
        Bytes: bin[:n], // only the bytes actually read
    },
    MaxLabels:     aws.Int64(5),
    MinConfidence: aws.Float64(1.0),
}
resp, err = svc.DetectLabels(params)

DetectLabels (s3)

params := &rekognition.DetectLabelsInput{
    Image: &rekognition.Image{
        S3Object: &rekognition.S3Object{
            Bucket: aws.String("bucket"),
            Name:   aws.String("file_path"),
        },
    },
    MaxLabels:     aws.Int64(5),
    MinConfidence: aws.Float64(1.0),
}
resp, err = svc.DetectLabels(params)

Golang Beego

Getting started resources

Installation

Just follow the commands on the official site.

Routers

Basic syntax

beego.Router("/api/:id", &controllers.RController{})
beego.Router("/api/list", &RestController{}, "*:ListFood")  # * = ANY, 可替換為 post / put
beego.Router("/api/list", &RestController{}, "get,post:ApiFunc")  # 指定兩個 Http method 到同一個 func
beego.Router("/api/list", &RestController{}, "get:GetFunc;post:PostFunc")  # 指定到不同 Http method 即不同的 Func
beego.AutoRouter("&controllers.ObjectController{}) # 自動 Match /object/blog/2013/09/12 -> ObjectController 的 blog 方法,參數: map[0: 2013 1:09 2:12]

Match

  • /api/:id : matches /api/123 (matches even without a value; :id behaves like .*, any characters)
  • /api/:id! : matches /api/123 (:id must have a value)
  • /api/:id([0-9]+) = /api/:id:int : restricts the characters to match
  • /news/:all : matches /news/path/to/123.html (:all is defined by the framework)
  • /news/* : same as :all, except the variable is stored in splat instead of all
  • /user/:username([\w]+) = /user/:username:string : matches username = a string
  • /download/*.* : matches /download/file/api.xml

404

beego.ErrorHandler("404", page_not_found)

func page_not_found(rw http.ResponseWriter, r *http.Request) {
    rw.Write([]byte("404"))
}

Match a URL path and respond directly at the router level

import "github.com/astaxie/beego/context"

beego.Get("/gl", func(ctx *context.Context) {
    ctx.Output.Body([]byte("ok"))
})

Controllers

基本語法

Output a string

this.Ctx.WriteString("hello")
this.Ctx.Output.Body([]byte("ok"))

Render a view

this.Data["show"] = "Test"
this.TplName = "user/index.tpl"

Change the response status code

this.Ctx.Output.SetStatus(400)

Getting parameters

Get a form variable

this.Ctx.Input.Param("user_id")    // 注意!有 `:`

Get a router-mapped variable (URL params mapping)

router:
    beego.Router("/users/:user_id/", &controllers.UserController{}, "post:update")
this.Ctx.Input.Param(":user_id")    // 注意!有 `:`

Make an int field in a struct optional

Since the zero value of int is 0, use a pointer here to support nil, then assign to the struct only when a value is present.

type AddReq struct {
    Age *int
}

req := AddReq{}
age, err := this.GetInt("age")
if err == nil {
    req.Age = &age
}

log.Println(*req.Age)

conf

Getting config values

A single value

httpport = 8080

beego.AppConfig.String("httpport") # dev

Section values / arrays

[demo]
peers = one;two;three

beego.AppConfig.Strings("demo::peers")  // [one two three]

Reading different settings per environment

app.conf

runmode = dev
sessionon = true            # enable sessions; defaults to the mem provider
copyrequestbody = true      # must be on, otherwise this.Ctx.Input.RequestBody gets no data

[dev]
host = 127.0.0.1

[prod]
host = 137.111.120.179

When fetching the variable:

beego.AppConfig.String("runmode")

log

Generate a log file under the beego directory; for more parameters see the official documentation.

beego.SetLogger(logs.AdapterFile, `{"filename":"project.log"}`)
beego.Run()

// logs.SetLogger(logs.AdapterMultiFile, `{"filename":"test.log","separate":["emergency", "alert", "critical", "error", "warning", "notice", "info", "debug"]}`)

ORM

Beego ORM is quite convenient. Although it ships inside beego, it is cleanly separated and can be imported into any project. Below is an example using MySQL:

import (
    "fmt"

    "github.com/astaxie/beego/orm"
    _ "github.com/go-sql-driver/mysql"                  // 一定要 import
)

// This is your model, mapped to the user table; the column naming rule is: user_id (mysql column name) => UserId (struct field)
type User struct {
    Id     int
    UserId string
    Email  string
    Name   string
}

func init() {
    orm.RegisterModel(new(User))
}

func main() {
    orm.RegisterDataBase("default", "mysql", "username:password@/your_db_name?charset=utf8")        // the first one must be named "default"
    orm.SetMaxIdleConns("default", 5)
    orm.SetMaxOpenConns("default", 30)
    o := orm.NewOrm()
    user := User{UserId: "2c024jka-cc06-4fc1-8fd3-f1c72dw22dac"}            // acts as the WHERE condition
    err := o.Read(&user, "user_id")
    if err == orm.ErrNoRows {
        fmt.Println("Not found")
    } else if err == orm.ErrMissPK {
        fmt.Println("Missed PK key")
    } else {
        fmt.Println(user.Email)
    }
}

WHERE with multiple conditions

user := User{UserId: "2c024jka-cc06-4fc1-8fd3-f1c72dw22dac", Name: "Jex"}
err := o.Read(&user, "user_id", "name")

Switch to another DB

err = orm.Using("another_db")

SELECT one record (limit: 1)

var users []orm.Params
num, err := o.QueryTable(new(model.User)).Filter("user_id", user_id).OrderBy("-created_at").Limit(1).Values(&users)

var d Device
err := o.QueryTable(new(Device)).Filter("did", "A123").One(&d)

user := User{UserId: "2c024jka-cc06-4fc1-8fd3-f1c72dw22dac"}
err := o.Read(&user, "user_id")

Read vs Values: Read returns an error when nothing is found; Values does not.

Count

num, err = o.QueryTable(new(model.User)).Filter("user_id", user_id).Count()

Update specific fields (via QueryTable)

affected_num, err := o.QueryTable(new(model.User)).Filter("user_id", user_id).Update(orm.Params{
    "name": name,
})

Update specific fields (via the model)

o := orm.NewOrm()
user.Name = "Jex"
user.Address = "Taiwan ... "
o.Update(user, "name", "address")

Raw Query

_, err = j.Service.MySQL.Raw("UPDATE user SET name = ? WHERE uid = ?", "Bob", "uid00001").Exec()

Deploy on Heroku

Introduction

Heroku is a cloud hosting provider with a free plan that includes a DB, Mailgun, etc. It is a good fit for a small, uncomplicated project.

How to deploy to Heroku

1) Install the Heroku CLI and log in

heroku login

2) Go into your repository directory

3) Push your code to Heroku

  • Production cannot use sqlite3 (detailed explanation here)
  • Production should use PostgreSQL; ideally use PostgreSQL in both dev and production
  • Production must have gem 'pg'; otherwise, even if the deploy succeeds, pages will error out
  • Gemfile.lock must exist; do not add it to .gitignore

You can make a small change here so the app deploys successfully first, and sort Postgres out separately. Gemfile:

gem 'sqlite3', group: [:development, :test]
gem 'pg', group: :production

If PostgreSQL is not installed yet, skip it during bundle install:

bundle install --without production

Deploy commands:

heroku create
git push heroku master

When the app is deployed, Heroku runs bundle install automatically.

4) View your Rails app

heroku open

This opens the browser automatically.

5) Done

If you plan to use Postgres as the development DB too, see the “connecting to PostgreSQL” section of that article for installation and setup.

How to connect to Heroku Postgres

Modify database.yml; this simple setting is enough, Heroku handles the rest.

production:
  <<: *default
  adapter: postgresql
  encoding: unicode

migrate

heroku run rake db:migrate

After it succeeds, connect to the Heroku rails console to check whether the tables were created.

Other settings

Rails log: Heroku only captures standard output, so redirect the Rails log to stdout; otherwise debugging will be difficult.

gem 'rails_12factor', group: :production

Other commands

View logs

heroku logs --tail

Enter the console

heroku run rails console

View Postgres info

heroku pg

restart

heroku restart

Kill a Postgres PID

heroku pg:kill PROCESSID

Reset the DB; make sure you really want to do this before running it

heroku pg:reset DATABASE_URL
or
heroku pg:reset DATABASE