Question
Our infrastructure allows only one Kubernetes namespace to be used both for Ververica Platform and its Apache Flink deployments. How can I enforce fine grained access control for AWS resources (such as S3, Kinesis Data Streams, etc.) so that we make sure each Apache Flink deployment will have the least privileges that it requires.
Answer
Note: This section applies to Flink 1.15 with Ververica Platform 2.8.
Overview
Ideally, the suggested approach to structure the desired functionality is to isolate the resource management per Kubernetes Namespace, which can then reflected based on Ververica Namespace.
This means each Kubernetes Namespace will run only single Apache Flink deployment and each Kubernetes Namespace will have only single Service Account with that Apache Flink deployment's required permissions.
The overall strategy would be then:
- Create Kubernetes Namespace per Use Case
- Create Single Kubernetes Service Account per Kubernetes Namespace
- Create Ververica Platform Namespace per Use Case
- Create Single Deployment Target per Ververica Platform Namespace
- Create Single Deployment per Ververica Platform Namespace
However, it is also not a rare case that an organisation's Kubernetes infrastructure team expects the other teams to solely work on a single Kubernetes namespace. In this scenario, there is still a requirement to enforce least privileges on Apache Flink deployments that is managed by Ververica Platform. At the moment, there is no out of the box policy mechanism in Kubernetes that will restrict Service Account usage by pods.
Since Kubernetes Native Policy Management is a Kubernetes administrative matter and the Ververica Platform is not a Kubernetes management platform, rather a control plane for Apache Flink applications, we will use a policy engine named Kyverno [1] designed for Kubernetes. Kyverno lets us define policies that are not supported out of the box by Kubernetes, such as restricting Service Account usage by pods based on labels [2].
As an example, we will create an EKS cluster to deploy both Ververica Platform, with a blob storage configured as S3, as well as Kyverno. Furthermore, we will have one Kinesis Data Stream (greeting-request) where one Apache Flink deployment (greeting-request-producer) is producing to it, and the second Apache Flink deployment (greeting-request-consumer) is consuming from it. For all the operations, we will setup IAM policies and Service Accounts with the least privileges.
Final restriction we will introduce is the Kyverno policy for Apache Flink deployments so that each Apache Flink deployment's name have to be exactly same as the Service Account's name it will be using. This will enforce a one-to-one mapping in between Apache Flink deployments that are managed by Ververica Platform and the Kubernetes Service Accounts we will create with the least privileges assigned to it.
Cluster Setup
We will use the command line tool eksctl to create the EKS cluster and associate it with an oidc provider. Finally, we will create the namespaces for the Ververica Platfom (vvp) and Kyverno (kyverno). Please refer to eksctl documentation [3] for additional parameters like setting the region, zone, etc.
eksctl create cluster vvp
eksctl utils associate-iam-oidc-provider --cluster vvp --approve
kubectl create namespace vvp
kubectl create namespace kyverno
Kyverno Setup
Since Kyverno is providing the official helm charts, we will use their helm chart repository to first install by running the following commands.
helm repo add kyverno https://kyverno.github.io/kyverno
helm repo update
helm install kyverno kyverno/kyverno --namespace kyverno
Furthermore, we will create the Kyverno policy to enforce our one-to-one mapping with Apache Flink deployments and Service Accounts. Here, we will use the label that is added by Ververica Platform to all the deployments, named deploymentName. Run the following command
kubectl apply -f vvp-policy-deployments.yaml
with the contents of the vvp-policy-deployments.yaml as follows:
apiVersion: kyverno.io/v1
kind: Policy
metadata:
name: deployments
namespace: vvp
spec:
validationFailureAction: enforce
rules:
- name: validate-service-account
match:
any:
- resources:
kinds:
- Pod
preconditions:
any:
- key: "{{ request.object.metadata.labels.deploymentName || '' }}"
operator: NotEquals
value: ""
validate:
message: "Invalid service account {{ request.object.spec.serviceAccountName }} for deployment {{ request.object.metadata.labels.deploymentName }}"
deny:
conditions:
any:
- key: "{{ request.object.spec.serviceAccountName }}"
operator: NotEquals
value: "{{ request.object.metadata.labels.deploymentName }}"
Here the templating values are Kyverno specific syntax where several more possibilities are available.
Ververica Platform Setup
The next step is to set up the Ververica Platform and the first thing to do is to create the S3 bucket for the blob storage of Ververica Platform as well as the Apache Flink deployments. Also, we will create the necessary IAM Role and Role Policy followed by Kubernetes Service Account, Role and Role Binding.
aws s3 mb s3://vvp-blob-storage
aws iam create-policy --policy-name vvp-blob-storage --policy-document file://vvp-policy-blob-storage.json
aws iam create-role --role-name vvp-ververica-platform --assume-role-policy-document file://vvp-trust-relationship-ververica-platform.json
aws iam attach-role-policy --role-name vvp-ververica-platform --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/vvp-blob-storage
kubectl apply -f vvp-rbac-ververica-platform.yaml
with the manifest are as following:
#tabs
## vvp-policy-blob-storage.json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::vvp-blob-storage/*",
"arn:aws:s3:::vvp-blob-storage"
]
}
]
}
## vvp-trust-relationship-ververica-platform.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_ID}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_ID}:aud": "sts.amazonaws.com",
"${OIDC_ID}:sub": "system:serviceaccount:vvp:ververica-platform"
}
}
}
]
}
## vvp-rbac-ververica-platform.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: ververica-platform
namespace: vvp
annotations:
eks.amazonaws.com/role-arn: "arn:aws:iam::${ACCOUNT_ID}:role/vvp-ververica-platform"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: ververica-platform
namespace: vvp
rules:
- apiGroups: ["apps", "extensions"]
resources: ["deployments"]
verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: [""]
resources: ["configmaps", "pods", "services", "secrets"]
verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: ververica-platform
namespace: vvp
subjects:
- kind: ServiceAccount
name: ververica-platform
roleRef:
kind: Role
name: ververica-platform
apiGroup: rbac.authorization.k8s.io
#--
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
OIDC_ID=$(aws eks describe-cluster --name=vvp --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")
while applying.
Finally, we will install Ververica Platform using official helm charts by executing the following command:
helm repo add ververica https://charts.ververica.com
helm repo update
helm install vvp ververica/ververica-platform \
--namespace vvp \
--values vvp-configuration-persistence.yaml \
--values vvp-configuration-blob-storage.yaml \
--values vvp-configuration-rbac.yaml \
--values vvp-configuration-license.yaml
whereas the configuration files are as following:
#tabs
## vvp-configuration-persistence.yaml
vvp:
persistence:
type: local
## vvp-configuration-blob-storage.yaml
vvp:
blobStorage:
baseUri: s3://vvp-blob-storage
## vvp-configuration-rbac.yaml
rbac:
create: false
serviceAccountName: ververica-platform
#--
with the additional vvp-configuration-license.yaml file with your license key.
Important: Please check Getting Started with Ververica Platform on Amazon AWS [4] for setting up remote persistence before going into production.
Application Setup
For the demo, we will have one Kinesis Data Stream by executing the following command:
aws kinesis create-stream --stream-name greeting-request --shard-count 1
aws iam create-policy --policy-name vvp-greeting-request-consumer --policy-document file://vvp-policy-greeting-request-consumer.json
aws iam create-policy --policy-name vvp-greeting-request-producer --policy-document file://vvp-policy-greeting-request-producer.json
with the following files:
#tabs
## vvp-policy-greeting-request-consumer.json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kinesis:GetShardIterator",
"kinesis:GetRecords"
],
"Resource": "arn:aws:kinesis:${AWS_REGION}:${ACCOUNT_ID}:stream/greeting-request"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": "kinesis:ListShards",
"Resource": "*"
}
]
}
## vvp-policy-greeting-request-producer.json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "kinesis:PutRecords",
"Resource": "arn:aws:kinesis:${AWS_REGION}:${ACCOUNT_ID}:stream/greeting-request"
}
]
}
#--
following with the creation of our consumer application by executing the following commands:
aws iam create-role --role-name vvp-greeting-request-consumer --assume-role-policy-document file://vvp-trust-relationship-greeting-request-consumer.json
aws iam attach-role-policy --role-name vvp-greeting-request-consumer --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/vvp-blob-storage
aws iam attach-role-policy --role-name vvp-greeting-request-consumer --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/vvp-greeting-request-consumer
kubectl apply -f vvp-rbac-greeting-request-consumer.yaml
whereas the configuration files are as following:
#tabs
## vvp-trust-relationship-greeting-request-consumer.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_ID}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_ID}:aud": "sts.amazonaws.com",
"${OIDC_ID}:sub": "system:serviceaccount:vvp:greeting-request-consumer"
}
}
}
]
}
## vvp-rbac-greeting-request-consumer.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: greeting-request-consumer
namespace: vvp
annotations:
eks.amazonaws.com/role-arn: "arn:aws:iam::${ACCOUNT_ID}:role/vvp-greeting-request-consumer"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: greeting-request-consumer
namespace: vvp
rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: greeting-request-consumer
namespace: vvp
subjects:
- kind: ServiceAccount
name: greeting-request-consumer
roleRef:
kind: Role
name: greeting-request-consumer
apiGroup: rbac.authorization.k8s.io
#--
with the following SQL deployments for creating the example deployments:
curl -X POST "http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" -H "accept: application/json" -H "Content-Type: application/json" --data @vvp-catalog-greeting-request-consumer.json
curl -X POST "http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" -H "accept: application/json" -H "Content-Type: application/json" --data @vvp-catalog-greeting-request-blackhole.json
curl -X POST "http://localhost:8080/api/v1/namespaces/default/deployments" -H "accept: application/json" -H "Content-Type: application/json" --data @vvp-deployment-greeting-request-consumer.json
whereas the configuration files are as following:
#tabs
## vvp-catalog-greeting-request-consumer.json
{
"statement": "CREATE TABLE `vvp`.`default`.`greeting_request_consumer` (`name` STRING) WITH ('connector' = 'kinesis', 'aws.region' = '${AWS_REGION}', 'format' = 'json', 'stream' = 'greeting-request');"
}
## vvp-catalog-greeting-request-blackhole.json
{
"statement": "CREATE TABLE `vvp`.`default`.`greeting_request_blackhole` (`name` STRING) WITH ('connector' = 'blackhole');"
}
## vvp-deployment-greeting-request-consumer.json
{
"apiVersion": "v1",
"kind": "Deployment",
"metadata": {
"name": "greeting-request-consumer"
},
"spec": {
"deploymentTargetName": "vvp",
"template": {
"spec": {
"artifact": {
"kind": "SQLSCRIPT",
"sqlScript": "INSERT INTO greeting_request_blackhole SELECT * FROM greeting_request_consumer;"
},
"kubernetes": {
"jobManagerPodTemplate": {
"spec": {
"containers": [],
"serviceAccountName": "greeting-request-consumer"
}
},
"taskManagerPodTemplate": {
"spec": {
"containers": [],
"serviceAccountName": "greeting-request-consumer"
}
}
}
}
}
}
}
#--
following with the creation of our producer application by executing the following commands:
aws iam create-role --role-name vvp-greeting-request-producer --assume-role-policy-document file://vvp-trust-relationship-greeting-request-producer.json
aws iam attach-role-policy --role-name vvp-greeting-request-producer --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/vvp-blob-storage
aws iam attach-role-policy --role-name vvp-greeting-request-producer --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/vvp-greeting-request-producer
kubectl apply -f vvp-rbac-greeting-request-producer.yaml
whereas the configuration files are as following:
#tabs
## vvp-trust-relationship-greeting-request-producer.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_ID}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_ID}:aud": "sts.amazonaws.com",
"${OIDC_ID}:sub": "system:serviceaccount:vvp:greeting-request-producer"
}
}
}
]
}
## vvp-rbac-greeting-request-consumer.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: greeting-request-producer
namespace: vvp
annotations:
eks.amazonaws.com/role-arn: "arn:aws:iam::${ACCOUNT_ID}:role/vvp-greeting-request-producer"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: greeting-request-producer
namespace: vvp
rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["create", "delete", "get", "list", "patch", "update", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: greeting-request-producer
namespace: vvp
subjects:
- kind: ServiceAccount
name: greeting-request-producer
roleRef:
kind: Role
name: greeting-request-producer
apiGroup: rbac.authorization.k8s.io
#--
with the following SQL deployments for creating the example deployments:
curl -X POST "http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" -H "accept: application/json" -H "Content-Type: application/json" --data @vvp-catalog-greeting-request-datagen.json
curl -X POST "http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" -H "accept: application/json" -H "Content-Type: application/json" --data @vvp-catalog-greeting-request-producer.json
curl -X POST "http://localhost:8080/api/v1/namespaces/default/deployments" -H "accept: application/json" -H "Content-Type: application/json" --data @vvp-deployment-greeting-request-producer.json
whereas the configuration files are as following:
#tabs
## vvp-catalog-greeting-request-datagen.json
{
"statement": "CREATE TABLE `vvp`.`default`.`greeting_request_datagen` (`name` STRING) WITH ('connector' = 'datagen', 'rows-per-second' = '10');"
}
## vvp-catalog-greeting-request-producer.json
{
"statement": "CREATE TABLE `vvp`.`default`.`greeting_request_producer` (`name` STRING) WITH ('connector' = 'kinesis', 'aws.region' = '${AWS_REGION}', 'format' = 'json', 'stream' = 'greeting-request', 'sink.fail-on-error' = 'true');"
}
## vvp-deployment-greeting-request-producer.json
{
"apiVersion": "v1",
"kind": "Deployment",
"metadata": {
"name": "greeting-request-producer"
},
"spec": {
"deploymentTargetName": "vvp",
"template": {
"spec": {
"artifact": {
"kind": "SQLSCRIPT",
"sqlScript": "INSERT INTO greeting_request_producer SELECT * FROM greeting_request_datagen;"
},
"kubernetes": {
"jobManagerPodTemplate": {
"spec": {
"containers": [],
"serviceAccountName": "greeting-request-producer"
}
},
"taskManagerPodTemplate": {
"spec": {
"containers": [],
"serviceAccountName": "greeting-request-producer"
}
}
}
}
}
}
}
#--