Skip to content

ixb

Functions for integrating with the Atlas Copco IxB tool

close(sock, send)

Disconnect from the tool.

Parameters:

Name Type Description Default
sock SocketType

A socket object connected to the tool.

required
send Callable[[str], None]

A function to send a message to the tool with the connection information prepopulated.

required
Source code in lat_alignment/ixb.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def close(sock: socket.SocketType, send: Callable[[str], None]):
    """
    Disconnect from the tool.

    The module-level ``MID3`` message is sent through `send` (presumably the
    Open Protocol "communication stop" request -- confirm against the
    constant's definition) and the socket is then closed.
    The socket is closed even if sending the message fails.

    Parameters
    ----------
    sock : socket.SocketType
        A socket object connected to the tool.
    send : Callable[[str], None]
        A function to send a message to the tool with
        the connection information prepopulated.
    """
    try:
        send(MID3)
    finally:
        # Always release the socket, even when the farewell message could
        # not be delivered (e.g. the tool already dropped the connection).
        sock.close()

connect(host, port=4545, timeout=5, cur_try=0, max_retry=5)

Connect to the open protocol port in the tool.

Parameters:

Name Type Description Default
host str

The IP address of the IxB tool.

required
port int

The port that open protocol is running at.

4545
timeout float

The time in seconds to set the timeout on network operations when communicating.

5
cur_try int

The attempt at connecting we are at.

0
max_retry int

The maximum number of attempts to connect we should try.

5

Returns:

Name Type Description
sock SocketType

A socket object connected to the tool.

Source code in lat_alignment/ixb.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def connect(
    host: str,
    port: int = 4545,
    timeout: float = 5,
    cur_try: int = 0,
    max_retry: int = 5,
) -> socket.SocketType:
    """
    Connect to the open protocol port in the tool.

    A fresh TCP socket is created for every attempt and closed again if the
    attempt fails. Attempts are made until one succeeds or the attempt
    counter reaches `max_retry`; the same `timeout` is used for every attempt.

    Parameters
    ----------
    host : str
        The IP address of the IxB tool.
    port : int, default: 4545
        The port that open protocol is running at.
    timeout : float, default: 5
        The time in seconds to set the timeout on
        network operations when communicating.
    cur_try : int, default: 0
        The attempt at connecting we are at.
    max_retry : int, default: 5
        The maximum number of attempts to connect we should try.

    Returns
    -------
    sock : socket.SocketType
        A socket object connected to the tool.

    Raises
    ------
    RecursionError
        If `cur_try` has reached `max_retry` without a successful connection.
        The last connection error (if any) is attached as the cause.
    """
    last_error = None
    # Iterate instead of recursing so that `timeout` cannot be confused with
    # the attempt counters when retrying.
    for _ in range(cur_try, max_retry):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        try:
            sock.connect((host, port))
        except OSError as err:
            # Do not leak the half-open socket before trying again
            sock.close()
            last_error = err
        else:
            return sock
    raise RecursionError("Maximum attemps to connect exceeded!") from last_error

construct_2500(pset, prog)

Construct a message to update a program via MID 2500.

Parameters:

Name Type Description Default
pset int

The id of the program to update.

required
prog dict[str, Any]

The program in dict form. This will be serialized to compact JSON and sent.

required

Returns:

Name Type Description
message str

The message to send to the tool.

Source code in lat_alignment/ixb.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def construct_2500(pset: int, prog: dict[str, Any]) -> str:
    """
    Build the MID 2500 message that uploads a program to the tool.

    Parameters
    ----------
    pset : int
        The id of the program to update.
    prog : dict[str, Any]
        The program in dict form.
        It is serialized as compact JSON and appended to the message.

    Returns
    -------
    message : str
        The message to send to the tool.
    """
    payload = json.dumps(prog, separators=(",", ":"))
    # The first four characters are a placeholder for the total length,
    # which is only known once the whole message has been assembled.
    header = "00002500002000000000"
    descriptor = f"20100101000004010000000{pset:04}{len(payload):06}"
    draft = "".join((header, descriptor, payload))
    return f"{len(draft):04}" + draft[4:]

construct_2501(pset)

Construct a message to request MID 2501 via MID 0006.

Parameters:

Name Type Description Default
pset int

The id of the program you want.

required

Returns:

Name Type Description
message str

The message to send to the tool.

Source code in lat_alignment/ixb.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def construct_2501(pset: int) -> str:
    """
    Build the MID 0006 message that asks the tool for MID 2501.

    Parameters
    ----------
    pset : int
        The id of the program you want.

    Returns
    -------
    message : str
        The message to send to the tool.
    """
    header = "003400060010    00  "
    # The program id is embedded zero padded to four digits
    request = f"250100207{pset:04}2"
    return header + request

decode0002(mid, message)

Decode a message from MID 0002.

Parameters:

Name Type Description Default
mid str

The MID string. This should be '0002' or '0004'.

required
message str

The data returned by recv after sending MID1.

required

Returns:

Name Type Description
rep str

A printable string describing the message.

Source code in lat_alignment/ixb.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def decode0002(mid: str, message: str) -> str:
    """
    Turn the reply to MID1 into a printable string.

    Parameters
    ----------
    mid : str
        The MID string.
        This should be '0002' or '0004'.
    message : str
        The data returned by `recv` after
        sending MID1.

    Returns
    -------
    rep : str
        A printable string describing the message.
        For MID 0004 this is the decoded error, otherwise it reports
        characters 10 through 34 of `message` as the connected tool.

    Raises
    ------
    ValueError
        If `mid` is neither '0002' nor '0004'.
    """
    # Error replies are handed off to the shared MID 0004 decoder
    if mid == "0004":
        return decode0004(message)
    if mid == "0002":
        return f"Connected to tool {message[10:35]}"
    raise ValueError("Expected MID 0002 or 0004 but got MID " + mid)

decode0004(message)

Decode an error message from MID 0004.

Parameters:

Name Type Description Default
message str

The message to decode.

required

Returns:

Name Type Description
rep str

A printable string describing the error.

Source code in lat_alignment/ixb.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def decode0004(message: str) -> str:
    """
    Turn the data of a MID 0004 error reply into a printable string.

    Parameters
    ----------
    message : str
        The message to decode.
        The first four characters are reported as the failing MID and
        everything after them as the error.

    Returns
    -------
    rep : str
        A printable string describing the error.
    """
    failed_mid, error = message[:4], message[4:]
    return f"MID {failed_mid} failed with error {error}"

decode0041(mid, message)

Decode a message from MID 0041.

Parameters:

Name Type Description Default
mid str

The MID string. This should be '0041' or '0004'.

required
message str

The data returned by recv after sending MID40.

required

Returns:

Name Type Description
rep str

A printable string describing the message.

Source code in lat_alignment/ixb.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def decode0041(mid: str, message: str) -> str:
    """
    Turn the reply to MID40 into a printable tool report.

    Parameters
    ----------
    mid : str
        The MID string.
        This should be '0041' or '0004'.
    message : str
        The data returned by `recv` after
        sending MID40.

    Returns
    -------
    rep : str
        A printable string describing the message.
        For MID 0004 this is the decoded error, otherwise a multi-line
        report built from fixed character ranges of `message`.

    Raises
    ------
    ValueError
        If `mid` is neither '0041' nor '0004', or if the calibration
        field (characters 62 through 66) is not a number.
    """
    if mid == "0004":
        return decode0004(message)
    if mid != "0041":
        raise ValueError("Expected MID 0041 or 0004 but got MID " + mid)

    # Label / value pairs in the order they appear in the report.
    # The calibration field is divided by 100 before being shown
    # (presumably it is transmitted in hundredths -- confirm with the spec).
    rows = (
        ("Serial", message[:14]),
        ("Total Tightenings", message[16:26]),
        ("Last Calibration", message[28:38]),
        ("Controller Serial", message[49:59]),
        ("Calibration", float(message[62:67]) / 100),
        ("Last Service", message[69:79]),
        ("Tightenings Since Service", message[91:100]),
        ("Firmware", message[115:134]),
    )
    return "Tool info:" + "".join(f"\n\t{label}: {value}" for label, value in rows)

decode2501(mid, message)

Decode a message from MID 2501.

Parameters:

Name Type Description Default
mid str

The MID string. This should be '2501' or '0004'.

required
message str

The data returned by recv after sending MID2501 via MID6.

required

Returns:

Name Type Description
info str

If MID 2501 was received then this is the non json info returned. If MID 0004 was received then this is a printable error message.

prog dict[str, Any]

A dict generated from the json of the program. If MID 0004 was received then an empty dict is returned.

Source code in lat_alignment/ixb.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def decode2501(mid: str, message: str) -> tuple[str, dict[str, Any]]:
    """
    Split the reply to a MID 2501 request into its info text and program.

    Parameters
    ----------
    mid : str
        The MID string.
        This should be '2501' or '0004'.
    message : str
        The data returned by `recv` after
        sending MID2501 via MID6.

    Returns
    -------
    info : str
        If MID 2501 was received then this is the non json info returned
        (everything before the first '{', minus its last six characters).
        If MID 0004 was received then this is a printable error message.
    prog : dict[str, Any]
        A dict generated from the json of the program.
        If MID 0004 was received then an empty dict is returned.

    Raises
    ------
    ValueError
        If `mid` is neither '2501' nor '0004', if `message` contains
        no '{', or if the JSON part cannot be parsed.
    """
    if mid == "0004":
        return decode0004(message), {}
    if mid != "2501":
        raise ValueError("Expected MID 2501 or 0004 but got MID " + mid)

    json_start = message.find("{")
    if json_start < 0:
        raise ValueError("JSON not found in MID 2501 response!")

    head, payload = message[:json_start], message[json_start:]
    # The six characters right before the JSON are dropped from the info text
    return head[:-6], json.loads(payload)

init(host, port, **kwargs)

Connect to the tool and print identifying information. Also generates convenience functions.

Parameters:

Name Type Description Default
host str

The IP address of the IxB tool.

required
port int

The port that open protocol is running at.

4545
**kwargs

Additional arguments to pass to connect.

{}

Returns:

Name Type Description
sock SocketType

A socket object connected to the tool.

send Callable[[str], None]

A function to send a message to the tool with the connection information prepopulated.

recv Callable[[], tuple[str, str, str, bool, bool]]

A function to receive a message from the tool with the connection information prepopulated.

Source code in lat_alignment/ixb.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def init(host: str, port: int = 4545, **kwargs) -> tuple[
    socket.SocketType,
    Callable[[str], None],
    Callable[[], tuple[str, str, str, bool, bool]],
]:
    """
    Connect to the tool and print identifying information.
    Also generates convenience functions.

    After connecting, ``MID1`` and ``MID40`` are sent and the decoded
    replies are printed. If anything goes wrong during this exchange the
    socket is closed before the error is re-raised.

    Parameters
    ----------
    host : str
        The IP address of the IxB tool.
    port : int, default: 4545
        The port that open protocol is running at.
    **kwargs
        Additional arguments to pass to `connect`.

    Returns
    -------
    sock : socket.SocketType
        A socket object connected to the tool.
    send : Callable[[str], None]
        A function to send a message to the tool with
        the connection information prepopulated.
    recv : Callable[[], tuple[str, str, str, bool, bool]]
        A function to receive a message from the tool with
        the connection information prepopulated.
    """
    sock = connect(host, port, **kwargs)
    send = partial(send_mid, sock=sock, host=host, port=port)
    recv = partial(recv_mid, sock=sock)
    try:
        send(MID1)
        mid, _, dat, _, _ = recv()
        print(decode0002(mid, dat))
        send(MID40)
        mid, _, dat, _, _ = recv()
        print(decode0041(mid, dat))
    except Exception:
        # The caller never receives the socket when the handshake fails,
        # so it has to be released here to avoid leaking it.
        sock.close()
        raise
    return sock, send, recv

recv_mid(sock)

Receive a message from the tool.

Parameters:

Name Type Description Default
sock SocketType

Socket to communicate with the tool.

required

Returns:

Name Type Description
mid str

The MID of the returned message.

rev str

The revision of the returned message's MID.

data str

The data in the returned message.

disconnect bool

True if a disconnect event was noticed.

timeout bool

True if a timeout event was noticed.

Source code in lat_alignment/ixb.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def recv_mid(sock: socket.SocketType) -> tuple[str, str, str, bool, bool]:
    """
    Read one message from the tool and split it into its parts.

    Data is read in chunks of up to 1024 bytes until a chunk ends with a
    NUL character, the peer closes the connection, or the socket times out.

    Parameters
    ----------
    sock : socket.SocketType
        Socket to communicate with the tool.

    Returns
    -------
    mid : str
        The MID of the returned message (characters 4 through 7).
    rev : str
        The revision of the returned message's MID (characters 8 through 10).
    data : str
        The data in the returned message
        (character 22 up to, but excluding, the last character).
    disconnect : bool
        True if a disconnect event was noticed.
    timeout : bool
        True if a timeout event was noticed.

    Raises
    ------
    ValueError
        If fewer than 12 characters were received in total.
    """
    terminator = chr(0)
    pieces = []
    disconnect = timeout = False
    while True:
        try:
            piece = sock.recv(1024).decode()
        except socket.timeout:
            timeout = True
            break
        if not piece:
            # An empty read means the other side closed the connection
            disconnect = True
            break
        pieces.append(piece)
        if piece.endswith(terminator):
            break
    message = "".join(pieces)

    if disconnect:
        warnings.warn("Noticed a disconnect event, you may want to recconect!")
    if timeout:
        warnings.warn("Noticed a timeout event, you may want to recconect!")
    if len(message) < 12:
        raise ValueError("Incomplete header! You may need to recconect!")

    mid, rev, data = message[4:8], message[8:11], message[22:-1]
    return mid, rev, data, disconnect, timeout

send_mid(message, sock=None, **kwargs)

Send a message to the tool. One attempt at reconnecting and sending the message is made.

Parameters:

Name Type Description Default
message str

The message to send. Should be formatted with the correct header for your MID.

required
sock Optional[SocketType]

Socket to communicate with the tool. If None a new connection is made.

None
**kwargs

Arguments to pass to connect in case we need to reconnect.

{}
Source code in lat_alignment/ixb.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def send_mid(message: str, sock: Optional[socket.SocketType] = None, **kwargs):
    """
    Send a message to the tool.
    One attempt at reconnecting and sending the message is made.

    A NUL character is appended to `message` before it is encoded and sent.

    Parameters
    ----------
    message : str
        The message to send.
        Should be formatted with the correct header for your MID.
    sock : Optional[socket.SocketType], default: None
        Socket to communicate with the tool.
        If `None` a new connection is made.
    **kwargs
        Arguments to pass to `connect` in case we need to reconnect.

    Notes
    -----
    A socket created here (because `sock` was `None` or because the pipe
    was broken) is not returned to the caller, so any reply to the message
    cannot be read through the caller's original socket.
    """
    if sock is None:
        sock = connect(**kwargs)
    to_send = (message + chr(0)).encode()
    try:
        sock.sendall(to_send)
    except BrokenPipeError:
        # The old socket is dead: the retry must go out on the fresh
        # connection, not on the socket that just failed.
        sock = connect(**kwargs)
        sock.sendall(to_send)