Project Summary
For ANU COMP6442 Project Demo
UI Design and Testing
- UI tests using Espresso or similar. Tests must be of reasonable quality. Espresso is not covered in lectures/labs, but it is a simple framework for writing Android UI tests. (hard)
Greater Data Usage, Handling and Sophistication
- User profile activity containing a media file (image, animation (e.g. gif), video). (easy)
User Interactivity
- The ability to micro-interact with 'posts' (e.g. like, report, etc.) [stored in-memory]. (easy)
Peer to Peer Messaging
- Provide users with the ability to message each other directly. (hard)
Firebase Integration
- Use Firebase to implement user Authentication/Authorisation. (easy)
- Use Firebase to persist all data used in your app (this item replaces the requirement to retrieve data from a local file). (medium)
New feature
- Use ML models for efficient user matching. (hard)
Installation
Download Project
GitLab
git clone https://gitlab.cecs.anu.edu.au/u7201825/comp6422gp.git
Design
Frontend
Our app supports four primary functions: login, post, chat, and a surprise paint feature. Each function has its own dedicated user interface and interactions. Below is a screenshot for each screen.
Login
Like most apps, ours starts with a login activity. It uses Firebase Authentication as the backend and accepts an email address as the username together with any password. The user should first click the "Register" button to register an account. After the toast "Register Succeed" appears, they can click the "Login" button to log in.
Post
After clicking "Post" on the main page, the user is taken to the post screen. The post screen reads post data stored in the Firebase database and adds two posts to the screen every few seconds. Each post is clickable and opens a detail screen with more information, such as the text. There, users can like the post by clicking the "LIKE" button and save it by clicking the "Star" button. All saved posts can be retrieved with the "FAVORITEPOSTS" button on the post screen, which shows a list of favorite posts. Lastly, the user can search posts by tag and/or user by clicking the "Search" icon. The search grammar is described in the Grammar section under Query Engine.
Chat
Similarly, the user can click "Message" on the main screen to open the chat function. The chat screen reads all users registered in the Firebase datastore and displays them in a list. The user can click any of them to start a chat instantly. Chat messages are read in real time.
Paint
Our project includes a surprise feature: painting to find friends. After clicking the "Paint" button on the main screen, the user is taken to a paint palette. Here, the user can paint with different colors and stroke widths, and undo or erase lines they do not want. After the user clicks the circle, the painting is uploaded to our secret backend server at ccyy.xyz for doodle recognition using a deep learning model. Based on the prediction, the server pairs the user with another user who made a similar drawing and redirects the user to a chat with that person. It is an opportunity to find new friends with the same interests.
Backend
Our secret backend is implemented in Go. It has three major functions.
- Receiving and sending users' drawings.
- Maintaining a connection through which users ask who has a similar painting, and returning the matched users.
- Recognizing users' drawings for matching.
The first function can be implemented with a simple HTTP multipart upload for sending and receiving images. The second is trickier: when a user sends an image for recognition, the server may not yet have a matching image from another user to pair with, so it has to keep a long-lived connection open until it can return a paired user. Hence, we chose WebSocket for this long-lived connection.
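The pairing step can be pictured as a waiting room keyed by the predicted doodle category. The real backend is written in Go and is not shown in this document, so the following Java sketch only illustrates the idea; the class `PairingRoom` and the method `join` are hypothetical names, and matching on an identical label is an assumption about what "similar drawing" means.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical waiting room: pairs users whose drawings received the same predicted label. */
class PairingRoom {
    // predicted label -> id of the user currently waiting with that label
    private final Map<String, String> waiting = new HashMap<>();

    /**
     * Registers a user together with the predicted label of their drawing.
     *
     * @return the id of the matched user, or empty if the caller has to keep waiting
     */
    public synchronized Optional<String> join(String userId, String label) {
        String other = waiting.get(label);
        if (other != null && !other.equals(userId)) {
            waiting.remove(label);      // the waiting user leaves the room
            return Optional.of(other);
        }
        waiting.put(label, userId);     // nobody to pair with yet: wait
        return Optional.empty();
    }
}
```

An empty result corresponds to the case where the server has to keep the connection open until a second user with the same label arrives.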
To fulfill the third function, we trained a MobileNet doodle recognition network on the Google Quick, Draw! dataset. It classifies doodles into 345 categories. The current accuracy of our model is not high, reaching only 61%. This is due to the relatively low computing capability of our private server (ccyy.xyz), which has only 1 CPU core and 2 GB of RAM, so running models larger than MobileNet is infeasible. We plan to deploy the backend to a better server later, at which point the model can be improved.
Firebase
All other data and connections are stored in Firebase for convenience, including user information, chat information, and post information. Our app does not store any user data locally.
Design Pattern
Singleton
Just as Firebase does, we built a custom WebSocket singleton to control the creation of our WebSocket client. This way, only one client is created across the app, reducing the potential workload and the number of network connections to the server.
public class CustomWebSocket {
// websocket to ccyy.xyz for predicting and pairing functionality
private static OkHttpClient mClient;
private static CustomWebSocket myInstance;
private CustomWebSocket() {
// websocket to ccyy.xyz for pairing functionality
mClient = new OkHttpClient.Builder()
.readTimeout(3600, TimeUnit.SECONDS)
.writeTimeout(3600, TimeUnit.SECONDS)
.connectTimeout(3600, TimeUnit.SECONDS)
.build();
}
public static CustomWebSocket getInstance() {
if (myInstance == null) {
myInstance = new CustomWebSocket();
}
return myInstance;
}
public OkHttpClient getClient() {
return mClient;
}
}
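The null check in `getInstance()` above is not synchronized, so two threads calling it at the same moment could each construct an instance. One common alternative is the initialization-on-demand holder idiom, sketched below. `SharedClient` is a hypothetical name, and a plain `Object` stands in for the `OkHttpClient` so that the sketch runs without extra dependencies.

```java
/** Sketch of a thread-safe lazy singleton (initialization-on-demand holder idiom). */
class SharedClient {
    // Stand-in for the expensive shared resource (hypothetical; OkHttpClient in the app).
    private final Object client = new Object();

    private SharedClient() { }

    // The JVM initializes Holder, and therefore INSTANCE, once, on first access.
    private static final class Holder {
        static final SharedClient INSTANCE = new SharedClient();
    }

    public static SharedClient getInstance() {
        return Holder.INSTANCE;
    }

    public Object getClient() {
        return client;
    }
}
```

Because class initialization is guarded by the JVM, no explicit locking is needed and every caller sees the same instance.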
Iterator
The Iterator design pattern is used here to create a stream of posts. The iterator hides the complexity of managing the counter and the post collection, making our code tidier and more readable.
public interface Iterator {
public boolean hasNext();
public Object next();
}
public interface IterableCollection {
public Iterator createIterator();
}
/**
* Iterator pattern, used for showing post as a stream
*/
public class PostsConcreteCollection implements IterableCollection {
private ArrayList<HashMap<String, Object>> AllPostCollection = allPostItems;
@Override
public Iterator createIterator() {
return new PostsConcreteIterator();
}
private class PostsConcreteIterator implements Iterator {
int index = 0;
/**
* determine whether has next post
*
* @return True if has next post
*/
@Override
public boolean hasNext() {
if (AllPostCollection != null && index < AllPostCollection.size()) {
return true;
}
return false;
}
/**
* if has next post,get it
*
* @return next post if has next, otherwise null
*/
@Override
public Object next() {
if (this.hasNext()) {
return AllPostCollection.get(index++);
}
return null;
}
}
}
Timer timer = new Timer();
timer.schedule(new MyTask(PostActivity.this), 0, 2000);
/**
* used to give a time gap for showing new posts and show posts as a stream
*/
private class MyTask extends TimerTask {
private Activity context;
MyTask(Activity context) {
this.context = context;
}
@Override
public void run() {
context.runOnUiThread(updateThread);
}
}
Runnable updateThread = new Runnable() {
/**
* if has next post, add it and show, otherwise not update
*/
@Override
public void run() {
if (PostIterator.hasNext()) {
HashMap<String, Object> SinglePost = (HashMap<String, Object>) PostIterator.next();
BufferPostList.add(SinglePost);
}
postAdapter = new PostAdapter(getApplicationContext(), BufferPostList);
recyclerView.setAdapter(postAdapter);
}
};
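The timer task above pulls one post per tick. If several posts should appear per tick, the same iterator idea extends to batches. The following is a minimal sketch under that assumption; `PostStream` and `nextBatch` are hypothetical names, and it uses `java.util.Iterator` rather than the custom interface shown earlier.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper: pulls up to n items per tick from any iterable source. */
class PostStream<T> {
    private final Iterator<T> source;

    PostStream(Iterable<T> posts) {
        this.source = posts.iterator();
    }

    /** Returns the next batch; shorter than n (possibly empty) once the source runs dry. */
    List<T> nextBatch(int n) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < n && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }
}
```

A timer callback could append each returned batch to the adapter's list and skip the UI update when the batch is empty.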
Data Preparation
Web Crawler
Crawled 3335 posts from Instagram.
Store Engine
B+Tree
B+Tree Node Classes
B+Tree Base Node
public BNode(int order) {
this.order = order;
this.keys = new LimitedArrayList<>(order);
}
B+Tree Leaf Node
/**
* Leaf node for B+Tree
* leaf node has prev and next connections in B+Tree
* @param order
*/
public BLeafNode(int order) {
super(order);
this.prev = null;
this.next = null;
}
B+Tree Tree Node (Non-Leaf Node)
public BTreeNode(int order) {
super(order);
this.children = new LimitedArrayList<>(order + 1);
}
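The node constructors rely on a `LimitedArrayList` class that is not shown in this document. Judging only from the calls made on it (`add`, `get(i)`, `size`, `remove`, and a two-argument `get(from, to)`), it could look roughly like the sketch below. The name `LimitedArrayListSketch`, the half-open range semantics, and the exception thrown when the list is full are all assumptions.

```java
import java.util.ArrayList;

/** Hypothetical sketch of a capacity-bounded list with a range getter. */
class LimitedArrayListSketch<T> extends ArrayList<T> {
    private final int limit;

    LimitedArrayListSketch(int limit) {
        super(limit);
        this.limit = limit;
    }

    @Override
    public boolean add(T item) {
        checkRoom();
        return super.add(item);
    }

    @Override
    public void add(int index, T item) {
        checkRoom();
        super.add(index, item);
    }

    /** Copies the elements in [from, to) into a new list with the same limit. */
    public LimitedArrayListSketch<T> get(int from, int to) {
        LimitedArrayListSketch<T> part = new LimitedArrayListSketch<>(limit);
        part.addAll(subList(from, to));
        return part;
    }

    private void checkRoom() {
        if (size() >= limit) {
            throw new IllegalStateException("List is full (limit " + limit + ")");
        }
    }
}
```

With a limit equal to the tree order, a node can temporarily hold `order` keys, which is exactly the state in which the tree splits it.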
B+Tree Insert
B+Tree Leaf Node Insert
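/**
* Adds a key to this leaf in sorted position. Splitting is handled by BTree.insert.
*
* @param key to be inserted.
* @param path nodes visited on the way down; this node appends itself.
*/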
@Override
public void insert(T key, List<BNode<T>> path) {
// Ensure input is not null.
if (key == null)
throw new IllegalArgumentException("Input cannot be null");
path.add(this);
for (int i = 0; i < this.keys.size(); i++) {
if (key.compareTo(this.keys.get(i)) <= 0) {
this.keys.add(i, key);
return;
}
}
this.keys.add(key);
}
B+Tree Tree Node Insert
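/**
* Passes the key down to the child that should hold it. Splitting is handled by BTree.insert.
*
* @param key to be inserted.
* @param path nodes visited on the way down; this node appends itself.
*/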
@Override
public void insert(T key, List<BNode<T>> path) {
// Ensure input is not null.
if (key == null)
throw new IllegalArgumentException("Input cannot be null");
path.add(this);
// non-leaf node does not insert, pass to leaf node to insert in B+Tree
for (int i = 0; i < this.keys.size(); i++) {
if (key.compareTo(this.keys.get(i)) <= 0) {
this.children.get(i).insert(key, path);
return;
}
}
this.children.get(this.children.size() - 1).insert(key, path);
}
B+Tree Get
B+Tree Leaf Node Get
/**
* get the key from the node.
*
* @param key to be searched.
*/
@Override
public List<T> get(T key) {
List<T> keys = new ArrayList<>();
BLeafNode<T> current = this;
// search current node
keys.addAll(current.simpleGet(key));
current = current.next;
// search a series of consecutive nodes to get all values in B+Tree
while (current != null) {
List<T> newKeys = current.simpleGet(key);
keys.addAll(newKeys);
if (current.keys.size() != newKeys.size()) {
break;
}
current = current.next;
}
return keys;
}
/**
* get the key from one node only.
*
* @param key to be searched.
*/
public List<T> simpleGet(T key) {
if (key == null) {
return null;
}
List<T> keys = new ArrayList<>();
for (T t : this.keys) {
if (t.compareTo(key) == 0) {
keys.add(t);
}
}
return keys;
}
B+Tree Tree Node Get
/**
* Get a key from the tree.
*
* @param key to be searched.
*/
@Override
public List<T> get(T key) {
if (key == null) {
return null;
}
// non-leaf node does not get, pass to leaf node to get in B+Tree
for (int i = 0; i < this.keys.size(); i++) {
if (this.keys.get(i).compareTo(key) >= 0) {
return this.children.get(i).get(key);
}
}
return this.children.get(this.children.size() - 1).get(key);
}
B+Tree.java
/**
* B+Tree
*
* @param <T> the generic type this B+Tree uses. It extends comparable
* which allows us to order two of the same type.
*/
public class BTree<T extends Comparable<T>> {
/**
* Fields of a B+Tree.
* <p>
* Notice that we keep track of the root of the B+Tree. This is because
* of the nature of the B+Tree. It grows upwards! Unless you can return the
* root each time (which is a possible implementation approach) you will need
* to keep track of the root.
*/
private int order; // Order of the BTree.
private BNode<T> root; // Root node of the BTree.
/**
* Constructor which sets the field 'order'
*
* @param order of the BTree.
*/
public BTree(int order) {
this.order = order;
root = new BLeafNode<T>(order);
}
/**
* Adds key to the BTree.
*
* @param key to be inserted.
*/
public void insert(T key) {
// Ensure input is not null.
if (key == null)
throw new IllegalArgumentException("Input cannot be null");
List<BNode<T>> path = new ArrayList<BNode<T>>();
root.insert(key, path);
// split if required
for (int i = path.size() - 1; i > 0; i--) {
if (path.get(i).keys.size() == order) {
split(path.get(i - 1), path.get(i));
}
}
// split root if required
if (root.keys.size() == order) {
BTreeNode<T> newRoot = new BTreeNode<>(order);
split(newRoot, root);
root = newRoot;
}
}
public List<T> get(T key) {
if (key == null) {
throw new IllegalArgumentException("Input cannot be null");
}
if (root != null) {
return root.get(key);
}
return null;
}
/**
* Will conduct a split on the provided indexed child of the provided node.
*
* @param node The current node whose provided child to split will be split.
* @param childToSplit The child to split within the provided node.
*/
private void split(BNode<T> node, BNode<T> childToSplit) {
// Ensure neither input is null.
if (node == null || childToSplit == null)
throw new IllegalArgumentException("Input cannot be null");
// Get median key
int med = childToSplit.keys.size() / 2;
T medValue = childToSplit.keys.get(med);
if (childToSplit instanceof BTreeNode) {
// not leaf
BTreeNode<T> leftNode = new BTreeNode<>(order);
BTreeNode<T> rightNode = new BTreeNode<>(order);
// here use med not med+1 to let med value goes up
leftNode.keys = childToSplit.keys.get(0, med);
rightNode.keys = childToSplit.keys.get(med + 1, order);
leftNode.children = ((BTreeNode<T>) childToSplit).children.get(0, med + 1);
rightNode.children = ((BTreeNode<T>) childToSplit).children.get(med + 1, order + 1);
boolean found = false;
int i = 0;
for (; i < node.keys.size(); i++) {
if (medValue.compareTo(node.keys.get(i)) <= 0) {
node.keys.add(i, medValue);
found = true;
break;
}
}
if (!found) {
node.keys.add(medValue);
}
((BTreeNode<T>) node).children.remove(childToSplit);
((BTreeNode<T>) node).children.add(i, leftNode);
((BTreeNode<T>) node).children.add(i + 1, rightNode);
} else {
// leaf node
BLeafNode<T> leftNode = new BLeafNode<>(order);
BLeafNode<T> rightNode = new BLeafNode<>(order);
// here use med+1 not med to let the med value also stays at left leaf
leftNode.keys = childToSplit.keys.get(0, med + 1);
rightNode.keys = childToSplit.keys.get(med + 1, order);
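// reset connections, including the neighbouring leaves, so the leaf chain stays intact
leftNode.prev = ((BLeafNode<T>) childToSplit).prev;
leftNode.next = rightNode;
rightNode.prev = leftNode;
rightNode.next = ((BLeafNode<T>) childToSplit).next;
if (leftNode.prev != null) {
leftNode.prev.next = leftNode;
}
if (rightNode.next != null) {
rightNode.next.prev = rightNode;
}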
boolean found = false;
int i = 0;
for (; i < node.keys.size(); i++) {
if (medValue.compareTo(node.keys.get(i)) <= 0) {
node.keys.add(i, medValue);
found = true;
break;
}
}
if (!found) {
node.keys.add(medValue);
}
((BTreeNode<T>) node).children.remove(childToSplit);
((BTreeNode<T>) node).children.add(i, leftNode);
((BTreeNode<T>) node).children.add(i + 1, rightNode);
}
}
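The two branches of `split` differ only in where the median key ends up. The following worked example reproduces the index arithmetic on plain lists instead of tree nodes; `SplitIndexDemo` and its methods are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Worked example of the split indices, using plain lists instead of tree nodes. */
class SplitIndexDemo {
    /** Leaf split: the median key stays in the left leaf and is also copied to the parent. */
    static List<List<Integer>> splitLeaf(List<Integer> keys) {
        int med = keys.size() / 2;
        return Arrays.asList(
                keys.subList(0, med + 1),            // left leaf, median included
                keys.subList(med + 1, keys.size()),  // right leaf
                keys.subList(med, med + 1));         // key copied up to the parent
    }

    /** Internal split: the median key moves up and stays in neither half. */
    static List<List<Integer>> splitInternal(List<Integer> keys) {
        int med = keys.size() / 2;
        return Arrays.asList(
                keys.subList(0, med),                // left node
                keys.subList(med + 1, keys.size()),  // right node
                keys.subList(med, med + 1));         // key moved up to the parent
    }
}
```

For a full node of order 4 holding `[10, 20, 30, 40]`, the median index is 2. A leaf splits into `[10, 20, 30]` and `[40]` with 30 copied up, while an internal node splits into `[10, 20]` and `[40]` with 30 moved up.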
PostObj & IndexPostObj
PostObj
public class PostObj {
String image_url;
int comment_count;
int like_count;
String text;
public PostObj(String image_url, int comment_count, int like_count, String text) {
this.image_url = image_url;
this.comment_count = comment_count;
this.like_count = like_count;
this.text = text;
}
}
IndexPostObj
public class IndexPostObj implements Comparable<IndexPostObj> {
PostObj postObj;
String index;
public IndexPostObj(PostObj postObj, String index) {
this.postObj = postObj;
this.index = index;
}
/**
* compare by index, used in b+tree
* @param indexPostObj the index - post obj
* @return greater or smaller
*/
@Override
public int compareTo(IndexPostObj indexPostObj) {
return this.index.compareTo(indexPostObj.index);
}
}
Store Engine
/**
* extract post object from json string
*
* @param jsonString the json string to extract
* @return list of post obj
*/
public List<PostObj> extractPostObj(String jsonString) {
List<PostObj> postObjs = new ArrayList<PostObj>();
try {
JSONObject json = new JSONObject(jsonString);
JSONArray posts = (JSONArray) json.get("allpost");
// extract all fields, and then create new postobj
for (int i = 0; i < posts.length(); i++) {
JSONObject post = (JSONObject) posts.get(i);
String imageUrl = post.getString("img_url");
int commentCount = post.getInt("comment_count");
int likeCount = post.getInt("like_count");
String text = post.getString("text");
PostObj postObj = new PostObj(imageUrl, commentCount, likeCount, text);
postObjs.add(postObj);
}
} catch (Exception e) {
e.printStackTrace();
}
return postObjs;
}
/**
* extract tag and add tag post obj to b+tree
*
* @param postObjs the list of post obj
* @return btree of index post obj
*/
public BTree<IndexPostObj> extractTagPostObj(List<PostObj> postObjs) {
Pattern pattern = Pattern.compile(HASHTAG_REGEX);
BTree<IndexPostObj> bTree = new BTree<>(ORDER);
for (PostObj postObj : postObjs) {
List<String> indexes = new ArrayList<String>();
boolean matchFound = true;
String textString = postObj.text;
while (matchFound && !textString.equals("")) {
Matcher matcher = pattern.matcher(textString);
matchFound = matcher.find();
if (matchFound) {
String index = matcher.group(0);
if (!indexes.contains(index))
indexes.add(index);
textString = textString.substring(matcher.end());
}
}
if (indexes.size() != 0) {
for (String index : indexes) {
bTree.insert(new IndexPostObj(postObj, index));
}
} else {
bTree.insert(new IndexPostObj(postObj, "#NoTag"));
}
}
return bTree;
}
/**
* query all post obj given tag
*
* @param tag to search
* @return list of post obj
*/
public List<PostObj> queryTag(String tag) {
List<PostObj> returnedPostObjs = new ArrayList<>();
List<IndexPostObj> indexPostObjs = this.tagTree.get(new IndexPostObj(null, tag));
for (IndexPostObj indexPostObj : indexPostObjs) {
returnedPostObjs.add(indexPostObj.postObj);
}
return returnedPostObjs;
}
/**
* query all post obj given user
*
* @param user to search
* @return list of post obj
*/
public List<PostObj> queryUser(String user) {
List<PostObj> returnedPostObjs = new ArrayList<>();
List<IndexPostObj> indexPostObjs = this.userTree.get(new IndexPostObj(null, user));
for (IndexPostObj indexPostObj : indexPostObjs) {
returnedPostObjs.add(indexPostObj.postObj);
}
return returnedPostObjs;
}
public List<PostObj> queryPost() {
return this.postObjs;
}
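The tag extraction above depends on a `HASHTAG_REGEX` constant that is not shown here. As a minimal sketch, assuming a tag is `#` followed by ASCII letters or digits, the same de-duplicated list can be collected by calling `Matcher.find()` in a loop, which avoids re-creating the matcher on a shortened string. `HashtagExtractor` and the pattern itself are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that collects distinct hashtags in order of first appearance. */
class HashtagExtractor {
    // Assumed pattern: '#' followed by ASCII letters or digits (the real HASHTAG_REGEX is not shown).
    private static final Pattern HASHTAG = Pattern.compile("#[A-Za-z0-9]+");

    static List<String> extract(String text) {
        List<String> tags = new ArrayList<>();
        Matcher matcher = HASHTAG.matcher(text);
        while (matcher.find()) {            // find() resumes after the previous match
            String tag = matcher.group();
            if (!tags.contains(tag)) {
                tags.add(tag);
            }
        }
        return tags;
    }
}
```

A post whose text yields an empty list would then be indexed under the `#NoTag` key, as in the method above.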
Query Engine
Grammar
- <exp> ::= <term> | <term> AND <exp> | <term> OR <exp>
- <term> ::= <factor> | NOT <factor>
- <factor> ::= <user> | <tag> | ( <exp> )
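One consequence of this grammar is that AND and OR share the same precedence and group to the right, so `#a & #b | @c` is read as `#a & (#b | @c)`. The sketch below makes the grouping visible by returning a fully parenthesized string. It is a simplified illustration, not the project's parser: `GroupingDemo` is a hypothetical name, and tokens must be separated by spaces.

```java
/** Minimal recursive-descent sketch that shows how the grammar groups operators. */
class GroupingDemo {
    private final String[] tokens;
    private int pos = 0;

    private GroupingDemo(String query) {
        this.tokens = query.trim().split("\\s+");   // simplification: space-separated tokens
    }

    static String group(String query) {
        return new GroupingDemo(query).parseExp();
    }

    // exp: term, optionally followed by '&' or '|' and another exp (right recursion)
    private String parseExp() {
        String term = parseTerm();
        if (pos < tokens.length && (tokens[pos].equals("&") || tokens[pos].equals("|"))) {
            String op = tokens[pos++];
            return "(" + term + " " + op + " " + parseExp() + ")";
        }
        return term;
    }

    // term: factor, optionally preceded by '!'
    private String parseTerm() {
        if (pos < tokens.length && tokens[pos].equals("!")) {
            pos++;
            return "!" + parseFactor();
        }
        return parseFactor();
    }

    // factor: user, tag, or a parenthesized exp
    private String parseFactor() {
        if (pos >= tokens.length) {
            throw new IllegalArgumentException("Unexpected end of query");
        }
        String token = tokens[pos++];
        if (token.equals("(")) {
            String inner = parseExp();
            if (pos >= tokens.length || !tokens[pos].equals(")")) {
                throw new IllegalArgumentException("Missing )");
            }
            pos++;
            return inner;
        }
        if (token.startsWith("#") || token.startsWith("@")) {
            return token;
        }
        throw new IllegalArgumentException("Unexpected token: " + token);
    }
}
```

Queries that need the other grouping have to say so with brackets, for example `( #a & #b ) | @c`.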
Exp Classes
Exp
public abstract class Exp {
public String show() {
return null;
}
public List<PostObj> evaluate() {
return null;
}
}
UserExp
public UserExp(String value, StoreEngine se) {
this.value = value;
this.list = se.queryUser(this.value);
}
@Override
public List<PostObj> evaluate() {
return this.list;
}
TagExp
public TagExp(String value, StoreEngine se) {
this.value = value;
this.list = se.queryTag(this.value);
}
@Override
public List<PostObj> evaluate() {
return this.list;
}
AndExp
public AndExp(Exp term, Exp exp) {
this.term = term;
this.exp = exp;
}
/**
* in term and exp
* set + retain for intersect
*
* @return list of post obj
*/
@Override
public List<PostObj> evaluate() {
// AND
Set<PostObj> s1 = new HashSet<>(term.evaluate());
s1.retainAll(exp.evaluate());
return new ArrayList<>(s1);
}
OrExp
public OrExp(Exp term, Exp exp) {
this.term = term;
this.exp = exp;
}
/**
* in term or exp
* set + addall for union
*
* @return list of post obj
*/
@Override
public List<PostObj> evaluate() {
// OR
Set<PostObj> s1 = new HashSet<>(term.evaluate());
s1.addAll(exp.evaluate());
return new ArrayList<>(s1);
}
NotExp
public NotExp(Exp factor, StoreEngine se) {
this.factor = factor;
this.list = se.queryPost();
}
/**
* not in exp
* world - set for not
*
* @return list of post obj
*/
@Override
public List<PostObj> evaluate() {
// NOT
Set<PostObj> s1 = new HashSet<>(this.list);
s1.removeAll(factor.evaluate());
return new ArrayList<>(s1);
}
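The three operators reduce to standard set operations: intersection for AND, union for OR, and difference from the full post list for NOT. The following self-contained sketch shows them on post ids instead of `PostObj`; `SetQueryDemo` and the sample ids are made up for illustration.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Set algebra behind AND, OR and NOT, shown on post ids instead of PostObj. */
class SetQueryDemo {
    static Set<String> and(Set<String> a, Set<String> b) {
        Set<String> result = new TreeSet<>(a);
        result.retainAll(b);    // intersection
        return result;
    }

    static Set<String> or(Set<String> a, Set<String> b) {
        Set<String> result = new TreeSet<>(a);
        result.addAll(b);       // union
        return result;
    }

    static Set<String> not(Set<String> all, Set<String> a) {
        Set<String> result = new TreeSet<>(all);
        result.removeAll(a);    // everything except a
        return result;
    }

    public static void main(String[] args) {
        Set<String> all = new TreeSet<>(Arrays.asList("p1", "p2", "p3", "p4"));
        Set<String> tagged = new TreeSet<>(Arrays.asList("p1", "p2"));
        Set<String> byUser = new TreeSet<>(Arrays.asList("p2", "p3"));
        System.out.println(and(tagged, byUser));  // [p2]
        System.out.println(or(tagged, byUser));   // [p1, p2, p3]
        System.out.println(not(all, byUser));     // [p1, p4]
    }
}
```

Note that `PostObj` as shown does not override `equals` or `hashCode`, so the `HashSet` operations in the Exp classes compare posts by identity. That works as long as every query result refers to the same `PostObj` instances.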
Tokenizer.java
public class Tokenizer {
private String buffer; // String to be transformed into tokens each time next() is called.
private Token currentToken; // The current token. The next token is extracted when next() is called.
/**
* Tokenizer class constructor.
* The constructor extracts the first token and saves it to currentToken.
*/
public Tokenizer(String text) {
buffer = text; // save input text (string)
next(); // extracts the first token.
}
/**
* Finds and extracts the next token from {@code buffer} and
* saves it to {@code currentToken}.
*/
public void next() {
buffer = buffer.trim(); // remove whitespace
if (buffer.isEmpty()) {
currentToken = null; // if there's no string left, set currentToken null and return
return;
}
// Single-character operators and brackets first, then '#' tags and '@' users.
char firstChar = buffer.charAt(0);
if (firstChar == '&')
currentToken = new Token("&", Token.Type.AND);
if (firstChar == '|')
currentToken = new Token("|", Token.Type.OR);
if (firstChar == '!')
currentToken = new Token("!", Token.Type.NOT);
if (firstChar == '(')
currentToken = new Token("(", Token.Type.LBRA);
if (firstChar == ')')
currentToken = new Token(")", Token.Type.RBRA);
if (firstChar == '#') {
int i = 1;
while (i < buffer.length()
&& (Character.isAlphabetic(buffer.charAt(i)) || Character.isDigit(buffer.charAt(i)))) {
i++;
}
currentToken = new Token(buffer.substring(0, i), Token.Type.TAG);
}
if (firstChar == '@') {
int i = 1;
while (i < buffer.length()
&& (Character.isAlphabetic(buffer.charAt(i)) || Character.isDigit(buffer.charAt(i)))) {
i++;
}
currentToken = new Token(buffer.substring(0, i), Token.Type.USER);
}
if (firstChar != '&'
&& firstChar != '|'
&& firstChar != '!'
&& firstChar != '('
&& firstChar != ')'
&& firstChar != '@'
&& firstChar != '#')
throw new Token.IllegalTokenException("IllegalToken");
// Remove the extracted token from buffer
int tokenLen = currentToken.getToken().length();
buffer = buffer.substring(tokenLen);
}
/**
* Returns the current token extracted by {@code next()}.
*
* @return type: Token
*/
public Token current() {
return currentToken;
}
/**
* Checks whether a current token is available, i.e. the buffer was not
* exhausted by the last {@code next()} call.
*
* @return type: boolean
*/
public boolean hasNext() {
return currentToken != null;
}
}
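For experimenting with queries outside the app, the tokenization rules above can be condensed into one loop that returns the token texts. This is a simplified sketch rather than the project's `Tokenizer`: `QueryScanner` is a hypothetical name, it yields plain strings instead of `Token` objects, and it uses `Character.isLetterOrDigit` as a close stand-in for the alphabetic-or-digit check.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical one-pass scanner that splits a query into token strings. */
class QueryScanner {
    static List<String> tokenize(String query) {
        List<String> tokens = new ArrayList<>();
        int i = 0;
        while (i < query.length()) {
            char c = query.charAt(i);
            if (Character.isWhitespace(c)) {
                i++;                                    // skip whitespace between tokens
            } else if ("&|!()".indexOf(c) >= 0) {
                tokens.add(String.valueOf(c));          // single-character operator or bracket
                i++;
            } else if (c == '#' || c == '@') {
                int end = i + 1;                        // tag or user: prefix plus letters/digits
                while (end < query.length() && Character.isLetterOrDigit(query.charAt(end))) {
                    end++;
                }
                tokens.add(query.substring(i, end));
                i = end;
            } else {
                throw new IllegalArgumentException("Illegal character '" + c + "' at index " + i);
            }
        }
        return tokens;
    }
}
```

Because names stop at the first character that is neither a letter nor a digit, a user name containing an underscore or a dot would be cut short and the remainder rejected as an illegal character.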
Parser.java
public class Parser {
// The tokenizer (class field) this parser will use.
Tokenizer tokenizer;
StoreEngine storeEngine;
int numLBRA = 0;
/**
* Parser class constructor
* Simply sets the tokenizer field.
*/
public Parser(Tokenizer tokenizer, StoreEngine storeEngine) {
this.tokenizer = tokenizer;
this.storeEngine = storeEngine;
}
public List<PostObj> evaluate() {
return this.parseExp().evaluate();
}
/**
* for debug use
*/
public void printTokenizer() {
while (tokenizer.hasNext()) {
System.out.println("TYPE " + tokenizer.current().getType());
System.out.println("TOKEN " + tokenizer.current().getToken());
tokenizer.next();
}
}
/**
* Adheres to the grammar rule:
* <exp> ::= <term> | <term> AND <exp> | <term> OR <exp>
*
* @return type: Exp.
*/
public Exp parseExp() {
Exp term = parseTerm();
if (tokenizer.hasNext()) {
if (tokenizer.current().getType() == Token.Type.AND) {
tokenizer.next();
return new AndExp(term, parseExp());
} else if (tokenizer.current().getType() == Token.Type.OR) {
tokenizer.next();
return new OrExp(term, parseExp());
} else if (tokenizer.current().getType() == Token.Type.RBRA && numLBRA > 0) {
return term;
} else {
throw new IllegalProductionException("Illegal Exp");
}
} else {
return term;
}
}
/**
* Adheres to the grammar rule:
* <term> ::= <factor> | NOT <factor>
*
* @return type: Exp.
*/
public Exp parseTerm() {
if (tokenizer.hasNext()) {
if (tokenizer.current().getType() == Token.Type.NOT) {
tokenizer.next();
return new NotExp(parseFactor(), this.storeEngine);
} else {
return parseFactor();
}
} else {
throw new IllegalProductionException("Illegal Term");
}
}
/**
* Adheres to the grammar rule:
* <factor> ::= <user> | <tag> | ( <exp> )
*
* @return type: Exp.
*/
public Exp parseFactor() {
if (tokenizer.hasNext() && tokenizer.current().getType() == Token.Type.USER) {
String user = tokenizer.current().getToken();
tokenizer.next();
return new UserExp(user, this.storeEngine);
} else if (tokenizer.hasNext() && tokenizer.current().getType() == Token.Type.TAG) {
String tag = tokenizer.current().getToken();
tokenizer.next();
return new TagExp(tag, this.storeEngine);
} else if (tokenizer.hasNext() && tokenizer.current().getType() == Token.Type.LBRA) {
numLBRA++;
tokenizer.next();
if (tokenizer.hasNext()) {
Exp exp = parseExp();
if (tokenizer.hasNext() && tokenizer.current().getType() == Token.Type.RBRA) {
numLBRA--;
tokenizer.next();
return exp;
} else {
throw new IllegalProductionException("Illegal Factor");
}
} else {
throw new IllegalProductionException("Illegal Factor");
}
} else {
throw new IllegalProductionException("Illegal Factor");
}
}
/**
* The following exception should be thrown if the parse is faced with series of tokens that do not
* correlate with any possible production rule.
*/
public static class IllegalProductionException extends IllegalArgumentException {
public IllegalProductionException(String errorMessage) {
super(errorMessage);
}
}
}
Query Engine
public class QueryEngine {
StoreEngine storeEngine;
Parser parser;
public QueryEngine(List<PostObj> postObjList) {
this.storeEngine = new StoreEngine(postObjList);
}
public QueryEngine(String jsonStr) {
this.storeEngine = new StoreEngine(jsonStr);
}
/**
* given query, return matched post obj
*
* @param query the query
* @return list of post obj
*/
public List<PostObj> queryObject(String query) {
this.parser = new Parser(new Tokenizer(query), storeEngine);
return this.parser.evaluate();
}
/**
* given query, return matched text
*
* @param query the query
* @return list of text
*/
public List<String> queryText(String query) {
this.parser = new Parser(new Tokenizer(query), storeEngine);
List<PostObj> list = this.parser.evaluate();
List<String> returnedList = new ArrayList<>();
for (PostObj postObj : list) {
returnedList.add(postObj.text);
}
return returnedList;
}
}
Example Usage:
private void searchExample() {
try {
InputStream is = getAssets().open("allpostdata.json");
int size = is.available();
byte[] buffer = new byte[size];
is.read(buffer);
is.close();
String json = new String(buffer, "UTF-8");
QueryEngine queryEngine = new QueryEngine(json);
System.out.println("==============================================================");
List<String> outputs = queryEngine.queryText("#APeoplesJourney");
// outputs = queryEngine.queryText("#NoTag");
// outputs = queryEngine.queryText("#NoTag & @acommonname");
// outputs = queryEngine.queryText("#NoTag | @acommonname");
// outputs = queryEngine.queryText("!#NoTag");
outputs = queryEngine.queryText("#APeoplesJourney | (#NoTag & !@acommonname)");
// outputs = queryEngine.queryText("(#NoTag & !@acommonname) | #APeoplesJourney");
System.out.println(outputs.size());
System.out.println("==============================================================");
} catch (IOException e) {
e.printStackTrace();
}
}
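The example above sizes its buffer with `available()` and calls `read` once. In general `InputStream.read` may return fewer bytes than requested, so a more defensive way to load the JSON is to read in a loop until the end of the stream. The following sketch shows this; `StreamText` and `readUtf8` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that reads a whole stream as UTF-8 text. */
class StreamText {
    static String readUtf8(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) != -1) {    // -1 signals end of stream
            out.write(chunk, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

In the example it would replace the `available`/`read` lines, with the stream still closed afterwards.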
Paint Functionality
PaintActivity.java
OnCreate Activity
public class PaintActivity extends AppCompatActivity {
// Permission for read and write
private static final int MY_PERMISSIONS_REQUEST_READ_EXTERNAL_STORAGE = 0;
private static final int MY_PERMISSIONS_REQUEST_WRITE_EXTERNAL_STORAGE = 0;
// views
private List<ImageView> colorButtons;
private ConstraintLayout sizeButton;
private boolean isSizeOpen;
private boolean isColorOpen;
// after pairing, jump to chat with firebase
private FirebaseAuth mAuth;
private FirebaseUser user;
private CustomWebSocket client;
private CustomWebSocket mClient;
// uuid for image transfer to ccyy.xyz
private String uuid;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_paint);
// request read and write permission
requestStoragePermission();
// get instance for firebase
mAuth = FirebaseAuth.getInstance();
user = mAuth.getCurrentUser();
client = CustomWebSocket.getInstance();
// view related
sizeButton = findViewById(R.id.size_buttons);
colorButtons = new ArrayList<>();
colorButtons.add(findViewById(R.id.blue_button));
colorButtons.add(findViewById(R.id.green_button));
colorButtons.add(findViewById(R.id.orange_button));
colorButtons.add(findViewById(R.id.pink_button));
isSizeOpen = true;
isColorOpen = true;
}
}
Functionality 1: Change paint color
/**
* change paint color
*
* @param view to click
*/
public void selectColors(View view) {
DrawView drawView = findViewById(R.id.draw_view);
if (view.getId() == R.id.blue_button) {
drawView.changeColor(getResources().getColor(R.color.blue));
} else if (view.getId() == R.id.green_button) {
drawView.changeColor(getResources().getColor(R.color.green));
} else if (view.getId() == R.id.orange_button) {
drawView.changeColor(getResources().getColor(R.color.orange));
} else if (view.getId() == R.id.pink_button) {
drawView.changeColor(getResources().getColor(R.color.pink));
}
showColors(view);
}
/**
* show or hide color panel
*
* @param view to show or hide
*/
public void showColors(View view) {
if (isColorOpen) {
for (int i = 0; i < colorButtons.size(); i++) {
ImageView colorButton = colorButtons.get(i);
colorButton.animate().translationYBy(-170f * (i + 1))
.setDuration(300)
.setInterpolator(new DecelerateInterpolator())
.start();
}
} else {
for (int i = 0; i < colorButtons.size(); i++) {
ImageView colorButton = colorButtons.get(i);
colorButton.animate().translationYBy(170f * (i + 1))
.setDuration(300)
.setInterpolator(new DecelerateInterpolator())
.start();
}
}
isColorOpen = !isColorOpen;
}
Functionality 2: Change paint size
/**
* change paint size
*
* @param view size view
*/
public void selectSizes(View view) {
DrawView drawView = findViewById(R.id.draw_view);
if (view.getId() == R.id.s_button) {
drawView.changeSize(10f);
} else if (view.getId() == R.id.m_button) {
drawView.changeSize(15f);
} else if (view.getId() == R.id.l_button) {
drawView.changeSize(20f);
} else if (view.getId() == R.id.x_button) {
drawView.changeSize(25f);
}
if (sizeButton.getVisibility() == View.VISIBLE) {
showSizes(view);
}
}
/**
* show or hide size panel
*
* @param view to show or hide
*/
public void showSizes(View view) {
if (isSizeOpen) {
sizeButton.setVisibility(View.VISIBLE);
TranslateAnimation anim = new TranslateAnimation(
Animation.ABSOLUTE, 0f,
Animation.ABSOLUTE, 0f,
Animation.RELATIVE_TO_SELF, 1f,
Animation.RELATIVE_TO_SELF, 0f
);
anim.setDuration(300);
anim.setFillAfter(true);
anim.setInterpolator(new DecelerateInterpolator());
sizeButton.startAnimation(anim);
} else {
sizeButton.setVisibility(View.GONE);
TranslateAnimation anim = new TranslateAnimation(
Animation.ABSOLUTE, 0f,
Animation.ABSOLUTE, 0f,
Animation.RELATIVE_TO_SELF, 0f,
Animation.RELATIVE_TO_SELF, 1f
);
anim.setDuration(300);
anim.setFillAfter(true);
anim.setInterpolator(new DecelerateInterpolator());
sizeButton.startAnimation(anim);
}
isSizeOpen = !isSizeOpen;
}
Functionality 3: Erase lines
/**
* erase lines
*
* @param view the erase view
*/
public void erase(View view) {
DrawView drawView = findViewById(R.id.draw_view);
drawView.erase();
}
Functionality 4: Undo lines
/**
* undo a line
*
* @param view the undo view
*/
public void undo(View view) {
DrawView drawView = findViewById(R.id.draw_view);
drawView.undo();
}
Functionality 5: Save Image
/**
* save image to media with new uuid
*
* @param view
*/
public void saveImage(View view) {
DrawView drawView = findViewById(R.id.draw_view);
drawView.setDrawingCacheEnabled(true);
uuid = UUID.randomUUID().toString();
String imageSaved = MediaStore.Images.Media.insertImage(
getContentResolver(),
drawView.createBitmap(),
uuid + ".png",
"myPainting");
if (imageSaved != null) {
Toast.makeText(getApplicationContext(), "ImageSaved", Toast.LENGTH_SHORT).show();
Handler handler = new Handler();
handler.postDelayed(new Runnable() {
// send image to ccyy.xyz
public void run() {
sendImage(uuid, imageSaved);
}
}, 1000);
} else {
Toast.makeText(getApplicationContext(), "NoImageSaved", Toast.LENGTH_SHORT).show();
}
}
Functionality 6: Upload image to backend HTTP image server
/**
* send image to ccyy.xyz
*
* @param uuid the uuid of the image
* @param imageSaved the real image
*/
public void sendImage(String uuid, String imageSaved) {
try {
// use https multipart to send png files
new MultipartUploadRequest(this, uuid, "https://ccyy.xyz/api/v2/post/" + uuid)
// get file from temp
.addFileToUpload(Environment.getExternalStorageDirectory() + "/temp.png", "file")
// POST to server
.setMethod("POST")
// listen on progress
.setDelegate(new UploadStatusDelegate() {
@Override
public void onProgress(Context context, UploadInfo uploadInfo) {
Toast.makeText(context, "Transmission in progress", Toast.LENGTH_SHORT).show();
}
@Override
public void onError(Context context, UploadInfo uploadInfo, Exception exception) {
Toast.makeText(context, "Transmission failed", Toast.LENGTH_SHORT).show();
}
@Override
public void onCompleted(Context context, UploadInfo uploadInfo, ServerResponse serverResponse) {
Toast.makeText(context, "Transmission success", Toast.LENGTH_SHORT).show();
Handler handler = new Handler();
handler.postDelayed(new Runnable() {
// after image successfully send to ccyy.xyz using https
// tell server to predict my paint and pair
public void run() {
tellServer();
}
}, 1000);
}
@Override
public void onCancelled(Context context, UploadInfo uploadInfo) {
Toast.makeText(context, "Transmission cancel", Toast.LENGTH_SHORT).show();
}
})
// start upload()
.startUpload();
} catch (IOException e) {
e.printStackTrace();
}
}
Functionality 7: Send new friend pairing request to WebSocket Server
/**
* Websocket listener
*/
private final class EchoWebSocketListener extends WebSocketListener {
private static final int CLOSE_STATUS = 1000;
@Override
public void onOpen(WebSocket webSocket, Response response) {
// send email and uid and image uuid in json string to ccyy.xyz, to further predict and pair
webSocket.send(String.format("{\"Proto\":1,\"Proto1\":5,\"PlayerName\":\"%s\",\"PlayerId\":\"%s\",\"Img\":\"%s\"}",
user.getEmail(),
user.getUid(),
uuid + ".png"));
}
@RequiresApi(api = Build.VERSION_CODES.O)
@Override
public void onMessage(WebSocket webSocket, String message) {
// if reply coming back from websocket, decode it and parse it
// extract user and userid for chat
Base64.Decoder decoder = Base64.getMimeDecoder();
try {
String jsonString = new String(decoder.decode(message), "UTF-8");
JSONObject json = new JSONObject(jsonString);
String playerName = json.getString("PlayerName");
String playerId = json.getString("PlayerId");
Intent intent = new Intent();
intent.setClass(getApplicationContext(), ChatBoxActivity.class);
intent.setFlags(Intent.FLAG_ACTIVITY_NEW_TASK);
intent.putExtra("user", playerName);
intent.putExtra("userId", playerId);
startActivity(intent);
} catch (UnsupportedEncodingException | JSONException e) {
e.printStackTrace();
}
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
print("Receive Bytes : " + bytes.hex());
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
webSocket.close(CLOSE_STATUS, null);
print("Closing Socket : " + code + " / " + reason);
}
@Override
public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
print("Error : " + throwable.toString());
}
}
/**
* after sending image to server, tell server to predict and pair
*/
private void tellServer() {
// build the request
// inject a fake header to pass ccyy.xyz server's nginx checking, otherwise receive 403 forbidden :(
// use wss (ws with ssl) for security!
Request request = new Request.Builder().url("wss://ccyy.xyz/api/v3/")
.removeHeader("User-Agent")
.addHeader("Cache-Control", "no-cache")
.addHeader("Connection", "Upgrade")
.addHeader("Host", "ccyy.xyz")
.addHeader("Origin", "https://ccyy.xyz")
.addHeader("Pragma", "no-cache")
.addHeader("Upgrade", "websocket")
.addHeader("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36")
.build();
// set up my websocket request listener
EchoWebSocketListener listener = new EchoWebSocketListener();
// init websocket
WebSocket webSocket = client.getClient().newWebSocket(request, listener);
// execute my request
client.getClient().dispatcher().executorService();//.shutdown();
}
}
Helper method 1: request read / write permission for android version >= M
/**
* request permission for read and store
*/
private void requestStoragePermission() {
// only require this after android M
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
if (checkSelfPermission(Manifest.permission.READ_EXTERNAL_STORAGE) != PackageManager.PERMISSION_GRANTED) {
requestPermissions(new String[]{Manifest.permission.READ_EXTERNAL_STORAGE},
MY_PERMISSIONS_REQUEST_READ_EXTERNAL_STORAGE);
}
}
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
if (checkSelfPermission(Manifest.permission.WRITE_EXTERNAL_STORAGE) != PackageManager.PERMISSION_GRANTED) {
requestPermissions(new String[]{Manifest.permission.WRITE_EXTERNAL_STORAGE},
MY_PERMISSIONS_REQUEST_WRITE_EXTERNAL_STORAGE);
}
}
}
Helper method 2: print debug info on the screen.
/**
* debug print for child threads, need to print on ui threads
*
* @param message the message to print
*/
private void print(final String message) {
runOnUiThread(new Runnable() {
@Override
public void run() {
TextView textResult = findViewById(R.id.text_result);
textResult.setText(textResult.getText().toString() + "\n" + message);
}
});
}
PathLine.java
public class PathLine {
Path path;
int color;
float size;
public PathLine(Path path, int color, float size) {
this.path = path;
this.color = color;
this.size = size;
}
}
DrawView.java
public class DrawView extends View {
// default color and size
private int currentColor = getResources().getColor(R.color.blue);
private float currentSize = 10f;
private Paint myPaint;
private Path myPath;
private final List<PathLine> lines;
public DrawView(Context context) {
super(context);
lines = new ArrayList<>();
initPaint();
}
public DrawView(Context context, @Nullable AttributeSet attrs) {
super(context, attrs);
lines = new ArrayList<>();
initPaint();
}
/**
* initialise painting config
*/
public void initPaint() {
myPaint = new Paint();
myPaint.setColor(currentColor);
myPaint.setStrokeWidth(currentSize);
myPaint.setStrokeJoin(Paint.Join.ROUND);
myPaint.setStrokeCap(Paint.Cap.ROUND);
myPaint.setStyle(Paint.Style.STROKE);
}
@Override
public void onDraw(Canvas canvas) {
super.onDraw(canvas);
// draw previous lines first
for (PathLine pathLine : lines) {
myPaint.setColor(pathLine.color);
myPaint.setStrokeWidth(pathLine.size);
canvas.drawPath(pathLine.path, myPaint);
}
// draw new line
if (myPath != null) {
myPaint.setColor(currentColor);
myPaint.setStrokeWidth(currentSize);
canvas.drawPath(myPath, myPaint);
}
}
@Override
public boolean onTouchEvent(MotionEvent event) {
switch (event.getAction()) {
case MotionEvent.ACTION_DOWN:
// init path at x, y
myPath = new Path();
myPath.moveTo(event.getX(), event.getY());
break;
case MotionEvent.ACTION_MOVE:
// move to x, y
myPath.lineTo(event.getX(), event.getY());
invalidate();
break;
case MotionEvent.ACTION_UP:
// add my line to lines
lines.add(new PathLine(myPath, currentColor, currentSize));
myPath = null;
break;
}
return true;
}
}
change size
/**
* change size to size
*
* @param size the size
*/
public void changeSize(float size) {
currentSize = size;
}
change color
/**
* change color to color
*
* @param color the color
*/
public void changeColor(int color) {
currentColor = color;
}
erase
/**
* use white line to erase
*/
public void erase() {
currentColor = Color.WHITE;
}
undo
/**
* remove the last line
*/
public void undo() {
if (lines.size() > 0) {
lines.remove(lines.size() - 1);
}
invalidate();
}
createBitmap
/**
* save my lines to a bitmap and save to disk
*
* @return the bitmap
*/
public Bitmap createBitmap() {
Bitmap bitmap = Bitmap.createBitmap(this.getWidth(), this.getHeight(), Bitmap.Config.ARGB_8888);
Canvas canvas = new Canvas(bitmap);
for (PathLine pathLine : lines) {
myPaint.setColor(pathLine.color);
myPaint.setStrokeWidth(pathLine.size);
canvas.drawPath(pathLine.path, myPaint);
}
// save to temp
File file = new File(Environment.getExternalStorageDirectory() + "/temp.png");
// try-with-resources closes the stream after writing
try (FileOutputStream out = new FileOutputStream(file)) {
bitmap.compress(Bitmap.CompressFormat.PNG, 100, out);
} catch (Exception e) {
e.printStackTrace();
}
return bitmap;
}
Nginx Proxy
Introduction
Nginx.conf
Currently, we have two servers:
- Image File server sitting at localhost:8000
- Pair User server sitting at localhost:8888
We use nginx to proxy traffic to the correct server. This also leaves room to spread the servers across multiple machines in the future instead of running everything on a single machine. Note that SSL connections such as https and wss require some extra configuration (certificate and key). In addition, unlike plain HTTP, a WebSocket location needs extra headers to upgrade the connection.
server {
listen 80;
server_name ccyy.xyz;
location / {
proxy_pass http://localhost:9000;
}
location /api/v2/ {
proxy_pass http://localhost:8000;
}
location /api/v1/ {
proxy_pass http://localhost:8000;
}
location /api/v0/ {
proxy_pass http://localhost:8000;
}
location /api/v3/ {
proxy_pass http://localhost:8888;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_connect_timeout 10s;
proxy_read_timeout 3600s;
}
}
server {
listen 443;
server_name ccyy.xyz;
ssl_certificate /root/ccyy.xyz/Nginx/1_ccyy.xyz_bundle.crt;
ssl_certificate_key /root/ccyy.xyz/Nginx/2_ccyy.xyz.key;
ssl on;
ssl_session_cache shared:SSL:50m;
ssl_session_timeout 10m;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
location / {
proxy_pass http://localhost:9000;
}
location /api/v2/ {
proxy_pass http://localhost:8000;
}
location /api/v1/ {
proxy_pass http://localhost:8000;
}
location /api/v0/ {
proxy_pass http://localhost:8000;
}
location /api/v3/ {
proxy_pass http://localhost:8888;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_connect_timeout 10s;
proxy_read_timeout 3600s;
}
}
HTTP server
Introduction
Setup Server
With Go, an HTTP server listening on port 8000 can be set up in a few lines:
func main() {
r := RegisterHandlers()
mh := NewMiddlewareHandler(r, 20)
http.ListenAndServe(":8000", mh)
}
Handlers
Each client request is forwarded, based on its URI, to one of the handlers registered in the "RegisterHandlers" function. Once all handlers are registered, we wrap the router in a "middlewareHandler". This middleware does nothing special except limit the number of concurrent connections to our HTTP file server. Such a limit is common for file servers that host images and videos: when too many connections request data simultaneously, the "ConnLimiter" protects the server's network from congestion and failure.
Currently, the connection limit is set to 20. We will increase this limit when we have a better server with larger bandwidth.
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
// upload, stream, picture
router.GET ("/api/v0/testpage", testPageHandler)
router.POST("/api/v0/upload/:vid-id", postVideoHandler)
router.GET("/api/v0/videos/:vid-id", streamHandler)
router.GET("/api/v0/images/:iid-id", pictureHandler)
// paint project
router.POST("/api/v2/post/:iid-id", postImageHandler)
router.GET("/api/v2/images/:iid-id", paintHandler)
// bird project (not used in this course)
router.GET("/api/v1/billboard", billboardHandler)
router.GET("/api/v1/dashboard", dashboardHandler)
return router
}
type middlewareHandler struct {
r *httprouter.Router
l *ConnLimiter
}
func NewMiddlewareHandler(r *httprouter.Router, cc int) http.Handler {
// return a pointer: ServeHTTP is declared on *middlewareHandler
m := &middlewareHandler{}
m.r = r
m.l = NewConnLimiter(cc)
return m
}
func (m *middlewareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// reject the request once the connection limit (20) is reached
if !m.l.GetConn() {
sendErrorResponse(w, http.StatusTooManyRequests, "Too many requests")
return
}
// register the release first so the token is returned even if the handler panics
defer m.l.ReleaseConn()
m.r.ServeHTTP(w, r)
}
Implement a ConnLimiter
Below is a simple implementation of "ConnLimiter". It uses a buffered channel as a bucket of size cc (20 in our case). If the bucket is not full, "GetConn" adds a token (the value 1) to the bucket and returns true; otherwise it returns false and the connection is rejected. "ReleaseConn", on the other hand, removes a token from the bucket, indicating that a connection has finished. Go favours the CSP model for concurrency, so we use a channel.
type ConnLimiter struct {
concurrentConn int
bucket chan int
}
func NewConnLimiter(cc int) *ConnLimiter {
return &ConnLimiter{
concurrentConn: cc,
bucket: make(chan int, cc),
}
}
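func (cl *ConnLimiter) GetConn() bool {
// non-blocking send: succeeds only while the bucket has spare capacity,
// so two concurrent callers cannot both pass a stale length check
select {
case cl.bucket <- 1:
return true
default:
glog.Infoln("Reached the rate limitation")
return false
}
}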
func (cl *ConnLimiter) ReleaseConn() {
c := <-cl.bucket
glog.Infof("A connection finished %d\n", c)
}
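The bucket idea is not specific to Go. As a minimal sketch in Python (the language of the training code later in this report), a bounded queue can play the role of the buffered channel. The class and method names below are illustrative assumptions and are not part of the server code.

```python
import queue


class ConnLimiter:
    """Token bucket that admits at most max_conn concurrent connections."""

    def __init__(self, max_conn):
        # a bounded queue plays the role of the buffered Go channel
        self.bucket = queue.Queue(maxsize=max_conn)

    def get_conn(self):
        """Try to take a slot; return False instead of blocking when full."""
        try:
            self.bucket.put_nowait(1)
            return True
        except queue.Full:
            return False

    def release_conn(self):
        """Give a slot back once a connection has finished."""
        self.bucket.get_nowait()


limiter = ConnLimiter(2)
admitted = [limiter.get_conn() for _ in range(3)]  # third request exceeds the limit
limiter.release_conn()                             # one connection finishes
admitted_after_release = limiter.get_conn()        # a slot is free again
print(admitted, admitted_after_release)
```

With a limit of 2, the third request is rejected until one of the earlier connections releases its slot.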
Handle Image Post
Images are sent via an HTTP POST multipart form under the field name "file". Unlike WebSocket, where we need to define our own protocol and message types, HTTP already defines these for us. We then store the image on our server under the name given by "Params(iid-id)" (in our case, the UUID generated by the Android app) and reply to the client with "Uploaded Successfully".
func postImageHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE)
if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil {
sendErrorResponse(w, http.StatusBadRequest, "File is too big") // 400
return
}
file, _, err := r.FormFile("file") // <form name = "file"
if err != nil {
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
// close the uploaded file when the handler returns
defer file.Close()
data, err := ioutil.ReadAll(file)
if err != nil {
glog.Errorf("Read file error: %v\n", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
fn := p.ByName("iid-id") + ".png"
err = ioutil.WriteFile(IMAGE_DIR + fn, data, 0666)
if err != nil {
glog.Errorf("Write file error: %v\n", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
w.WriteHeader(http.StatusCreated)
io.WriteString(w, "Uploaded Successfully")
}
func sendErrorResponse(w http.ResponseWriter, sc int, errMsg string) {
w.WriteHeader(sc)
io.WriteString(w, errMsg)
}
Handle Image Get
Find the file on the server by the name given in "Params(iid-id)" and send it to the client over HTTP.
func paintHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
iid := p.ByName("iid-id")
vl := IMAGE_DIR + iid
image, err := os.Open(vl)
if err != nil {
glog.Errorf("Error when open file: %v\n", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error") // 500
return
}
// close the file when the handler returns
defer image.Close()
w.Header().Set("Content-Type", "image/png")
http.ServeContent(w, r, "", time.Now(), image)
}
WebSocket server
Introduction
Setup Server
With Go, a WebSocket server listening on port 8888 can be set up in a few lines:
func main() {
http.Handle("/api/v3/", websocket.Handler(ccyyHandler))
if err := http.ListenAndServe(":8888", nil); err != nil {
glog.Errorf("%s", err.Error())
}
}
Pull from connection
Each client request is forwarded to the "ccyyHandler" function for processing. The handler builds a "PlayerConn" struct to store the connection. Recall that a WebSocket connection is long-lived and full-duplex. This means we reuse the same "ws" many times instead of creating a new connection whenever we need to send data back to the client. Therefore we need to bind the "Connection" and "PlayerName" together for future use.
"PullFromClient" keeps pulling content from the WebSocket in a loop. Each piece of content is processed in a new goroutine (a lightweight thread), which Go starts with the keyword "go".
type PlayerConn struct {
PlayerName string
Connection *websocket.Conn
}
func ccyyHandler(ws *websocket.Conn) {
NetDataConnTmp := &PlayerConn{
Connection: ws,
PlayerName: "",
}
NetDataConnTmp.PullFromClient()
}
func (c *PlayerConn) PullFromClient() {
for {
var content string
if err := websocket.Message.Receive(c.Connection, &content); err != nil {
break
}
if len(content) == 0 {
break
}
go c.ProcessContent(content)
}
}
Deserialise JSON content
Here we unmarshal the JSON bytes into a map with string keys and interface{} values. An interface{} value can hold data of any type, so the concrete type is only known at run time (JSON numbers, for example, are decoded as float64). With WebSocket, it is our responsibility as developers to define the protocol and to encode and decode the content according to it. Here, we extract "Proto" and "Proto1" to learn which protocol is in use, so that we can decode the content accordingly.
type RequestBody struct {
req string
}
func (r *RequestBody) JsonString2Map() (result map[string]interface{}, err error) {
if err = json.Unmarshal([]byte(r.req), &result); err != nil {
return nil, err
}
return result, nil
}
func (c *PlayerConn) ProcessContent(content string) {
glog.Infof("receive content: %s\n", content)
var r RequestBody
r.req = content
if data, err := r.JsonString2Map(); err != nil {
glog.Errorf("failed to process content %s\n", err.Error())
} else {
c.HandleProtocol(data["Proto"], data["Proto1"], data)
}
}
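The decoding step can be illustrated with a minimal Python sketch. The function name "process_content" and the sample values are assumptions made for illustration; only the message shape follows the JSON string sent by the Android client.

```python
import json


def process_content(content):
    """Decode one message; return (proto, proto1, data) or None if invalid."""
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    return data.get("Proto"), data.get("Proto1"), data


# message in the shape sent by the Android client (values are made up)
message = ('{"Proto":1,"Proto1":5,"PlayerName":"alice@example.com",'
           '"PlayerId":"uid-123","Img":"0000.png"}')
proto, proto1, data = process_content(message)
print(proto, proto1, data["Img"])
```

Invalid input is reported with None rather than an exception, mirroring the way the server only logs an error and carries on.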
Define protocol
Before decoding content based on the protocol, let us define the protocol first. For a large project like this, a one-layer protocol is usually not enough. Here we use a two-layer structure: "Proto" is the primary protocol, defining the main class of a message, and "Proto1" is the sub-protocol, defining the behaviour within each main class. For example, "Proto" determines whether a message is meant for the Game server or the DB server. "Proto1" under the Game server defines the client-server messages such as logging in, choosing a room, and uploading a paint. For each protocol defined, we also designate a struct (the data) to send or receive. For now, please focus on the paint protocols and structs, as the login functionality is implemented in Firebase. I will explain why it is named "GameServer" later.
const (
INIT_PROTO = iota
Game_Proto // 1, Main Protocol
Game_DB_Proto // 2, DB Protocol
)
const (
INIT_PROTO1 = iota
C2S_PlayerLogin_Proto1 // 1 C2S = Client to Server
S2C_PlayerLogin_Proto1 // 2 S2C = Server to Client
C2S_PlayerChooseRoom_Proto1 // 3
S2C_PlayerChooseRoom_Proto1 // 4
C2S_PlayerUploadPaint_Proto1 // 5
S2C_PlayerUploadPaint_Proto1 // 6
S2C_PlayerUploadPaintOther_Proto1 // 7
)
type PlayerSt struct {
UID int
PlayerName string
OpenID string
}
type C2S_PlayerLogin struct {
Proto int
Proto1 int
IType int // 1,login 2,register
Code string
}
type S2C_PlayerLogin struct {
Proto int
Proto1 int
Player PlayerSt
}
type C2S_PlayerUploadPaint struct {
Proto int
Proto1 int
PlayerName string
Img string
PlayerId string
}
type S2C_PlayerUploadPaint struct {
Proto int
Proto1 int
PlayerName string
PlayerId string
Probability float32
Prediction int
Category string
}
type S2C_PlayerUploadPaintOther struct {
Proto int
Proto1 int
PlayerName string
PlayerId string
}
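To make the numbering concrete, here is a small Python sketch that mirrors the two constant blocks with enumerations and builds an upload message. The enum member names and the helper "upload_paint_message" are hypothetical; the numeric values follow the comments in the Go constants.

```python
from enum import IntEnum


class Proto(IntEnum):
    INIT = 0
    GAME = 1      # Game_Proto
    GAME_DB = 2   # Game_DB_Proto


class Proto1(IntEnum):
    INIT = 0
    C2S_PLAYER_LOGIN = 1
    S2C_PLAYER_LOGIN = 2
    C2S_PLAYER_CHOOSE_ROOM = 3
    S2C_PLAYER_CHOOSE_ROOM = 4
    C2S_PLAYER_UPLOAD_PAINT = 5
    S2C_PLAYER_UPLOAD_PAINT = 6
    S2C_PLAYER_UPLOAD_PAINT_OTHER = 7


def upload_paint_message(player_name, player_id, img):
    """Build a C2S_PlayerUploadPaint message as a plain dict."""
    return {
        "Proto": int(Proto.GAME),
        "Proto1": int(Proto1.C2S_PLAYER_UPLOAD_PAINT),
        "PlayerName": player_name,
        "PlayerId": player_id,
        "Img": img,
    }


msg = upload_paint_message("alice@example.com", "uid-123", "0000.png")
print(msg)
```

The resulting pair "Proto": 1, "Proto1": 5 is the same pair the Android client hard-codes in its JSON string.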
Handle protocol
Now let us dispatch the data based on the protocol. We use two switch statements to pass the data to the correct function. If the data sent from the client has "Proto = Game_Proto" and "Proto1 = C2S_PlayerUploadPaint_Proto1", then the message goes to our upload function.
func (c *PlayerConn) HandleProtocol(protocol interface{}, protocol1 interface{}, data map[string]interface{}) {
switch protocol {
case float64(proto.Game_Proto):
c.HandleProtocol1(protocol1, data)
case float64(proto.Game_DB_Proto):
default:
glog.Errorln("failed to handle main protocol")
}
}
func (c *PlayerConn) HandleProtocol1(protocol1 interface{}, data map[string]interface{}) {
switch protocol1 {
case float64(proto1.C2S_PlayerLogin_Proto1):
c.Login(data)
case float64(proto1.C2S_PlayerChooseRoom_Proto1):
case float64(proto1.C2S_PlayerUploadPaint_Proto1):
c.UploadPaintAndPredict(data)
case float64(proto1.S2C_PlayerUploadPaint_Proto1):
default:
glog.Errorln("failed to handle sub protocol 1")
}
}
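The same routing can be sketched in Python with a lookup table instead of nested switch statements (a deliberately different technique, chosen for brevity). The handler bodies are placeholders and all names are assumptions; only the numbers 1 and 5 come from the protocol constants.

```python
def login(data):
    """Placeholder for the login handler."""
    return "login"


def upload_paint_and_predict(data):
    """Placeholder for the upload handler; echoes the image name."""
    return "upload:" + data["Img"]


# (Proto, Proto1) -> handler
HANDLERS = {
    (1, 1): login,                     # Game_Proto, C2S_PlayerLogin_Proto1
    (1, 5): upload_paint_and_predict,  # Game_Proto, C2S_PlayerUploadPaint_Proto1
}


def handle_protocol(data):
    """Route a decoded message; unknown combinations yield None."""
    handler = HANDLERS.get((data.get("Proto"), data.get("Proto1")))
    if handler is None:
        return None
    return handler(data)


routed = handle_protocol({"Proto": 1, "Proto1": 5, "Img": "0000.png"})
unknown = handle_protocol({"Proto": 2, "Proto1": 5})
print(routed, unknown)
```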
Upload and Predict
Finally, we can implement the core logic. The first step is to extract "PlayerName", "PlayerId", and "Img" from the data using type assertions. "PlayerName" is the email registered on Firebase. "PlayerId" is the UID generated by Firebase when the user is created. These two values are needed for pairing users and starting a new chat between them. "Img" is the file name (UUID) of the user's drawing uploaded to the HTTP server [check HTTP Server section]. Then we call "paint" to get the probability, index, and category of the prediction [check AI Serving section]. Lastly, we start a new goroutine that pairs users based on the prediction.
func (c *PlayerConn) UploadPaintAndPredict(data map[string]interface{}) {
if data["PlayerName"] == nil || data["PlayerId"] == nil || data["Img"] == nil {
return
}
playerName := data["PlayerName"].(string)
playerId := data["PlayerId"].(string)
image := data["Img"].(string)
c.PlayerName = playerName
G_PlayerData[playerName] = c
glog.Infoln("user upload paint: ", playerName, playerId, image)
probability, index, category := paint(image)
glog.Infoln("predict upload paint: ", probability, index, category)
// Pair uses c.Connection itself, so the connection is not passed as an argument
go c.Pair(playerName, playerId, index, probability, category)
}
Pair and Notify
We can treat user pairing like the matchmaking found in multiplayer games (if you play them every day like me): we send people to rooms and let them start playing together. In our case, a pairing is simply a room of two people. We first create 345 rooms (channels), since there are 345 possible prediction categories. Each room (category) is uniquely labelled with a number. Then we simply add users to the rooms. There are two scenarios.
- If the room is empty, add me in.
- If the room is not empty, pick up the person waiting in the room, and let's chat (each peer's info is sent to the other through WebSocket).
Rooms are implemented using channels, since Go recommends the CSP model for concurrency, and a context is used for timeouts.
type PlayerInRoom struct {
PlayerName string
PlayerId string
}
var G_Rooms [345]chan PlayerInRoom
func InitRooms() {
for i := range G_Rooms {
G_Rooms[i] = make(chan PlayerInRoom, 2)
}
}
func (c *PlayerConn) Pair (playerName string, playerId string, roomNumber int, probability float32, category string) {
timeoutCtx, cancel := context.WithTimeout(G_BaseCtx, 2*time.Second)
defer cancel()
ticker := time.NewTicker(500 * time.Millisecond)
// stop the ticker when pairing finishes so it does not leak
defer ticker.Stop()
round := 0
for range ticker.C {
select {
//case <-time.After(5 * time.Second):
case <-timeoutCtx.Done():
glog.Infof("time up, add myself %s to the channel", playerName)
// no one in the room, add me in
G_Rooms[roomNumber] <- PlayerInRoom{
PlayerName: playerName,
PlayerId: playerId,
}
return
case playerInRoom := <-G_Rooms[roomNumber]:
glog.Infof("get other player %s at room %d, round %d", playerInRoom.PlayerName, roomNumber, round)
// send me your info
PlayerSendMessageToServer(c.Connection,
&proto1.S2C_PlayerUploadPaint{
Proto: proto.Game_Proto,
Proto1: proto1.S2C_PlayerUploadPaint_Proto1,
PlayerName : playerInRoom.PlayerName,
PlayerId: playerInRoom.PlayerId,
Probability: probability,
Prediction: roomNumber,
Category: category,
})
// send you my info
PlayerSendMessageToServer(G_PlayerData[playerInRoom.PlayerName].Connection,
&proto1.S2C_PlayerUploadPaintOther{
Proto: proto.Game_Proto,
Proto1: proto1.S2C_PlayerUploadPaintOther_Proto1,
PlayerName : playerName,
PlayerId : playerId,
})
return
default:
glog.Infof("no one responding now at round %d", round)
}
round++
}
}
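The two scenarios can be shown in a much simplified, single-threaded Python sketch. It leaves out the ticker, the timeout, and the WebSocket notifications, and uses one queue per category in place of a channel; the function name "pair" and the sample users are assumptions.

```python
from collections import deque

NUM_ROOMS = 345  # one room per prediction category
rooms = [deque() for _ in range(NUM_ROOMS)]


def pair(room_number, player_name, player_id):
    """Return the waiting peer, or None after adding ourselves to the room."""
    room = rooms[room_number]
    if room:
        # someone is already waiting: pick them up
        return room.popleft()
    # room is empty: wait here for the next player
    room.append((player_name, player_id))
    return None


first = pair(42, "alice@example.com", "uid-a")   # room empty: Alice waits
second = pair(42, "bob@example.com", "uid-b")    # Bob is paired with Alice
other = pair(7, "carol@example.com", "uid-c")    # different category: no match
print(first, second, other)
```

Two users are only matched when their drawings are predicted to be in the same category, which is what makes the room number a matching key.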
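func PlayerSendMessageToServer(conn *websocket.Conn, data interface{}) {
dataSend, err := json.Marshal(data)
if err != nil {
glog.Errorln("data marshal failed", err.Error())
return
}
glog.Infof("dataSend: %s\n", string(dataSend[:]))
// websocket.JSON.Send marshals dataSend ([]byte) once more, so the client receives
// a base64-encoded JSON string and has to Base64-decode it first (see onMessage)
if err := websocket.JSON.Send(conn, dataSend); err != nil {
glog.Errorln("send failed", err.Error())
}
}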
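The Android client Base64-decodes every reply before parsing it as JSON. That is consistent with the message being marshalled to JSON bytes and those bytes then passing through a second JSON encoding step, because Go's encoding/json represents a byte slice as a base64 string. The Python sketch below imitates that round trip; the function names and sample values are assumptions, and it illustrates the wire format rather than reproducing the server code.

```python
import base64
import json


def server_encode(message):
    """Mimic the server: JSON bytes, then JSON-encoded again as base64 text."""
    inner = json.dumps(message).encode("utf-8")
    return json.dumps(base64.b64encode(inner).decode("ascii"))


def client_decode(frame):
    """Mimic the client: strip the outer JSON string, Base64-decode, parse."""
    inner = base64.b64decode(json.loads(frame))
    return json.loads(inner.decode("utf-8"))


reply = {"Proto": 1, "Proto1": 6, "PlayerName": "bob@example.com", "PlayerId": "uid-b"}
frame = server_encode(reply)
print(frame)
print(client_decode(frame))
```

The frame on the wire is a quoted base64 string, which would explain why the Java client uses the lenient MIME decoder: it skips characters outside the base64 alphabet, such as the surrounding quotes.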
Machine Learning
Serving
func paint(path string) (probability float32, index int, category string ){
float32s := processImage(path)
model := LoadModel("/home/jiaanguo/codespace/python/paint/models/final/", []string{"serve"}, nil)
fakeInput, _ := tf.NewTensor(float32s)
if err := fakeInput.Reshape([]int64{1, 64, 64, 1}); err != nil {
return 0, 0, ""
}
results := model.Exec([]tf.Output{
model.Op("StatefulPartitionedCall", 0),
}, map[tf.Output]*tf.Tensor{
model.Op("serving_default_input_2", 0): fakeInput,
})
predictions := results[0].Value().([][]float32)[0]
indexMap := readJson("/home/jiaanguo/codespace/python/paint/bin/classes.json")
maxP, maxI, maxC := float32(0.0), 0, ""
for i, p := range predictions {
if p > maxP {
maxP = p
maxI = i
maxC = indexMap[i]
}
}
//glog.Infoln(predictions)
glog.Infof("Max P %f\n", maxP)
glog.Infof("Max I %d\n", maxI)
glog.Infof("Max C %s\n", maxC)
return maxP, maxI, maxC
}
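The selection at the end of "paint" is an argmax over the prediction vector, combined with the inverted class map built by "readJson". A minimal Python sketch, assuming a three-class excerpt of classes.json and made-up scores:

```python
def invert_classes(class_map):
    """Turn {'bottlecap': 1, ...} into {1: 'bottlecap', ...}."""
    return {index: name for name, index in class_map.items()}


def best_prediction(predictions, index_map):
    """Return (probability, index, category) of the highest score."""
    best_i = max(range(len(predictions)), key=lambda i: predictions[i])
    return predictions[best_i], best_i, index_map[best_i]


class_map = {"train": 0, "bottlecap": 1, "beard": 2}   # made-up excerpt
index_map = invert_classes(class_map)
probability, index, category = best_prediction([0.1, 0.7, 0.2], index_map)
print(probability, index, category)
```

The returned index doubles as the room number used for pairing.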
// Model represents a trained model
type Model struct {
saved *tf.SavedModel
}
func LoadModel(modelPath string, modelNames []string, options *tf.SessionOptions) (model *Model) {
var err error
model = new(Model)
model.saved, err = tf.LoadSavedModel(modelPath, modelNames, options)
if err != nil {
glog.Errorf("LoadSavedModel(): %v", err)
}
//log.Println("List possible ops in graphs")
//for _, operation := range model.saved.Graph.Operations() {
// log.Printf("Op name: %v", operation.Name())
//}
return model
}
// Exec executes the nodes/tensors that must be present in the loaded model
// feedDict values to feed to placeholders (that must have been saved in the model definition)
// panics on error
func (model *Model) Exec(tensors []tf.Output, feedDict map[tf.Output]*tf.Tensor) (results []*tf.Tensor) {
var err error
if results, err = model.saved.Session.Run(feedDict, tensors, nil); err == nil {
return results
}
panic(err)
}
// Op extracts the output in position idx of the tensor with the specified name from the model graph
func (model *Model) Op(name string, idx int) tf.Output {
op := model.saved.Graph.Operation(name)
if op == nil {
glog.Errorf("op %s not found", name)
}
nout := op.NumOutputs()
if nout <= idx {
glog.Errorf("op %s has %d outputs. Requested output number %d", name, nout, idx)
}
return op.Output(idx)
}
func readJson(path string) map[int]string {
indexMap := make(map[int]string)
classMap := make(map[string]int)
str, err := ioutil.ReadFile(path)
if err != nil {
glog.Errorf("File not found: %s", path)
}
err = json.Unmarshal(str, &classMap)
if err != nil {
glog.Errorf("Unable to unmarshal: %v", err)
}
for k, v := range classMap {
indexMap[v] = k
}
return indexMap
}
func processImage(path string) [4096]float32 {
filename := path
srcImg := gocv.IMRead(filename, gocv.IMReadColor)
if srcImg.Empty() {
glog.Errorf("Error reading image from: %v\n", filename)
return [4096]float32{}
}
defer srcImg.Close()
dstImg0 := gocv.NewMat()
dstImg1 := gocv.NewMat()
dstImg2 := gocv.NewMat()
dstImg3 := gocv.NewMat()
dstImg4 := gocv.NewMat()
dstImg5 := gocv.NewMat()
defer dstImg0.Close()
defer dstImg1.Close()
defer dstImg2.Close()
defer dstImg3.Close()
defer dstImg4.Close()
defer dstImg5.Close()
maxY := srcImg.Size()[0]
maxX := srcImg.Size()[1]
// crop a square matrix
dstImg0 = srcImg.Region(image.Rect(0, maxY/2 - maxX/2, maxX, maxY/2 + maxX/2))
// resize to dataset size
size1 := image.Point{X:256.0,Y:256.0}
gocv.Resize(dstImg0, &dstImg1, size1, 0, 0, gocv.InterpolationArea)
// resize to network size
gocv.CvtColor(dstImg1, &dstImg2, gocv.ColorRGBToGray)
gocv.BitwiseNot(dstImg2, &dstImg3)
size2 := image.Point{X:64.0,Y:64.0}
gocv.Resize(dstImg3, &dstImg4, size2,0, 0, gocv.InterpolationLanczos4)
gocv.Threshold(dstImg4, &dstImg5, 50, 255, gocv.ThresholdBinary | gocv.ThresholdOtsu)
//glog.Infoln(dstImg5.Type())
uint8s, err := dstImg5.DataPtrUint8()
if err != nil {
glog.Errorf("Unable to extract floats: %v", err)
}
float32s := [4096]float32{}
for i, v := range uint8s {
float32s[i] = float32(v) / 255.0
}
//glog.Infoln(float32s)
gocv.IMWrite("resized0.png", dstImg0)
gocv.IMWrite("resized1.png", dstImg1)
gocv.IMWrite("resized2.png", dstImg2)
gocv.IMWrite("resized3.png", dstImg3)
gocv.IMWrite("resized4.png", dstImg4)
gocv.IMWrite("resized5.png", dstImg5)
return float32s
}
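Two steps of "processImage" are plain arithmetic: the centred square crop and the scaling of 8-bit pixel values to [0, 1]. The Python sketch below reproduces just those two steps without OpenCV. The helper names and the 1080x1920 canvas size are assumptions, and the crop formula assumes a portrait image (height at least as large as width).

```python
def centre_square(width, height):
    """Crop box (left, top, right, bottom) of the centred square.

    Assumes height >= width (a portrait canvas).
    """
    half = width // 2
    mid = height // 2
    return 0, mid - half, width, mid + half


def normalise(pixels):
    """Scale 8-bit grey values to the range [0, 1]."""
    return [p / 255.0 for p in pixels]


box = centre_square(1080, 1920)      # hypothetical phone canvas size
values = normalise([0, 255, 51])
print(box, values)
```

For a 1080x1920 canvas the crop keeps rows 420 to 1500, a 1080x1080 square, before the image is resized for the network.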
Training
Dataset: Google QuickDraw
Code (accuracy currently reaches only 61% and still needs a lot of improvement)
Some preprocessing code adapted from here
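# imports inferred from the names used below (an assumption; the original listing omits them)
import glob
import json
import os
import struct
from struct import unpack
import numpy as np
from numpy.random import RandomState
from PIL import Image, ImageDraw
from tqdm import tqdm
from tensorflow.compat.v1 import ConfigProto, InteractiveSession
from tensorflow.python.client import device_lib
config = ConfigProto()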
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)
def get_available_gpus():
local_device_protos = device_lib.list_local_devices()
return [x.name for x in local_device_protos if x.device_type == 'GPU']
############################### Global Variable ###############################
name = "/home/jiaanguo/codespace/python/paint/models"
path = "/home/jiaanguo/codespace/python/paint/bin"
G = 1
################################################################################
def unpack_drawing(file_handle):
key_id, = unpack('Q', file_handle.read(8))
countrycode, = unpack('2s', file_handle.read(2))
recognized, = unpack('b', file_handle.read(1))
timestamp, = unpack('I', file_handle.read(4))
n_strokes, = unpack('H', file_handle.read(2))
image = []
for i in range(n_strokes):
n_points, = unpack('H', file_handle.read(2))
fmt = str(n_points) + 'B'
x = unpack(fmt, file_handle.read(n_points))
y = unpack(fmt, file_handle.read(n_points))
image.append((x, y))
return {
'key_id': key_id,
'countrycode': countrycode,
'recognized': recognized,
'timestamp': timestamp,
'image': image
}
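The binary layout read by "unpack_drawing" is easier to follow with a round trip. The sketch below writes one synthetic drawing in the same field order and reads it back. The helper names, the sample values, and the explicit little-endian "<" prefix are assumptions (the listing above uses native byte order).

```python
import io
from struct import pack, unpack


def pack_drawing(key_id, countrycode, recognized, timestamp, strokes):
    """Serialise one drawing: 17-byte header, then the strokes."""
    out = pack('<Q2sbIH', key_id, countrycode, recognized, timestamp, len(strokes))
    for xs, ys in strokes:
        out += pack('<H', len(xs))            # number of points in the stroke
        out += pack('<%dB' % len(xs), *xs)    # all x coordinates, one byte each
        out += pack('<%dB' % len(ys), *ys)    # all y coordinates, one byte each
    return out


def read_drawing(handle):
    """Parse one drawing from a binary file handle."""
    key_id, countrycode, recognized, timestamp, n_strokes = unpack(
        '<Q2sbIH', handle.read(17))
    image = []
    for _ in range(n_strokes):
        n_points, = unpack('<H', handle.read(2))
        fmt = '<%dB' % n_points
        x = unpack(fmt, handle.read(n_points))
        y = unpack(fmt, handle.read(n_points))
        image.append((x, y))
    return {'key_id': key_id, 'countrycode': countrycode,
            'recognized': recognized, 'timestamp': timestamp, 'image': image}


strokes = [((53, 56), (255, 110))]   # one stroke with two points
raw = pack_drawing(1, b'AU', 1, 1600000000, strokes)
drawing = read_drawing(io.BytesIO(raw))
print(len(raw), drawing['image'])
```

Each coordinate occupies a single byte, so all x and y values lie between 0 and 255.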
def distance(a, b):
return np.power((np.power((a[0] - b[0]), 2) + np.power((a[1] - b[1]), 2)), 1. / 2)
def norm(image):
return image.astype('float32') / 255.
def global_min_max(coords):
x, y = [], []
# coords [((x1, x2, x3), (y1, y2, y3)), ((x1, x2), (y1, y2)) ... ]
for i in range(len(coords)):
# x = [x1, x2, x3] min max
x.append(int(min(coords[i][0])))
x.append(int(max(coords[i][0])))
# y = [y1, y2, y3] min max
y.append(int(min(coords[i][1])))
y.append(int(max(coords[i][1])))
# global min: min of min, global max: max of max
return min(x), max(x), min(y), max(y)
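As a worked example, here is an equivalent, indented version of "global_min_max" applied to the sample drawing quoted in the comments of the class below. The rewrite with list comprehensions is only a sketch and is not the original code.

```python
def global_min_max(coords):
    """Return (min_x, max_x, min_y, max_y) over all strokes."""
    xs = [int(v) for stroke in coords for v in stroke[0]]
    ys = [int(v) for stroke in coords for v in stroke[1]]
    return min(xs), max(xs), min(ys), max(ys)


sample = [((53, 56), (255, 110)),
          ((56, 61, 4, 0, 28, 75, 182, 187), (255, 97, 91, 35, 2, 0, 9, 18))]
bounds = global_min_max(sample)
print(bounds)
```

The result is the bounding box of the whole drawing, which "quickdraw_coords2img" uses to scale the strokes to the target image size.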
class QDPrep:
def __init__(self, path, to_drop, random_state=42, chunk_size=64, max_dataset_size=1000000, trsh=100, normed=True,
train_portion=0.9, k=0.05, min_points=10, min_edges=3, dot_size=3, offset=5, img_size=(64, 64)):
# pseudo random number generator
self.prng = RandomState(random_state)
# dot_size = 3
self.dot_size = dot_size
# offset = 5 + 3 // 2 = 6 (// is integer division)
self.offset = offset + dot_size // 2
# threshold = 100
self.trsh = trsh
# normalisation = true
self.normed = normed
# image size = (64, 64)
self.img_size = img_size
# max_dataset_size = 1,000,000 (1 million)
self.max_dataset_size = max_dataset_size
# train_portion = 1,000,000 * 0.9
self.train_portion = int(max_dataset_size * train_portion)
# min_edges = 3
self.min_edges = min_edges
# min_points = 10
self.min_points = min_points
# path = directory containing the *.bin files, e.g. /home/shapes/first_level/quickdraw or
# /home/jiaanguo/codespace/python/paint/bin
self.path = path
# k = 0.05
self.k = k
# chunk_size = 64
self.chunk_size = chunk_size
# ['train', 'bottlecap', 'beard', 'dishwasher', 'The Mona Lisa', 'sun', 'shovel', ... ] 345 classes
# glob.glob() returns a list of all matching file paths.
self.classes = [f.split('/')[-1].split('.')[0] for f in glob.glob(os.path.join(self.path, '*.bin'))]
# drop unwanted classes
self.classes = {k: i for i, k in enumerate(self.classes) if k not in to_drop}
# images per class 1,000,000 // 345 = 2898 (// return integer)
self.images_per_class = max_dataset_size // len(self.classes)
# {'train':0, 'bottlecap':1, 'beard':2, 'dishwasher':3, 'The Mona Lisa':4, 'sun':5, ... } key-class, value-index
with open(self.path + '/classes.json', 'w') as f:
json.dump(self.classes, f)
self.names = []
self.binaries = {}
for key in tqdm(self.classes, desc='read classes binaries', ascii=True):
# unpack images_per_class drawings for each class
# for each drawing, represented by lines
# i['image'] = [((53, 56), (255, 110)), ((56, 61, 4, 0, 28, 75, 182, 187), (255, 97, 91, 35, 2, 0, 9, 18))]
self.binaries[key] = [i['image'] for i in list(self.unpack_drawings('%s/%s.bin' % (self.path, key)))]
# ['train_0', 'train_1', 'train_2', ... 'train_images_per_class-1']
self.names.extend([key + '_' + str(i) for i in range(len(self.binaries[key]))])
# shuffle names
self.prng.shuffle(self.names)
print(" [INFO] %s files & %s classes prepared" % (len(self.names), len(self.classes)))
# [INFO] 1000155 files & 345 classes prepared
# unpack images_per_class drawings from *.bin file
def unpack_drawings(self, filename):
with open(filename, 'rb') as f:
i = 0
# note: '<=' yields images_per_class + 1 drawings per class (2899 * 345 = 1000155)
while i <= self.images_per_class:
i += 1
try:
yield unpack_drawing(f)
except struct.error:
break
def OHE(self, y):
if type(y) != int:
ohe = np.zeros((len(y), len(self.classes)))
ohe[np.arange(len(y)), y.astype('int64')] = 1
else:
ohe = np.zeros(len(self.classes))
ohe[y] = 1
return ohe
def quickdraw_coords2img(self, image):
image = np.array([[list(j) for j in i] for i in image])
if self.img_size:
min_dists, dists = {}, [[] for i in range(len(image))]
for i in range(len(image)):
for j in range(len(image[i][0])):
dists[i].append(distance([0, 0], [image[i][0][j], image[i][1][j]]))
min_dists[min(dists[i])] = i
min_dist = min(list(min_dists.keys()))
min_index = min_dists[min_dist]
start_point = [image[min_index][0][dists[min_index].index(min_dist)],
image[min_index][1][dists[min_index].index(min_dist)]]
for i in range(len(image)):
for j in range(len(image[i][0])):
image[i][0][j] = image[i][0][j] - start_point[0]
image[i][1][j] = image[i][1][j] - start_point[1]
min_x, max_x, min_y, max_y = global_min_max(image)
scaleX = ((max_x - min_x) / (self.img_size[0] - (self.offset * 2 - 1)))
scaleY = ((max_y - min_y) / (self.img_size[1] - (self.offset * 2 - 1)))
for i in range(len(image)):
for j in range(len(image[i][0])):
image[i][0][j] = image[i][0][j] / scaleX
image[i][1][j] = image[i][1][j] / scaleY
min_x, max_x, min_y, max_y = global_min_max(image)
img = Image.new("RGB", (max_x - min_x + self.offset * 2, max_y - min_y + self.offset * 2), "white")
draw = ImageDraw.Draw(img)
for j in range(len(image)):
for i in range(len(image[j][0]))[1:]:
x, y = image[j][0][i - 1], image[j][1][i - 1]
x_n, y_n = image[j][0][i], image[j][1][i]
x -= min_x - self.offset
y -= min_y - self.offset
x_n -= min_x - self.offset
y_n -= min_y - self.offset
draw.line([(x, y), (x_n, y_n)], fill="black", width=self.dot_size)
if self.img_size:
return {'img': img, 'scaleX': scaleX, 'scaleY': scaleY, 'start_point': start_point}
return {'img': img}
def run_generator(self, val_mode=False):
pics, targets, i, n = [], [], 0, 0
lims = [0, self.train_portion]
if val_mode:
lims = [self.train_portion, None]
length = len(self.names[lims[0]:lims[1]])
N = length // self.chunk_size
while True:
for name in self.names[lims[0]:lims[1]]:
class_name, no = name.split('_')
target = self.classes[class_name]
coords = self.binaries[class_name][int(no)]
img = np.array(self.quickdraw_coords2img(coords)['img'])
img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
img = cv2.bitwise_not(img)
img = cv2.resize(img, self.img_size, interpolation=cv2.INTER_LANCZOS4)
img = cv2.threshold(img, self.trsh, 255,
cv2.THRESH_BINARY | cv2.THRESH_OTSU)[1]
if self.normed:
img = norm(img)
img = img[:, :, np.newaxis]
pics.append(img)
targets.append(self.OHE(target))
i += 1
if n == N and i == (length % self.chunk_size):
yield (np.array(pics), np.array(targets))
elif i == self.chunk_size:
out_pics, out_target = np.array(pics), np.array(targets)
pics, targets, i = [], [], 0
n += 1
yield (out_pics, out_target)
if __name__ == '__main__':
print("[INFO] GPU devices:%s" % get_available_gpus())
try:
# recursive delete model directory
rmtree(name)
except OSError:
pass
# recreate model directory
os.mkdir(name)
################################################################################
batch_size = 64 * G
num_epochs = 15 # 15
img_size = (64, 64)
network = 'MobileNetV2' # 'InceptionV3' or 'MobileNetV2'
params = {
'include_top': True,
'weights': None,
'input_tensor': Input(shape=img_size + (1,)) # shape=(None, 64, 64, 1) dtype=float32
}
reader = QDPrep(path, [], random_state=42, chunk_size=batch_size,
max_dataset_size=1000000, trsh=100, normed=True,
train_portion=0.9, k=0.05, min_points=10,
min_edges=3, dot_size=3, offset=5, img_size=img_size)
################################################################################
num_classes = len(reader.classes)
params['classes'] = num_classes
if G <= 1:
print("[INFO] training with 1 GPU...")
# network = 'MobileNetV2', params = params
# multi_model = get_model(network, params)
# model_json = multi_model.to_json()
multi_model = tf.keras.applications.MobileNetV2(
input_shape=None,
alpha=1.0,
include_top=True,
weights=None,
input_tensor=Input(shape=img_size + (1,)),
pooling=None,
classes=num_classes,
classifier_activation="softmax",
)
model_json = multi_model.to_json()
with open(name + "/model.json", "w") as json_file:
json_file.write(model_json)
adam = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, decay=0.0, clipnorm=5)
multi_model.compile(optimizer=adam, loss='categorical_crossentropy',
metrics=["accuracy", lambda x, y: top_k_categorical_accuracy(x, y, 5)])
multi_model.summary()
with open(name + '/model_summary.txt', 'w') as f:
multi_model.summary(print_fn=lambda x: f.write(x + '\n'))
train_steps = reader.train_portion // batch_size
val_steps = (reader.max_dataset_size - reader.train_portion) // batch_size
checkpoint = ModelCheckpoint(name + '/checkpoint_weights.hdf5', monitor='val_loss', verbose=1,
save_best_only=True, mode='min', save_weights_only=False)
clr = CyclicLR(base_lr=0.001, max_lr=0.006, step_size=train_steps * 2, mode='exp_range', gamma=0.99994)
print("[INFO] training network...")
H = multi_model.fit_generator(reader.run_generator(val_mode=False),
steps_per_epoch=train_steps, epochs=num_epochs, shuffle=False, verbose=1,
validation_data=reader.run_generator(val_mode=True), validation_steps=val_steps,
use_multiprocessing=False, workers=1, callbacks=[checkpoint, clr])
# Use TF to save the graph model instead of Keras save model to load it in Golang
tf.saved_model.save(multi_model, name + "/final")
multi_model.save_weights(name + "/final_weights.h5")
multi_model.save(name + "/final_model.hdf5")
pickle.dump(H.history, open(name + '/loss_history.pickle.dat', 'wb'))
print("[INFO] Finished!")
Concurrency
Google I/O 2012 - Go Concurrency Patterns by Rob Pike
Introduction
Concurrency is the composition of independently executing computations.
Concurrency is a way to structure software, particularly as a way to write clean code that interacts well with the real world.
It is not parallelism: if you have only one processor, your program can still be concurrent, but it cannot be parallel.
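A minimal sketch of the last point, assuming a hypothetical helper named runInterleaved (not from the talk). It pins the Go runtime to a single processor with runtime.GOMAXPROCS(1), so the goroutines are concurrent (they all make progress and finish) but never run in parallel.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// runInterleaved starts n goroutines while only one of them may execute
// Go code at any moment, and returns how many of them finished.
func runInterleaved(n int) int {
	prev := runtime.GOMAXPROCS(1) // one processor: concurrency without parallelism
	defer runtime.GOMAXPROCS(prev)

	var wg sync.WaitGroup
	done := make(chan int, n) // buffered so workers never block on reporting
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			runtime.Gosched() // yield so the scheduler can interleave the workers
			done <- id
		}(i)
	}
	wg.Wait()
	close(done)

	count := 0
	for range done {
		count++
	}
	return count
}

func main() {
	fmt.Println("finished goroutines:", runInterleaved(4)) // finished goroutines: 4
}
```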
History
Concurrency features are rooted in a long history, reaching back to Hoare's CSP in 1978 and even Dijkstra's guarded commands (1975).
Erlang is also rooted in CSP, but Erlang communicates with a process by name rather than over a channel. A rough analogy: Erlang is like writing to a file by name (the process), while Go is like writing to a file descriptor (the channel).
Example
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
go boring("boring!")
fmt.Println("I'm listening.")
time.Sleep(2 * time.Second)
fmt.Println("You're boring; I'm leaving.")
}
func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
Goroutines
What is a goroutine? It's an independently executing function, launched by a go statement.
It has its own call stack, which grows and shrinks as required.
It's very cheap. It's practical to have thousands, even hundreds of thousands, of goroutines.
It is not a thread.
There might be only one thread in a program with thousands of goroutines.
Instead, goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running.
But if you think of it as a very cheap thread, you won't be far off.
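To make the "very cheap" claim concrete, here is a small sketch that launches one hundred thousand goroutines and waits for all of them. The function name spawn and the use of sync.WaitGroup with an atomic counter are choices made for this illustration; they are not part of the talk.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// spawn launches n goroutines that each increment a shared counter once,
// waits for all of them to finish, and returns the final count.
func spawn(n int) int64 {
	var wg sync.WaitGroup
	var counter int64
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&counter, 1) // atomic, so no mutex is needed
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(spawn(100000)) // 100000
}
```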
Channels
A channel in Go provides a connection between two goroutines, allowing them to communicate.
// Declaring and initializing
var c chan int
c = make(chan int)
// or
c := make(chan int)
// Sending on a channel
c <- 1
// Receiving from a channel.
// The "arrow" indicates the direction of data flow.
value = <-c
Using channels
func main() {
c := make(chan string)
go boring("boring!", c)
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring; I'm leaving.")
}
func boring(msg string, c chan string) {
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
When the main function executes <-c, it will wait for a value to be sent. (blocking)
Similarly, when the boring function executes c <- value, it waits for a receiver to be ready. (blocking)
A sender and receiver must both be ready to play their part in the communication. Otherwise we wait until they are.
Thus channels both communicate and synchronize.
Note for experts: Go channels can also be created with a buffer.
Buffering removes synchronization!
Buffering makes them more like Erlang's mailboxes.
Don't communicate by sharing memory, share memory by communicating.
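The note on buffering can be checked with a short sketch. The helper trySend is hypothetical; it uses a select with a default clause to attempt a send without blocking. On an unbuffered channel the send fails when no receiver is waiting, while a buffered channel accepts values until its buffer is full.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 2)

	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting
	fmt.Println(trySend(buffered, 1))   // true: stored in the buffer
	fmt.Println(trySend(buffered, 2))   // true: buffer is now full
	fmt.Println(trySend(buffered, 3))   // false: a sender would block here
	fmt.Println(<-buffered, <-buffered) // 1 2
}
```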
Patterns
Generator: function that returns a channel
func main() {
c := boring("boring!")
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() {
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // return the channel to the caller.
}
The returned channel acts as a handle on a service, so we can run several instances of it:
func main() {
joe := boring("Joe")
ann := boring("Ann")
for i := 0; i < 5; i++ {
fmt.Println(<-joe)
fmt.Println(<-ann)
}
fmt.Println("You're both boring; I'm leaving.")
}
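However, in the code above, joe always goes before ann, because main receives from joe and then from ann in a fixed order. If Ann is ready first, she still has to wait for Joe. A fan-in function lets whoever is ready talk first: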
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } } ()
go func() { for { c <- <-input2 } } ()
return c
}
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
fmt.Println("You're both boring; I'm leaving.")
}
This time, an extra channel aggregates (fans in) the messages from both inputs.
type MessageR struct {
str string
wait chan bool // channel the receiver uses to tell this sender it may continue
}
func boringR(msg string) <-chan MessageR { // Returns receive-only channel of MessageR values.
c := make(chan MessageR)
waitForIt := make(chan bool)
go func() {
for i := 0; ; i++ {
c <- MessageR{ fmt.Sprintf("%s %d", msg, i), waitForIt}
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
<-waitForIt
}
}()
return c // return the channel to the caller.
}
func fanInR(input1, input2 <-chan MessageR) <-chan MessageR {
c := make(chan MessageR)
go func() { for { c <- <-input1 } } ()
go func() { for { c <- <-input2 } } ()
return c
}
func main() {
c := fanInR(boringR("Joe"), boringR("Ann"))
for i := 0; i < 5; i++ {
msg1 := <-c; fmt.Println(msg1.str)
msg2 := <-c; fmt.Println(msg2.str)
msg1.wait <- true
msg2.wait <- true
}
fmt.Println("You're both boring; I'm leaving.")
}
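By embedding a wait channel in each message, the receiver controls when the sender may continue: each sender blocks on <-waitForIt until main sends true on msg.wait, which restores the sequencing.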
Select
The select statement provides another way to handle multiple channels. It's like a switch, but each case is a communication:
- All channels are evaluated.
- Selection blocks until one communication can proceed, which then does.
- If multiple can proceed, select chooses pseudo-randomly.
- A default clause, if present, executes immediately if no channel is ready.
select {
case v1:= <-c1:
fmt.Printf("received %v from c1\n", v1)
case v2:= <-c2:
fmt.Printf("received %v from c2\n", v2)
case c3 <- 23:
fmt.Printf("sent %v to c3\n", 23)
default:
fmt.Printf("no one was ready to communicate\n")
}
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s := <-input1: c <- s
case s := <-input2: c <- s
}
}
}()
return c
}
The time.After function returns a channel that blocks for the specified duration. After the interval, the channel delivers the current time, once.
func main() {
c := boring("Joe")
for {
select {
case s := <-c:
fmt.Println(s)
case <- time.After(1 * time.Second):
fmt.Println("You are too slow.")
return
}
}
}
A timeout for the whole loop instead of a timeout for each message:
func main() {
c := boring("Joe")
timeout := time.After(5 * time.Second)
for {
select {
case s := <-c:
fmt.Println(s)
case <-timeout:
fmt.Println("You talk too much.")
return
}
}
}
Simple Q/A
How do you implement your search functionality?
We first read in all the data (in our case, all the posts) and build a PostObj for each entry. Then we create an index using a B+Tree, where the key is the username or tag and the value is the PostObj. We wrap those functions into our store engine. On top of it, we build a query engine that implements the query logic with the help of a tokenizer and a parser: each term is looked up in the matching tree, and operators such as & combine the results. This lets the user write queries such as #NoTag & @User.
How did you implement your B+Tree, and how does it differ from a BTree?
A B+Tree is a variant of the BTree with two major differences. First, a B+Tree has distinct LeafNode and TreeNode (non-leaf node) types: only a TreeNode has children, while the LeafNodes are linked to their neighbours, which we implement as prev and next references. Second, when a leaf splits, the split key is copied up to the parent rather than moved: it keeps its position in the left leaf node, so all data stays at the bottom level. This enables faster range searches and allows storing multiple values under the same key.
What is the key of your B+Tree, and what is the value?
The key is the user or tag; the value is the PostObj.
Why did you design your indexPostObj this way?
Because we have a user tree and a tag tree, and both use the same index type (string), we only need one indexPostObj. Its compareTo method defines how keys are compared, which in our case is just the default string comparison.
How do you add a value to your B+ Tree?
Start from the root. At each node, go left if the key is smaller than or equal to the node's key and go right if it is larger, until a leaf node is reached. Insert the key at the correct position in the leaf. If the number of keys after insertion equals the order, split the node and propagate the split upwards.
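The leaf-level part of this answer can be sketched as follows. This is an illustration only, written in Go rather than the app's own language; the type leaf, the method insert, and the choice of splitting at the midpoint are assumptions, not the project's actual code. The separator returned on a split is the largest key left in the left leaf, matching the "go left if smaller or equal" rule.

```go
package main

import (
	"fmt"
	"sort"
)

// leaf is a hypothetical B+Tree leaf: sorted keys plus links to its neighbours.
type leaf struct {
	keys       []string
	prev, next *leaf
}

// insert places key at its sorted position. If the leaf then holds `order`
// keys (order must be at least 2), the leaf is split in half and the new
// right sibling is returned with the separator key that the caller would
// copy into the parent. ok is false when no split happened.
func (l *leaf) insert(key string, order int) (sep string, right *leaf, ok bool) {
	i := sort.SearchStrings(l.keys, key)
	l.keys = append(l.keys, "")
	copy(l.keys[i+1:], l.keys[i:])
	l.keys[i] = key

	if len(l.keys) < order {
		return "", nil, false
	}
	mid := len(l.keys) / 2
	right = &leaf{keys: append([]string(nil), l.keys[mid:]...), prev: l, next: l.next}
	if l.next != nil {
		l.next.prev = right
	}
	l.next = right
	l.keys = l.keys[:mid]
	return l.keys[mid-1], right, true // separator stays in the left leaf
}

func main() {
	l := &leaf{}
	for _, k := range []string{"b", "d", "a", "c"} {
		if sep, right, ok := l.insert(k, 4); ok {
			fmt.Println(l.keys, sep, right.keys) // [a b] b [c d]
		}
	}
}
```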
How do you get a value from your B+ Tree?
Start from the root and descend as for insertion (left if the key is smaller or equal, right if it is larger) until a leaf node is reached. Find the first match, then iterate through consecutive entries and leaf nodes until a different key is found.
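A sketch of the second half of this lookup, once the descent has reached a leaf. It is written in Go for illustration; the types entry and leafNode and the function collect are hypothetical names, and posts are represented by plain strings instead of PostObj values.

```go
package main

import "fmt"

// entry pairs a key (user or tag) with a post identifier.
type entry struct {
	key  string
	post string
}

// leafNode holds sorted entries and a link to the next leaf.
type leafNode struct {
	entries []entry
	next    *leafNode
}

// collect starts at the leaf reached by the root-to-leaf descent and gathers
// every post whose key equals key, following next links until a larger key
// is seen.
func collect(start *leafNode, key string) []string {
	var posts []string
	for n := start; n != nil; n = n.next {
		for _, e := range n.entries {
			switch {
			case e.key < key:
				continue // still before the first match
			case e.key == key:
				posts = append(posts, e.post)
			default:
				return posts // keys are sorted, so no further matches exist
			}
		}
	}
	return posts
}

func main() {
	second := &leafNode{entries: []entry{{"bob", "p3"}, {"carol", "p4"}}}
	first := &leafNode{entries: []entry{{"alice", "p1"}, {"bob", "p2"}}, next: second}
	fmt.Println(collect(first, "bob")) // [p2 p3]
}
```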
Why do you use a B+Tree?
A B+Tree gives faster range searches, and storing the data contiguously in the leaves leads to faster reads.
How do you implement your And operator?
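Evaluate the first argument to get a set of posts (no duplicates), evaluate the second argument to get another set, then call retainAll on the first set with the second, which keeps only the posts present in both.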
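The same intersection can be sketched in Go, which has no built-in retainAll. The function name and and the use of string post IDs are assumptions for this illustration; the result is sorted only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// and returns the IDs present in both a and b, without duplicates,
// mirroring what retainAll does on two sets.
func and(a, b []string) []string {
	inA := make(map[string]bool, len(a))
	for _, id := range a {
		inA[id] = true
	}
	seen := make(map[string]bool)
	var out []string
	for _, id := range b {
		if inA[id] && !seen[id] {
			seen[id] = true
			out = append(out, id)
		}
	}
	sort.Strings(out) // deterministic order for display
	return out
}

func main() {
	byTag := []string{"p1", "p2", "p3", "p2"}
	byUser := []string{"p3", "p2", "p5"}
	fmt.Println(and(byTag, byUser)) // [p2 p3]
}
```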
Kubernetes Installation
Kubernetes First App
Init
export KUBECONFIG=/etc/kubernetes/admin.conf
Nginx
Create nginx-deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- image: nginx
name: nginx
Apply nginx-deploy.yaml
kubectl apply -f nginx-deploy.yaml
Check deployment
kubectl get deployment.apps/nginx
Describe deployment
kubectl describe deployment.apps/nginx
Results
Name: nginx
Namespace: default
CreationTimestamp: Fri, 17 Dec 2021 19:48:51 +0800
Labels: <none>
Annotations: deployment.kubernetes.io/revision: 1
Selector: app=nginx
Replicas: 1 desired | 1 updated | 1 total | 0 available | 1 unavailable
StrategyType: RollingUpdate
MinReadySeconds: 0
RollingUpdateStrategy: 25% max unavailable, 25% max surge
Pod Template:
Labels: app=nginx
Containers:
nginx:
Image: nginx
Port: <none>
Host Port: <none>
Environment: <none>
Mounts: <none>
Volumes: <none>
Conditions:
Type Status Reason
---- ------ ------
Available False MinimumReplicasUnavailable
Progressing True ReplicaSetUpdated
OldReplicaSets: <none>
NewReplicaSet: nginx-6799fc88d8 (1/1 replicas created)
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal ScalingReplicaSet 79s deployment-controller Scaled up replica set nginx-6799fc88d8 to 1
From the result, we can see a replica set was created by deployment-controller
Check replica set
kubectl get replicaset nginx-6799fc88d8
Describe replica set
kubectl describe replicaset nginx-6799fc88d8
Result
Name: nginx-6799fc88d8
Namespace: default
Selector: app=nginx,pod-template-hash=6799fc88d8
Labels: app=nginx
pod-template-hash=6799fc88d8
Annotations: deployment.kubernetes.io/desired-replicas: 1
deployment.kubernetes.io/max-replicas: 2
deployment.kubernetes.io/revision: 1
Controlled By: Deployment/nginx
Replicas: 1 current / 1 desired
Pods Status: 1 Running / 0 Waiting / 0 Succeeded / 0 Failed
Pod Template:
Labels: app=nginx
pod-template-hash=6799fc88d8
Containers:
nginx:
Image: nginx
Port: <none>
Host Port: <none>
Environment: <none>
Mounts: <none>
Volumes: <none>
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal SuccessfulCreate 3m29s replicaset-controller Created pod: nginx-6799fc88d8-5pvr8
From the result, we can see a pod was created by replicaset-controller
Check the pod
kubectl get pod nginx-6799fc88d8-5pvr8 -owide
Result
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
nginx-6799fc88d8-5pvr8 1/1 Running 0 5m10s 10.244.145.65 ecs-75909-0003 <none> <none>
Validate the pod
curl 10.244.145.65
Result
<!DOCTYPE html>
<html>
<head>
<title>Welcome to nginx!</title>
<style>
html { color-scheme: light dark; }
body { width: 35em; margin: 0 auto;
font-family: Tahoma, Verdana, Arial, sans-serif; }
</style>
</head>
<body>
<h1>Welcome to nginx!</h1>
<p>If you see this page, the nginx web server is successfully installed and
working. Further configuration is required.</p>
<p>For online documentation and support please refer to
<a href="http://nginx.org/">nginx.org</a>.<br/>
Commercial support is available at
<a href="http://nginx.com/">nginx.com</a>.</p>
<p><em>Thank you for using nginx.</em></p>
</body>
</html>
Replica Set
Change replicas to 3 (set spec.replicas to 3 in the editor)
kubectl edit deploy nginx
Check pods
kubectl get pods -owide
Result (NOTE: all three pods were scheduled on the same node, not spread across the three machines)
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
nginx-6799fc88d8-5pvr8 1/1 Running 0 46m 10.244.145.65 ecs-75909-0003 <none> <none>
nginx-6799fc88d8-t7rx5 1/1 Running 0 2m14s 10.244.145.66 ecs-75909-0003 <none> <none>
nginx-6799fc88d8-vbjvc 1/1 Running 0 2m14s 10.244.145.67 ecs-75909-0003 <none> <none>
Service
Get a selector
kubectl get pods --show-labels
Result
NAME READY STATUS RESTARTS AGE LABELS
nginx-6799fc88d8-5pvr8 1/1 Running 0 51m app=nginx,pod-template-hash=6799fc88d8
nginx-6799fc88d8-t7rx5 1/1 Running 0 6m58s app=nginx,pod-template-hash=6799fc88d8
nginx-6799fc88d8-vbjvc 1/1 Running 0 6m58s app=nginx,pod-template-hash=6799fc88d8
Expose 3 pods as a single service
kubectl expose deploy nginx --selector app=nginx --port=80 --type=NodePort
Get service
kubectl get service
Result
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.1.0.1 <none> 443/TCP 36d
nginx NodePort 10.1.25.160 <none> 80:30884/TCP 2m46s
Validate result
curl 10.1.25.160
Result
<!DOCTYPE html>
<html>
<head>
<title>Welcome to nginx!</title>
<style>
html { color-scheme: light dark; }
body { width: 35em; margin: 0 auto;
font-family: Tahoma, Verdana, Arial, sans-serif; }
</style>
</head>
<body>
<h1>Welcome to nginx!</h1>
<p>If you see this page, the nginx web server is successfully installed and
working. Further configuration is required.</p>
<p>For online documentation and support please refer to
<a href="http://nginx.org/">nginx.org</a>.<br/>
Commercial support is available at
<a href="http://nginx.com/">nginx.com</a>.</p>
<p><em>Thank you for using nginx.</em></p>
</body>
</html>
Kubeadm, kubectl, kubelet
Check kubectl request logs (verbosity level 9)
kubectl get ns default -v 9
Check config file
cat /etc/kubernetes/admin.conf
Add key from aliyun
curl https://mirrors.aliyun.com/kubernetes/apt/doc/apt-key.gpg | sudo apt-key add -
Add source from aliyun
sudo apt-add-repository "deb https://mirrors.aliyun.com/kubernetes/apt/ kubernetes-xenial main"
Update again
sudo apt-get update
Install all
sudo apt install kubelet kubeadm kubectl
Disable auto update
sudo apt-mark hold kubeadm kubelet kubectl
Test kub...
kubelet --version
Disable swap
sudo swapoff -a
Check memory and swap usage
free -h
Add conf
vim /etc/systemd/system/kubelet.service.d/10-kubeadm.conf
Add line
Environment="KUBELET_SYSTEM_PODS_ARGS=--pod-manifest-path=/etc/kubernetes/manifests --allow-privileged=true --fail-swap-on=false"
Reload daemon
systemctl daemon-reload
Restart kubelet
systemctl restart kubelet
Kubeadm init on master
kubeadm init --image-repository registry.aliyuncs.com/google_containers --pod-network-cidr=10.244.0.0/16 --service-cidr=10.1.0.0/16 --kubernetes-version v1.22.3
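The two ranges passed to kubeadm init should not overlap, and the pod and service IPs handed out by the cluster should fall inside them. A small sketch using Go's net/netip package (Go 1.18 or later) can sanity-check this; the helper names inRange and overlap are hypothetical, and the sample addresses are the ones that appear in these notes.

```go
package main

import (
	"fmt"
	"net/netip"
)

// inRange reports whether ip falls inside the CIDR block cidr.
func inRange(cidr, ip string) (bool, error) {
	p, err := netip.ParsePrefix(cidr)
	if err != nil {
		return false, err
	}
	a, err := netip.ParseAddr(ip)
	if err != nil {
		return false, err
	}
	return p.Contains(a), nil
}

// overlap reports whether two CIDR blocks share any address.
func overlap(a, b string) (bool, error) {
	pa, err := netip.ParsePrefix(a)
	if err != nil {
		return false, err
	}
	pb, err := netip.ParsePrefix(b)
	if err != nil {
		return false, err
	}
	return pa.Overlaps(pb), nil
}

func main() {
	const podCIDR, svcCIDR = "10.244.0.0/16", "10.1.0.0/16"
	fmt.Println(overlap(podCIDR, svcCIDR))         // false <nil>
	fmt.Println(inRange(podCIDR, "10.244.145.65")) // true <nil>  (a pod IP)
	fmt.Println(inRange(svcCIDR, "10.1.25.160"))   // true <nil>  (a service IP)
	fmt.Println(inRange(podCIDR, "192.168.0.18"))  // false <nil> (a node IP)
}
```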
Export
export KUBECONFIG=/etc/kubernetes/admin.conf
Join on other node
kubeadm join 192.168.0.18:6443 --token 2pwout.v81y9ss6f4fjed87 \
--discovery-token-ca-cert-hash sha256:20bfbde6985f6337e7585742572707b72b4dd4da6c1a4af9260c8295129c8d01
Validate system
watch kubectl get pods -n kube-system
Check nodes
kubectl get nodes -o wide
Example results
NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
ecs-75909-0001 Ready <none> 123m v1.22.3 192.168.0.61 <none> Ubuntu 18.04.5 LTS 4.15.0-136-generic docker://20.10.10
ecs-75909-0002 Ready control-plane,master 136m v1.22.3 192.168.0.18 <none> Ubuntu 18.04.5 LTS 4.15.0-136-generic docker://20.10.10
ecs-75909-0003 Ready <none> 122m v1.22.3 192.168.0.67 <none> Ubuntu 18.04.5 LTS 4.15.0-136-generic docker://20.10.10
Calico (Only on master)
Create tigera-operator.yaml
kubectl create -f https://docs.projectcalico.org/manifests/tigera-operator.yaml
Download file
https://docs.projectcalico.org/manifests/custom-resources.yaml
Change the cidr value in the file to 10.244.0.0/16, so that it matches the --pod-network-cidr passed to kubeadm init
Create pods
kubectl create -f custom-resources.yaml
Validate system
watch kubectl get pods -n calico-system
Validate all
watch kubectl get pods --all-namespaces
Example results
NAMESPACE NAME READY STATUS RESTARTS AGE
calico-apiserver calico-apiserver-6fb4f77d54-ljbgg 1/1 Running 0 116m
calico-apiserver calico-apiserver-6fb4f77d54-wxx96 1/1 Running 0 116m
calico-system calico-kube-controllers-7bbdc8dbd7-s9xrb 1/1 Running 0 124m
calico-system calico-node-b6rg5 1/1 Running 0 117m
calico-system calico-node-dsx2r 1/1 Running 0 116m
calico-system calico-node-t4xwh 1/1 Running 0 124m
calico-system calico-typha-879f64854-jbh8c 1/1 Running 0 116m
calico-system calico-typha-879f64854-rmgdw 1/1 Running 0 124m
kube-system coredns-7f6cbbb7b8-76dwt 1/1 Running 0 130m
kube-system coredns-7f6cbbb7b8-f74v2 1/1 Running 0 130m
kube-system etcd-ecs-75909-0002 1/1 Running 0 131m
kube-system kube-apiserver-ecs-75909-0002 1/1 Running 0 131m
kube-system kube-controller-manager-ecs-75909-0002 1/1 Running 0 131m
kube-system kube-proxy-8vzwg 1/1 Running 0 117m
kube-system kube-proxy-jchsf 1/1 Running 0 116m
kube-system kube-proxy-jnkbs 1/1 Running 0 130m
kube-system kube-scheduler-ecs-75909-0002 1/1 Running 0 131m
tigera-operator tigera-operator-78b8976b89-6h86f 1/1 Running 0 130m
Remove the taint from master so it can schedule pods (Optional)
kubectl taint nodes --all node-role.kubernetes.io/master-
Debug
Reset kubeadm
kubeadm reset -f
Remove the Calico custom resources
kubectl delete -f https://docs.projectcalico.org/manifests/custom-resources.yaml